Getting Started with Apache Spark SQL
April 2020
Please configure your cluster to use Databricks Runtime version 6.2, which includes:
- Python Version 3.x
- Scala Version 2.11
- Apache Spark 2.4.4
02: Querying Files with SQL
%sql
SELECT * FROM People10M
%sql
DESCRIBE People10M
Which women were born after 1990?
%sql
SELECT firstName, middleName, lastName, birthDate
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'
Spark provides a number of built-in functions, many of which can be used directly from SQL. These functions can be used in WHERE clauses to filter data and in SELECT expressions to create derived columns.
The following SQL statement finds women born after 1990; it uses the year function to create a birthYear column on the fly.
%sql
SELECT firstName, middleName, lastName, year(birthDate) as birthYear, salary
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'
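To try the same filter-plus-derived-column pattern without a cluster, here is a minimal sketch that uses Python's built-in sqlite3 module as a local stand-in for Spark SQL. It is not Spark: SQLite has no year() function, so strftime('%Y', ...) plays that role, and the rows are hypothetical.

```python
import sqlite3

# Hypothetical stand-in rows for People10M (not real data).
PEOPLE = [
    ("Mary", "Ann", "Smith", "F", "1992-05-14", 52000),
    ("Donna", "", "Jones", "F", "1985-01-02", 61000),
    ("John", "Paul", "Brown", "M", "1995-07-30", 48000),
]


def women_born_after_1990(rows):
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE People10M (firstName TEXT, middleName TEXT, lastName TEXT, "
        "gender TEXT, birthDate TEXT, salary INTEGER)"
    )
    conn.executemany("INSERT INTO People10M VALUES (?, ?, ?, ?, ?, ?)", rows)
    # strftime('%Y', ...) returns text, so cast it before comparing with 1990.
    query = """
        SELECT firstName, middleName, lastName,
               CAST(strftime('%Y', birthDate) AS INTEGER) AS birthYear, salary
        FROM People10M
        WHERE CAST(strftime('%Y', birthDate) AS INTEGER) > 1990 AND gender = 'F'
    """
    return conn.execute(query).fetchall()


print(women_born_after_1990(PEOPLE))  # [('Mary', 'Ann', 'Smith', 1992, 52000)]
```

The derived birthYear column exists only in the result; the underlying table is unchanged.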
How many women were named Mary in each year?
%sql
SELECT year(birthDate) as birthYear, count(*) AS total
FROM People10M
WHERE firstName = 'Mary' AND gender = 'F'
GROUP BY birthYear
ORDER BY birthYear
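The GROUP BY step collapses all matching rows for a year into one output row. A minimal sketch of that behavior, again assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL and a few hypothetical rows:

```python
import sqlite3

# Hypothetical (firstName, gender, birthDate) rows standing in for People10M.
PEOPLE = [
    ("Mary", "F", "1950-03-01"),
    ("Mary", "F", "1950-11-20"),
    ("Mary", "F", "1962-06-15"),
    ("Mary", "M", "1962-01-01"),
    ("Anna", "F", "1950-02-02"),
]


def marys_per_year(rows):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE People10M (firstName TEXT, gender TEXT, birthDate TEXT)")
    conn.executemany("INSERT INTO People10M VALUES (?, ?, ?)", rows)
    # One output row per birth year; count(*) counts the matching rows in each group.
    return conn.execute("""
        SELECT CAST(strftime('%Y', birthDate) AS INTEGER) AS birthYear, count(*) AS total
        FROM People10M
        WHERE firstName = 'Mary' AND gender = 'F'
        GROUP BY birthYear
        ORDER BY birthYear
    """).fetchall()


print(marys_per_year(PEOPLE))  # [(1950, 2), (1962, 1)]
```

The male Mary and the female Anna are removed by the WHERE clause before grouping, so they never reach count(*).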
Compare the popularity of two names, Dorothy and Donna, among women born after 1990
%sql
SELECT year(birthDate) as birthYear, firstName, count(*) AS total
FROM People10M
WHERE (firstName = 'Dorothy' or firstName = 'Donna') AND gender = 'F' AND year(birthDate) > 1990
GROUP BY birthYear, firstName
ORDER BY birthYear, firstName
Temporary Views
Temporary views assign a name to a query so that it can be reused as if it were a table. A temporary view exists only for the current Spark session and does not copy any data.
%sql
CREATE OR REPLACE TEMPORARY VIEW TheDonnas AS
SELECT *
FROM People10M
WHERE firstName = 'Donna'
%sql
SELECT * FROM TheDonnas
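A view is a saved query rather than a saved result. The sketch below shows that with SQLite (through Python's sqlite3) standing in for Spark SQL; SQLite spells it CREATE TEMP VIEW and has no OR REPLACE, so the sketch drops any old definition first. The rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (firstName TEXT, lastName TEXT)")
conn.executemany(
    "INSERT INTO People10M VALUES (?, ?)",
    [("Donna", "Lee"), ("Mary", "Ng"), ("Donna", "Cruz")],  # hypothetical rows
)

# SQLite has no CREATE OR REPLACE, so drop any previous definition first.
conn.execute("DROP VIEW IF EXISTS TheDonnas")
conn.execute(
    "CREATE TEMP VIEW TheDonnas AS SELECT * FROM People10M WHERE firstName = 'Donna'"
)
donnas = conn.execute("SELECT * FROM TheDonnas ORDER BY lastName").fetchall()

# The view stores the query, not a copy of the rows, so new data shows up on the next read.
conn.execute("INSERT INTO People10M VALUES ('Donna', 'Abe')")
donnas_after_insert = conn.execute("SELECT count(*) FROM TheDonnas").fetchone()[0]

print(donnas)               # [('Donna', 'Cruz'), ('Donna', 'Lee')]
print(donnas_after_insert)  # 3
```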
Create a more complex query from the People10M table
%sql
CREATE OR REPLACE TEMPORARY VIEW WomenBornAfter1990 AS
SELECT firstName, middleName, lastName, year(birthDate) AS birthYear, salary
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'
%sql
SELECT birthYear, count(*)
FROM WomenBornAfter1990
WHERE firstName = 'Mary'
GROUP BY birthYear
ORDER BY birthYear
Create a temporary view called Top10FemaleFirstNames that contains the 10 most common female first names in the People10M table. The view must have two columns:
- firstName: the first name
- total: the total number of rows with that first name
%sql
CREATE OR REPLACE TEMPORARY VIEW Top10FemaleFirstNames AS
SELECT firstName, COUNT(firstName) AS total
FROM People10M
WHERE gender = 'F'
GROUP BY firstName
ORDER BY total DESC, firstName
LIMIT 10
%sql
SELECT * FROM Top10FemaleFirstNames
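The secondary sort key firstName matters when several names share the same total: without it, which tied names make the cut can vary between runs. A minimal sketch with SQLite (via Python's sqlite3) as a stand-in for Spark SQL, using hypothetical rows where Ann and Mary tie, as do Bea and Zoe, and a top-3 cutoff instead of 10:

```python
import sqlite3

# Hypothetical (firstName, gender) rows; Ann/Mary tie at 2, Bea/Zoe tie at 1.
PEOPLE = [("Mary", "F"), ("Ann", "F"), ("Mary", "F"), ("Ann", "F"),
          ("Zoe", "F"), ("Bea", "F"), ("Bob", "M"), ("Bob", "M"), ("Bob", "M")]


def top_female_first_names(rows, n):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE People10M (firstName TEXT, gender TEXT)")
    conn.executemany("INSERT INTO People10M VALUES (?, ?)", rows)
    # Sorting by firstName after total breaks ties, so the same n names come back every time.
    return conn.execute("""
        SELECT firstName, COUNT(firstName) AS total
        FROM People10M
        WHERE gender = 'F'
        GROUP BY firstName
        ORDER BY total DESC, firstName
        LIMIT ?
    """, (n,)).fetchall()


print(top_female_first_names(PEOPLE, 3))  # [('Ann', 2), ('Mary', 2), ('Bea', 1)]
```

Bob has the highest count overall but is excluded by the gender filter; Bea beats Zoe only because of the alphabetical tie-break.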
03: Aggregations, JOINs and Nested Queries
PySpark reference for the built-in functions: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
Get the average salary
%sql
SELECT avg(salary) AS averageSalary
FROM People10M
Round that value to the nearest whole number using the SQL round() function:
%sql
SELECT round(avg(salary)) AS averageSalary
FROM People10M
Add the maximum and minimum salaries
%sql
SELECT max(salary) AS max, min(salary) AS min, round(avg(salary)) AS average
FROM People10M
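Several aggregates can be computed in a single pass over the table. A minimal sketch, assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL and four hypothetical salaries; note that round() removes the fractional part but the value is still a floating-point number.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (salary INTEGER)")
conn.executemany("INSERT INTO People10M VALUES (?)",
                 [(40000,), (55001,), (70000,), (30000,)])  # hypothetical salaries

max_salary, min_salary, average = conn.execute(
    "SELECT max(salary) AS max, min(salary) AS min, round(avg(salary)) AS average "
    "FROM People10M"
).fetchone()

# avg() is 195001 / 4 = 48750.25; round() drops the fraction but returns a REAL.
print(max_salary, min_salary, average)  # 70000 30000 48750.0
```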
Joining two tables
How many of the first names appear in Social Security data files?
%sql
SELECT * FROM SSANames
Count how many distinct first names there are in each table
%sql
SELECT count(DISTINCT firstName)
FROM People10M
%sql
SELECT count(DISTINCT firstName)
FROM SSANames
Introduce two more temporary views, each consisting of distinct names, so that the join is easier to read and write
%sql
CREATE OR REPLACE TEMPORARY VIEW SSADistinctNames AS
SELECT DISTINCT firstName AS ssaFirstName
FROM SSANames;
CREATE OR REPLACE TEMPORARY VIEW PeopleDistinctNames AS
SELECT DISTINCT firstName
FROM People10M
Join the two views to get the answer
%sql
SELECT firstName
FROM PeopleDistinctNames
INNER JOIN SSADistinctNames ON firstName = ssaFirstName
How many are there?
%sql
SELECT count(*)
FROM PeopleDistinctNames
INNER JOIN SSADistinctNames ON firstName = ssaFirstName
Nested Queries
%sql
SELECT count(firstName)
FROM PeopleDistinctNames
WHERE firstName IN (
SELECT ssaFirstName FROM SSADistinctNames
)
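Because both views contain distinct names, the INNER JOIN count and the IN-subquery count agree; joining the raw tables instead would multiply duplicate names. A minimal sketch of that comparison, assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL and hypothetical rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (firstName TEXT)")
conn.execute("CREATE TABLE SSANames (firstName TEXT, year INTEGER)")
conn.executemany("INSERT INTO People10M VALUES (?)",
                 [("Ann",), ("Ann",), ("Zed",), ("Mia",)])  # hypothetical rows
conn.executemany("INSERT INTO SSANames VALUES (?, ?)",
                 [("Ann", 1950), ("Ann", 1960), ("Mia", 1990), ("Bo", 2000)])
conn.executescript("""
    CREATE TEMP VIEW SSADistinctNames AS SELECT DISTINCT firstName AS ssaFirstName FROM SSANames;
    CREATE TEMP VIEW PeopleDistinctNames AS SELECT DISTINCT firstName FROM People10M;
""")

join_count = conn.execute(
    "SELECT count(*) FROM PeopleDistinctNames "
    "INNER JOIN SSADistinctNames ON firstName = ssaFirstName"
).fetchone()[0]
in_count = conn.execute(
    "SELECT count(firstName) FROM PeopleDistinctNames "
    "WHERE firstName IN (SELECT ssaFirstName FROM SSADistinctNames)"
).fetchone()[0]
# Joining the raw tables multiplies duplicates: 2 Ann rows x 2 Ann rows + 1 Mia match = 5.
raw_join_count = conn.execute(
    "SELECT count(*) FROM People10M INNER JOIN SSANames USING (firstName)"
).fetchone()[0]

print(join_count, in_count, raw_join_count)  # 2 2 5
```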
Some of the salaries in the People10M table are negative. Convert all the negative salaries to positive ones, then sort the people by salary and keep the first 20.
Create a temporary view called PeopleWithFixedSalaries, where all the negative salaries have been converted to positive numbers.
%sql
DROP TABLE IF EXISTS PeopleWithFixedSalaries;
CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalaries AS
SELECT firstName, middleName, lastName, gender, birthDate, ssn, abs(salary) AS salary
FROM People10M
Create another view called PeopleWithFixedSalariesSorted where:
- The data set has been reduced to the first 20 records
- The records are sorted by the column salary in ascending order
%sql
DROP TABLE IF EXISTS PeopleWithFixedSalariesSorted;
CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalariesSorted AS
SELECT *
FROM PeopleWithFixedSalaries
ORDER BY salary
LIMIT 20
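With an ascending sort, "the first 20 records" are the 20 smallest absolute salaries. A minimal sketch of the abs() fix followed by the sort-and-limit view, assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL, hypothetical rows, and LIMIT 2 instead of 20:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (firstName TEXT, salary INTEGER)")
conn.executemany("INSERT INTO People10M VALUES (?, ?)",
                 [("Ann", -90000), ("Bo", 30000), ("Cy", -10000), ("Di", 50000)])  # hypothetical
conn.executescript("""
    CREATE TEMP VIEW PeopleWithFixedSalaries AS
        SELECT firstName, abs(salary) AS salary FROM People10M;
    CREATE TEMP VIEW PeopleWithFixedSalariesSorted AS
        SELECT * FROM PeopleWithFixedSalaries ORDER BY salary LIMIT 2;
""")
# Re-sort when reading so the output order does not depend on how the view is evaluated.
lowest = conn.execute(
    "SELECT * FROM PeopleWithFixedSalariesSorted ORDER BY salary"
).fetchall()
print(lowest)  # [('Cy', 10000), ('Bo', 30000)]
```

Ann's salary of -90,000 becomes 90,000 after abs(), so she no longer sorts first.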
As a refinement, assume that all salaries under $20,000 represent bad rows and filter them out.
Additionally, categorize each person's salary into $10K groups.
Create a temporary view called PeopleWithFixedSalaries20K where:
- Start with the view PeopleWithFixedSalaries
- The data set excludes all records where salaries are below $20K
- The data set includes a new column called salary10k that should be the salary in groups of 10,000, rounded to the nearest group. For example:
  - A salary of 23,000 should report a value of "2"
  - A salary of 57,400 should report a value of "6"
  - A salary of 1,231,375 should report a value of "123"
%sql
DROP TABLE IF EXISTS PeopleWithFixedSalaries20K;
CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalaries20K AS
SELECT *, round(salary / 10000) AS salary10k
FROM PeopleWithFixedSalaries
WHERE salary >= 20000
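The examples call for rounding rather than truncation (flooring 57,400 / 10,000 would give 5, not 6). Spark SQL's round() uses HALF_UP rounding, whereas Python's built-in round() rounds halves to the nearest even number, so a plain-Python sketch of the salary10k logic needs the decimal module to match. The function names here are hypothetical:

```python
from decimal import Decimal, ROUND_HALF_UP


def salary10k(salary):
    """Salary in units of 10,000, rounded half up like Spark SQL's round()."""
    return int((Decimal(salary) / Decimal(10000)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))


def fixed_salaries_20k(salaries):
    """Keep salaries of at least 20,000 and pair each with its salary10k value."""
    return [(s, salary10k(s)) for s in salaries if s >= 20000]


print(fixed_salaries_20k([23000, 57400, 1231375, 15000]))
# [(23000, 2), (57400, 6), (1231375, 123)]
print(salary10k(25000), round(25000 / 10000))  # 3 2
```

The last line shows where the two rounding rules disagree: exactly halfway values such as 25,000.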
Using the People10M table, count the number of females named Caren who were born before March 1980.
Starting with the table People10M, create a temporary view called Carens where:
- The result set has a single record
- The data set has a single column named total
- The result counts only:
  - Females (gender)
  - First name is "Caren" (firstName)
  - Born before March 1980 (birthDate)
%sql
DROP TABLE IF EXISTS Carens;
CREATE TEMPORARY VIEW Carens AS
SELECT count(*) AS total
FROM People10M
WHERE birthDate < '1980-03-01' AND firstName = 'Caren' AND gender = 'F'
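Each condition in the WHERE clause removes a different kind of near-miss. A minimal sketch, assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL and hypothetical rows with dates stored as ISO-8601 text, which compares chronologically as plain strings:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (firstName TEXT, gender TEXT, birthDate TEXT)")
conn.executemany("INSERT INTO People10M VALUES (?, ?, ?)", [  # hypothetical rows
    ("Caren", "F", "1979-12-31"),
    ("Caren", "F", "1980-02-29"),
    ("Caren", "F", "1980-03-01"),  # not before March 1980
    ("Caren", "M", "1970-01-01"),  # not female
    ("Karen", "F", "1975-05-05"),  # different spelling
])
# ISO-8601 date strings sort chronologically, so a string comparison works here.
conn.execute("""
    CREATE TEMP VIEW Carens AS
    SELECT count(*) AS total
    FROM People10M
    WHERE birthDate < '1980-03-01' AND firstName = 'Caren' AND gender = 'F'
""")
print(conn.execute("SELECT * FROM Carens").fetchall())  # [(2,)]
```

The result is a single record with a single column, total, as the exercise requires.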
Use the SSANames table to find the most popular first name for girls in 1885, 1915, 1945, 1975, and 2005.
Create a temporary view called HistoricNames where:
- The view HistoricNames is created using a single SQL query
- The result has three columns: firstName, year, total
%sql
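CREATE OR REPLACE TEMPORARY VIEW HistoricNames AS
SELECT firstName, year, total
FROM SSANames
-- NATURAL JOIN matches on every column both sides share (year, gender, total),
-- so only names whose total equals the maximum for their year and gender remain;
-- a tie for first place yields one row per tied name.
NATURAL INNER JOIN (
SELECT year, gender, max(total) AS total
FROM SSANames
GROUP BY year, gender
) AS max_names
WHERE gender='F' AND year IN (1885, 1915, 1945, 1975, 2005)
ORDER BY year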
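The same "row with the group maximum" pattern can be checked locally. A minimal sketch, assuming SQLite (via Python's sqlite3) as a stand-in for Spark SQL and hypothetical name counts that include a tie in 2005; firstName is added to ORDER BY only so that the tied rows come back in a fixed order:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SSANames (firstName TEXT, gender TEXT, total INTEGER, year INTEGER)")
conn.executemany("INSERT INTO SSANames VALUES (?, ?, ?, ?)", [  # hypothetical counts
    ("Mary", "F", 9000, 1885), ("Anna", "F", 4000, 1885), ("John", "M", 9500, 1885),
    ("Mary", "F", 58000, 1915), ("Helen", "F", 30000, 1915),
    ("Emily", "F", 23000, 2005), ("Emma", "F", 23000, 2005),
])
historic = conn.execute("""
    SELECT firstName, year, total
    FROM SSANames
    NATURAL INNER JOIN (
        SELECT year, gender, max(total) AS total
        FROM SSANames
        GROUP BY year, gender
    ) AS max_names
    WHERE gender = 'F' AND year IN (1885, 1915, 1945, 1975, 2005)
    ORDER BY year, firstName
""").fetchall()
print(historic)
# [('Mary', 1885, 9000), ('Mary', 1915, 58000), ('Emily', 2005, 23000), ('Emma', 2005, 23000)]
```

Grouping by gender as well as year keeps John's larger 1885 count from hiding the top girl's name.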
04: Accessing Data
The Databricks File System (DBFS) is a built-in, Azure Blob Storage-backed alternative to the Hadoop Distributed File System (HDFS).
Creating a table from an existing file in DBFS allows you to access the file as if it were a Spark table. It does not copy any data.
Create a table from an existing file
Create a table from the ip-geocode.parquet file
Create a table from an existing DBFS file with a simple SQL CREATE TABLE statement
%sql
CREATE DATABASE IF NOT EXISTS databricks;
USE databricks;
CREATE TABLE IF NOT EXISTS IPGeocode
USING parquet
OPTIONS (
path "dbfs:/mnt/training/ip-geocode.parquet"
)
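It is often better to use a "personal" database so that your tables don't collide with other users' tables. For example, from a Python cell, where databaseName holds the name of your database: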
spark.sql(f"USE {databaseName}")
One common format is CSV (comma-separated values), for which you can specify:
- The file's delimiter, the default is ","
- Whether the file has a header or not, the default is false
- Whether or not to infer the schema, the default is false
Look at the first few lines of the file
%fs head /mnt/training/bikeSharing/data-001/day.csv --maxBytes=492
Spark can create a table from that CSV file as well.
As the output above shows:
- The file has a header row
- The file is comma-separated (the default delimiter)
- The column types are not declared, so let Spark infer the schema
%sql
CREATE TABLE IF NOT EXISTS BikeSharingDay
USING csv
OPTIONS (
path "/mnt/training/bikeSharing/data-001/day.csv",
inferSchema "true",
header "true"
)
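To make the header and inferSchema options concrete, here is a rough plain-Python illustration of what they change, using a made-up CSV sample. It is not Spark's implementation: the hypothetical infer_type helper only tries int and then double, while Spark's inference handles more types. Without a header, Spark names the columns _c0, _c1, and so on, which the sketch imitates.

```python
import csv
import io

# Made-up sample CSV with a header row (not the bike-sharing data).
SAMPLE = "id,day,temp\n1,2011-01-01,0.34\n2,2011-01-02,0.36\n"


def infer_type(values):
    """Return the narrowest of int/double that parses every value, else string."""
    for cast, type_name in ((int, "int"), (float, "double")):
        try:
            for value in values:
                cast(value)
            return type_name
        except ValueError:
            continue
    return "string"


def read_csv(text, header=False, infer_schema=False, sep=","):
    """Parse CSV text and return (schema, data rows); defaults mirror Spark's CSV defaults."""
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    if header:
        names, data = rows[0], rows[1:]
    else:
        names, data = [f"_c{i}" for i in range(len(rows[0]))], rows
    columns = list(zip(*data))
    types = [infer_type(col) if infer_schema else "string" for col in columns]
    return list(zip(names, types)), data


schema, _ = read_csv(SAMPLE, header=True, infer_schema=True)
print(schema)               # [('id', 'int'), ('day', 'string'), ('temp', 'double')]
print(read_csv(SAMPLE)[0])  # [('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string')]
```

With the defaults, the header line is treated as data and every column stays a string, which is why the CREATE TABLE statement sets both options to "true".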
%sql
SELECT * FROM BikeSharingDay
Upload a local file as a table
- Download a file
- Select Data in the sidebar and click the junk database
- Select the + icon to create a new table
- Click the Create Table with UI button
- In the drop-down dialog, select a cluster
- Click the Preview Table button
- Another dialog drops down; choose the junk database
- Select the First row is header checkbox
- Click the Create Table button
Drop the table so that other users don't hit a name conflict when they upload their own tables
%sql
DROP TABLE IF EXISTS state_income