Getting Started with Apache Spark SQL

April 2020

Please configure your cluster to use Databricks Runtime version 6.2, which includes:

  • Python Version 3.x
  • Scala Version 2.11
  • Apache Spark 2.4.4

02: Querying Files with SQL

%sql
SELECT * FROM People10M


%sql
DESCRIBE People10M

Which women were born after 1990?

%sql
SELECT firstName, middleName, lastName, birthDate
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'

Spark provides a number of built-in functions, many of which can be used directly from SQL. These functions can be used in WHERE clauses to filter rows and in SELECT expressions to create derived columns.

The following SQL statement finds women born after 1990. It uses the year() function and creates a birthYear column on the fly.

%sql
SELECT firstName, middleName, lastName, year(birthDate) AS birthYear, salary
FROM People10M
WHERE year(birthDate) > 1990 AND gender = 'F'
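You can try the same filter-plus-derived-column pattern outside Databricks. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL, with a few made-up rows in place of People10M. SQLite has no year() function, so the sketch substitutes CAST(strftime('%Y', birthDate) AS INTEGER). The rest of the query mirrors the cell above.

```python
import sqlite3

# Made-up rows standing in for People10M (not real data)
rows = [
    ("Mary", "Ann", "Smith", "1992-05-01", "F", 60000),
    ("Donna", "", "Jones", "1985-11-23", "F", 72000),
    ("John", "Paul", "Brown", "1995-02-14", "M", 55000),
    ("Dorothy", "Lee", "Clark", "1991-07-30", "F", 48000),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE People10M (firstName TEXT, middleName TEXT, lastName TEXT, "
    "birthDate TEXT, gender TEXT, salary INTEGER)"
)
conn.executemany("INSERT INTO People10M VALUES (?, ?, ?, ?, ?, ?)", rows)

# SQLite has no year(); CAST(strftime('%Y', ...) AS INTEGER) plays the same role
query = """
SELECT firstName, lastName,
       CAST(strftime('%Y', birthDate) AS INTEGER) AS birthYear, salary
FROM People10M
WHERE CAST(strftime('%Y', birthDate) AS INTEGER) > 1990 AND gender = 'F'
ORDER BY birthYear
"""
result = conn.execute(query).fetchall()
print(result)  # [('Dorothy', 'Clark', 1991, 48000), ('Mary', 'Smith', 1992, 60000)]
```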

How many women were named Mary in each year?

%sql
SELECT year(birthDate) AS birthYear, count(*) AS total
FROM People10M
WHERE firstName = 'Mary' AND gender = 'F'
GROUP BY birthYear
ORDER BY birthYear
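The GROUP BY above amounts to filtering rows and then counting per key. Here is a minimal pure-Python sketch of the same logic. It assumes a hypothetical list of (firstName, gender, birthDate) tuples in place of the real table.

```python
from collections import Counter
from datetime import date

# Hypothetical sample rows: (firstName, gender, birthDate)
people = [
    ("Mary", "F", date(1950, 3, 2)),
    ("Mary", "F", date(1950, 8, 19)),
    ("Mary", "F", date(1961, 1, 5)),
    ("Mary", "M", date(1961, 6, 7)),   # excluded by gender = 'F'
    ("Anna", "F", date(1950, 4, 4)),   # excluded by firstName = 'Mary'
]

# WHERE firstName = 'Mary' AND gender = 'F', then GROUP BY year(birthDate) with count(*)
totals = Counter(
    birth.year for first, gender, birth in people
    if first == "Mary" and gender == "F"
)

# ORDER BY birthYear
for birth_year, total in sorted(totals.items()):
    print(birth_year, total)
```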

Compare the popularity of two names, Dorothy and Donna, among women born after 1990:

%sql
SELECT year(birthDate) AS birthYear, firstName, count(*) AS total
FROM People10M
WHERE (firstName = 'Dorothy' OR firstName = 'Donna') AND gender = 'F' AND year(birthDate) > 1990
GROUP BY birthYear, firstName
ORDER BY birthYear, firstName

Temporary Views

A temporary view assigns a name to a query so that the query can be reused as if it were a table. Temporary views exist only for the current Spark session.

%sql
CREATE OR REPLACE TEMPORARY VIEW TheDonnas AS
  SELECT * 
  FROM People10M 
  WHERE firstName = 'Donna'

%sql
SELECT * FROM TheDonnas
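A view stores the query, not a copy of the data. Each query against the view therefore re-reads the underlying table (in Spark, unless the view is explicitly cached). The sketch below illustrates this with Python's sqlite3 module as a stand-in for Spark, using a hypothetical two-row table. SQLite spells the statement CREATE TEMP VIEW and has no CREATE OR REPLACE, so the sketch runs DROP VIEW IF EXISTS first.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE People10M (firstName TEXT, lastName TEXT)")
conn.executemany("INSERT INTO People10M VALUES (?, ?)",
                 [("Donna", "Reed"), ("Mary", "Smith")])

# SQLite lacks CREATE OR REPLACE, so drop any old view, then create a TEMP view
conn.execute("DROP VIEW IF EXISTS TheDonnas")
conn.execute("CREATE TEMP VIEW TheDonnas AS "
             "SELECT * FROM People10M WHERE firstName = 'Donna'")

before = conn.execute("SELECT count(*) FROM TheDonnas").fetchone()[0]

# The view holds the query, not the rows: a newly inserted Donna shows up
conn.execute("INSERT INTO People10M VALUES ('Donna', 'Summer')")
after = conn.execute("SELECT count(*) FROM TheDonnas").fetchone()[0]

print(before, after)  # 1 2
```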

Create a more complex view from the People10M table:

%sql
CREATE OR REPLACE TEMPORARY VIEW WomenBornAfter1990 AS
  SELECT firstName, middleName, lastName, year(birthDate) AS birthYear, salary 
  FROM People10M
  WHERE year(birthDate) > 1990 AND gender = 'F'

%sql
SELECT birthYear, count(*) 
FROM WomenBornAfter1990 
WHERE firstName = 'Mary' 
GROUP BY birthYear 
ORDER BY birthYear

Create a temporary view called Top10FemaleFirstNames that contains the 10 most common female first names in the People10M table. The view must have two columns:

  • firstName - the first name
  • total - the total number of rows with that first name

%sql
CREATE OR REPLACE TEMPORARY VIEW Top10FemaleFirstNames AS
  SELECT firstName, COUNT(firstName) AS total
  FROM People10M
  WHERE gender = 'F'
  GROUP BY firstName
  ORDER BY total DESC, firstName
  LIMIT 10

%sql
SELECT * FROM Top10FemaleFirstNames 
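The secondary key in ORDER BY total DESC, firstName matters because several names may share the same count at the tenth position. It makes the LIMIT 10 cutoff deterministic. Below is a pure-Python sketch of the same ordering. A hypothetical list of names stands in for the female rows of People10M.

```python
from collections import Counter

# Hypothetical first names of the rows WHERE gender = 'F'
names = ["Mary", "Anna", "Mary", "Zoe", "Anna", "Beth", "Mary", "Cora"]

# GROUP BY firstName with COUNT(firstName)
counts = Counter(names)

def top_n(counts, n):
    # ORDER BY total DESC, firstName, then LIMIT n
    return sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[:n]

print(top_n(counts, 3))  # [('Mary', 3), ('Anna', 2), ('Beth', 1)]
```

Without the name as a tie-breaker, any of Beth, Cora or Zoe could land in third place.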

03: Aggregations, JOINs and Nested Queries

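The PySpark documentation lists Spark's built-in functions; many of them can also be called from SQL:

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions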

Get the average salary:

%sql
SELECT avg(salary) AS averageSalary 
FROM People10M

Round that value to the nearest whole number using the SQL round() function:

%sql
SELECT round(avg(salary)) AS averageSalary 
FROM People10M

Add the maximum and minimum salaries:

%sql
SELECT max(salary) AS max, min(salary) AS min, round(avg(salary)) AS average 
FROM People10M
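One detail about round() is worth knowing. Spark SQL's round() uses HALF_UP rounding, so halves go away from zero. Python's built-in round() instead rounds halves to even. The sketch below reproduces max, min and a HALF_UP-rounded average in Python using the decimal module. The salaries are made-up values.

```python
from decimal import Decimal, ROUND_HALF_UP

# Hypothetical salaries, not values from People10M
salaries = [52000, 61500, 47250, 88001]

def round_half_up(x):
    # Spark SQL's round() rounds halves away from zero (HALF_UP);
    # Python's built-in round() rounds halves to even, so Decimal is used here.
    return int(Decimal(str(x)).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

summary = {
    "max": max(salaries),
    "min": min(salaries),
    "average": round_half_up(sum(salaries) / len(salaries)),
}
print(summary)  # {'max': 88001, 'min': 47250, 'average': 62188}
```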

Joining two tables

How many of the first names in People10M also appear in the Social Security data (the SSANames table)? Start by looking at SSANames:

%sql
SELECT * FROM SSANames

Count the distinct first names in each table:

%sql
SELECT count(DISTINCT firstName) 
FROM People10M

%sql
SELECT count(DISTINCT firstName)
FROM SSANames

Introduce two more temporary views, each consisting of distinct first names, so that the join is easier to read and write. Renaming the SSA column to ssaFirstName keeps the join condition unambiguous.

%sql
CREATE OR REPLACE TEMPORARY VIEW SSADistinctNames AS 
  SELECT DISTINCT firstName AS ssaFirstName 
  FROM SSANames;

CREATE OR REPLACE TEMPORARY VIEW PeopleDistinctNames AS 
  SELECT DISTINCT firstName 
  FROM People10M

Join the two views to get the answer:

%sql
SELECT firstName 
FROM PeopleDistinctNames 
INNER JOIN SSADistinctNames ON firstName = ssaFirstName

How many are there?

%sql
SELECT count(*) 
FROM PeopleDistinctNames 
INNER JOIN SSADistinctNames ON firstName = ssaFirstName
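Once both views are reduced to distinct names, the inner join is effectively a set intersection. The pure-Python sketch below uses hypothetical name lists. It also shows why the distinct views help: joining the raw columns pairs every matching row with every other matching row, so repeated names get counted many times.

```python
# Hypothetical name columns, duplicates included as in the real tables
people_names = ["Mary", "Donna", "Mary", "Xyla", "Dorothy"]
ssa_names = ["Mary", "Mary", "Donna", "Dorothy", "Anna"]

# PeopleDistinctNames and SSADistinctNames: SELECT DISTINCT ...
people_distinct = set(people_names)
ssa_distinct = set(ssa_names)

# INNER JOIN ... ON firstName = ssaFirstName over two distinct sets = set intersection
matched = sorted(people_distinct & ssa_distinct)

# Joining the raw columns instead: every matching pair becomes a row
raw_join_rows = sum(1 for p in people_names for s in ssa_names if p == s)

print(matched, len(matched), raw_join_rows)  # ['Donna', 'Dorothy', 'Mary'] 3 6
```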

Nested Queries

The same count can be computed with a nested query, using an IN subquery instead of a join:

%sql
SELECT count(firstName) 
FROM PeopleDistinctNames
WHERE firstName IN (
  SELECT ssaFirstName FROM SSADistinctNames
)
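The join and the IN subquery return the same count here because both views contain distinct names. The small sketch below runs both forms side by side. It uses sqlite3 as a stand-in for Spark SQL, with hypothetical rows loaded into plain tables named after the views.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE PeopleDistinctNames (firstName TEXT)")
conn.execute("CREATE TABLE SSADistinctNames (ssaFirstName TEXT)")
conn.executemany("INSERT INTO PeopleDistinctNames VALUES (?)",
                 [("Mary",), ("Donna",), ("Xyla",)])
conn.executemany("INSERT INTO SSADistinctNames VALUES (?)",
                 [("Mary",), ("Donna",), ("Anna",)])

# Join form
join_count = conn.execute(
    "SELECT count(*) FROM PeopleDistinctNames "
    "INNER JOIN SSADistinctNames ON firstName = ssaFirstName").fetchone()[0]

# Nested-query form
in_count = conn.execute(
    "SELECT count(firstName) FROM PeopleDistinctNames "
    "WHERE firstName IN (SELECT ssaFirstName FROM SSADistinctNames)").fetchone()[0]

print(join_count, in_count)  # 2 2
```

Suppose the subquery side contained duplicates. The IN form would still count each outer row once, whereas the join would produce one row per matching pair.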

Some of the salaries in the People10M table are negative. Convert all the negative salaries to positive ones, and then find the 20 people with the lowest salaries.

Create a temporary view called PeopleWithFixedSalaries, where all the negative salaries have been converted to positive numbers.

%sql
DROP TABLE IF EXISTS PeopleWithFixedSalaries;

CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalaries AS
  SELECT firstName, middleName, lastName, gender, birthDate, ssn, abs(salary) AS salary
  FROM People10M

Create another view called PeopleWithFixedSalariesSorted where:

  1. The records are sorted by the column salary in ascending order
  2. The data set is reduced to the first 20 records, i.e. the 20 lowest salaries

%sql
DROP TABLE IF EXISTS PeopleWithFixedSalariesSorted;

CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalariesSorted AS
  SELECT * 
  FROM PeopleWithFixedSalaries
  ORDER BY salary
  LIMIT 20
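The two views above boil down to abs(), an ascending sort and a LIMIT. Here is a pure-Python sketch over hypothetical (lastName, salary) pairs:

```python
# Hypothetical (lastName, salary) rows, some with negative salaries
people = [("Ames", -30000), ("Bell", 25000), ("Cruz", -12000), ("Diaz", 90000)]

# PeopleWithFixedSalaries: abs(salary) AS salary
fixed = [(name, abs(salary)) for name, salary in people]

def lowest_n(rows, n):
    # PeopleWithFixedSalariesSorted: ORDER BY salary (ascending) LIMIT n
    return sorted(rows, key=lambda row: row[1])[:n]

print(lowest_n(fixed, 2))  # [('Cruz', 12000), ('Bell', 25000)]
```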

As a refinement, assume that all salaries under $20,000 represent bad rows and filter them out.

Additionally, categorize each person's salary into $10K groups.

Create a temporary view called PeopleWithFixedSalaries20K where:

  1. Start with the view PeopleWithFixedSalaries
  2. The data set excludes all records where salaries are below $20K
  3. The data set includes a new column called salary10k: the salary divided by 10,000 and rounded to the nearest whole number. For example:
    • A salary of 23,000 should report a value of "2"
    • A salary of 57,400 should report a value of "6"
    • A salary of 1,231,375 should report a value of "123"

%sql
DROP TABLE IF EXISTS PeopleWithFixedSalaries20K;

CREATE OR REPLACE TEMPORARY VIEW PeopleWithFixedSalaries20K AS
  SELECT *, round(salary / 10000) AS salary10k 
  FROM PeopleWithFixedSalaries 
  WHERE salary >= 20000
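The salary10k column is round(salary / 10000). Because Spark SQL's round() uses HALF_UP, a salary of exactly 25,000 maps to 3, not 2. The sketch below mirrors the view in Python with the decimal module, using a hypothetical salary list. It reproduces the three examples from the exercise.

```python
from decimal import Decimal, ROUND_HALF_UP

def salary10k(salary):
    # round(salary / 10000) with HALF_UP, mirroring Spark SQL's round()
    ratio = Decimal(salary) / Decimal(10000)
    return int(ratio.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

def fixed_salaries_20k(salaries):
    # WHERE salary >= 20000, then add the salary10k column
    return [(s, salary10k(s)) for s in salaries if s >= 20000]

print(fixed_salaries_20k([23000, 57400, 1231375, 19999, 25000]))
# [(23000, 2), (57400, 6), (1231375, 123), (25000, 3)]
```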

Using the People10M table, count the number of females named Caren who were born before March 1980.

Starting with the table People10M, create a temporary view called Carens where:

  1. The result set has a single record
  2. The data set has a single column named total
  3. The result counts only
    • Females (gender)
    • First Name is "Caren" (firstName)
    • Born before March 1980 (birthDate)

%sql
DROP TABLE IF EXISTS Carens;

CREATE TEMPORARY VIEW Carens AS
  SELECT count(*) AS total 
  FROM People10M
  WHERE birthDate < '1980-03-01' AND firstName = 'Caren' AND gender = 'F'
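The date condition means "strictly earlier than 1980-03-01", which includes every day of February 1980. Here is a pure-Python sketch of the count over hypothetical rows, each chosen to exercise one condition:

```python
from datetime import date

# Hypothetical rows: (firstName, gender, birthDate)
people = [
    ("Caren", "F", date(1979, 12, 31)),
    ("Caren", "F", date(1980, 3, 1)),   # not before March 1980
    ("Caren", "M", date(1970, 1, 1)),   # wrong gender
    ("Karen", "F", date(1975, 5, 5)),   # different spelling
    ("Caren", "F", date(1980, 2, 29)),  # last day of February 1980 still counts
]

cutoff = date(1980, 3, 1)  # birthDate < '1980-03-01'
total = sum(1 for first, gender, born in people
            if first == "Caren" and gender == "F" and born < cutoff)
print(total)  # 2
```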

Use the SSANames table to find the most popular first name for girls in 1885, 1915, 1945, 1975, and 2005.

Create a temporary view called HistoricNames where:

  1. The view HistoricNames is created using a single SQL query.
  2. The result has three columns:
    • firstName
    • year
    • total

%sql
CREATE OR REPLACE TEMPORARY VIEW HistoricNames AS
SELECT firstName, year, total
FROM SSANames
NATURAL INNER JOIN (
  SELECT year, gender, max(total) AS total
  FROM  SSANames
  GROUP BY year, gender
) AS max_names
WHERE gender='F' AND year IN (1885, 1915, 1945, 1975, 2005)
ORDER BY year
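The query works in two steps. It finds the largest total for each year and gender, then joins back to SSANames to recover the name(s) with that total. NATURAL INNER JOIN matches on every shared column (year, gender and total). The sketch below runs the same query with Python's sqlite3 module as a stand-in for Spark SQL, on a handful of rows with made-up counts, not actual SSA figures.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE SSANames "
             "(firstName TEXT, gender TEXT, total INTEGER, year INTEGER)")
# Made-up counts for illustration only
conn.executemany("INSERT INTO SSANames VALUES (?, ?, ?, ?)", [
    ("Mary", "F", 9000, 1885),
    ("Anna", "F", 4000, 1885),
    ("John", "M", 9500, 1885),
    ("Mary", "F", 58000, 1915),
    ("Helen", "F", 30000, 1915),
])

# Same shape as the HistoricNames view; the join keys are year, gender and total
query = """
SELECT firstName, year, total
FROM SSANames
NATURAL INNER JOIN (
  SELECT year, gender, max(total) AS total
  FROM SSANames
  GROUP BY year, gender
) AS max_names
WHERE gender = 'F' AND year IN (1885, 1915)
ORDER BY year
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('Mary', 1885, 9000), ('Mary', 1915, 58000)]
```

If two names tie for the maximum in a year, both rows are returned.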

04: Accessing Data

The Databricks File System (DBFS) is the built-in, Azure Blob storage-backed alternative to the Hadoop Distributed File System (HDFS).

Creating a table from an existing file in DBFS allows you to access the file as if it were a Spark table. It does not copy any data.

Create a table from an existing file

Create a table from the ip-geocode.parquet file with a simple SQL CREATE TABLE statement:

%sql
CREATE DATABASE IF NOT EXISTS databricks;

USE databricks;

CREATE TABLE IF NOT EXISTS IPGeocode
  USING parquet
  OPTIONS (
    path "dbfs:/mnt/training/ip-geocode.parquet"
  )

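It is often better to use a "personal" database, so that your tables do not collide with other users' tables. From Python, assuming the variable databaseName holds the name of your database:

# databaseName is assumed to hold the name of your personal database
spark.sql(f"USE {databaseName}")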

One common format is CSV (comma-separated values), for which you can specify:

  • The file's delimiter; the default is ","
  • Whether the file has a header; the default is false
  • Whether to infer the schema; the default is false
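Here is a rough idea of what these options do, sketched with Python's csv module on a small inline sample with hypothetical values. The delimiter splits fields, the header supplies column names, and schema inference picks one type per column. The infer_type helper is a simplified, hypothetical stand-in. Spark's own inferSchema considers more types (dates and timestamps, for example) and has its own rules.

```python
import csv
import io

# Small inline sample with hypothetical values, standing in for a CSV file
raw = "instant,dteday,temp,cnt\n1,2011-01-01,0.344,985\n2,2011-01-02,0.363,801\n"

def infer_type(values):
    # Simplified schema inference: narrowest of int, float, str that parses every value
    for cast in (int, float):
        try:
            for v in values:
                cast(v)
            return cast
        except ValueError:
            continue
    return str

reader = csv.reader(io.StringIO(raw), delimiter=",")  # delimiter: "," is the default
header = next(reader)                                  # header true: first line names the columns
lines = list(reader)
columns = list(zip(*lines))
schema = {name: infer_type(col) for name, col in zip(header, columns)}
rows = [{name: schema[name](v) for name, v in zip(header, line)} for line in lines]

print({name: t.__name__ for name, t in schema.items()})
# {'instant': 'int', 'dteday': 'str', 'temp': 'float', 'cnt': 'int'}
```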

Look at the first few lines of the file:

%fs head /mnt/training/bikeSharing/data-001/day.csv --maxBytes=492

Spark can create a table from that CSV file as well.

From the output above:

  • There is a header, so header is set to "true"
  • The file is comma-separated, which is the default
  • Spark should infer the schema, so inferSchema is set to "true"

%sql
CREATE TABLE IF NOT EXISTS BikeSharingDay
  USING csv
  OPTIONS (
    path "/mnt/training/bikeSharing/data-001/day.csv",
    inferSchema "true",
    header "true"
  )

%sql
SELECT * FROM BikeSharingDay

Upload a local file as a table

  1. Download a file to your local machine.
  2. Select Data in the sidebar and click the junk database.
  3. Select the + icon to create a new table.
  4. Click the Create Table with UI button.
  5. In the drop-down dialog, select a cluster.
  6. Click the Preview Table button.
  7. Another dialog will drop down. Choose the junk database.
  8. Select the First row is header checkbox.
  9. Click the Create Table button.

Drop the table so that other users don't hit a name conflict when uploading their own tables:

%sql
DROP TABLE IF EXISTS state_income