Getting Started with Apache Spark DataFrames

April 2019

Log into Databricks.

Importing the course:

  • In the left sidebar, click Home.
  • Right-click your home folder, then click Import.
  • Select browse and then choose your course Lessons.dbc file.
  • Click the Import button.
  • Click the Home icon in the left sidebar.
  • Select your home folder.
  • Select the subfolder matching the name of your course (e.g. Dataframes, ETL-Part-1, etc).
  • Open Lesson 01-Getting-Started.

Hit the keyboard icon in the top right to see the keyboard shortcuts.

00: Why Spark

Convert .csv data to a SQL table

%sql

CREATE DATABASE IF NOT EXISTS Databricks;
USE Databricks;

CREATE TABLE IF NOT EXISTS AirlineFlight
USING CSV
OPTIONS (
  header="true",
  delimiter=",",
  inferSchema="true",
  path="dbfs:/mnt/training/asa/flights/small.csv"
);

CACHE TABLE AirlineFlight;

SELECT * FROM AirlineFlight;

Count the number of delays per airplane model. Plot it by clicking the graph button, then click Plot Options and set Display Type: Bar Chart, Keys: (Empty), Series Groupings: Model.

%sql

CREATE TABLE IF NOT EXISTS AirlinePlane
USING csv
OPTIONS (
  header = "true",
  delimiter = ",",
  inferSchema = "false",
  path = "dbfs:/mnt/training/asa/planes/plane-data.csv"
);

CACHE TABLE AirlinePlane;

SELECT Model, count(*) AS Delays FROM AirlinePlane WHERE Model IS NOT NULL GROUP BY Model ORDER BY Delays DESC LIMIT 10;

Join disparate data sets found in data lakes (not unified and not normalized). Join the AirlinePlane and AirlineFlight tables to see which manufacturer has the longest delays. Click the Plot Options button and set Display Type: Pie Chart, Donut: unchecked. Then resize the plot by dragging the triangle in the bottom-right corner of the plot.

%sql

SELECT p.manufacturer AS Manufacturer,
       avg(depDelay) AS Delay
FROM AirlinePlane p
JOIN AirlineFlight f ON p.tailnum = f.tailnum
WHERE p.manufacturer IS NOT null
GROUP BY p.manufacturer
ORDER BY Delay DESC
LIMIT 10

Spark can bring together various data sources such as JDBC (SQL Server, Oracle, etc.), Parquet, CSV, JSON, HDFS, and Kafka.
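A minimal sketch of what reading a few of these sources looks like. The JDBC connection details and file paths below are placeholders, not part of the course data; only the read API itself is the point.

# Hypothetical paths/connection details -- shown only to illustrate the unified read API.
csvDF     = spark.read.option("header", "true").csv("dbfs:/mnt/example/data.csv")
jsonDF    = spark.read.json("dbfs:/mnt/example/data.json")
parquetDF = spark.read.parquet("dbfs:/mnt/example/data.parquet")

jdbcDF = (spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://example-host:1433;database=exampledb")  # placeholder URL
  .option("dbtable", "dbo.exampleTable")                                   # placeholder table
  .option("user", "exampleUser")
  .option("password", "examplePassword")
  .load())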

It can also be used as an ML platform:

from pyspark.sql.functions import col, floor, translate, round
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, OneHotEncoder
from pyspark.ml.regression import LinearRegression

inputDF = (spark.read.table("AirlineFlight")
  .withColumn("HourOfDay", floor(col("CRSDepTime") / 100))
  .withColumn("DepDelay", translate(col("DepDelay"), "NA", "0").cast("integer")))

(trainingDF, testDF) = inputDF.randomSplit([0.80, 0.20], seed=999)

pipeline = Pipeline(stages=[
    OneHotEncoder(inputCol="HourOfDay", outputCol="HourVector"), # I guess OHE is used here because only one feature is used to make the prediction.
    VectorAssembler(inputCols=["HourVector"], outputCol="Features"), # https://spark.apache.org/docs/latest/ml-features.html#vectorassembler
    LinearRegression(featuresCol="Features", labelCol="DepDelay", predictionCol="DepDelayPredicted", regParam=0.0)
  ])

model = pipeline.fit(trainingDF)
resultDF = model.transform(testDF)

displayDF = resultDF.select("Year", "Month", "DayOfMonth", "CRSDepTime", "UniqueCarrier", "FlightNum", "DepDelay", round("DepDelayPredicted", 2).alias("DepDelayPredicted"))
display(displayDF)

Visualize the results: click Plot Options and set Display Type: Line Chart, Keys: HourOfDay, Values: Actual, Predicted.

display(
  resultDF
    .groupBy("HourOfDay")
    .avg("DepDelay", "DepDelayPredicted")
    .toDF("HourOfDay", "Actual", "Predicted")
    .orderBy("HourOfDay")
)

Generate a stream of data for 5 minutes:

%scala

// Clean any temp files from previous runs.
DummyDataGenerator.clean()

// Generate data for 5 minutes.
// To force it to stop rerun with 0.
DummyDataGenerator.start(5)

  • Read in the stream
  • Parse the flight date and time
  • Compute the average delay for each airline based on the most recent 15 seconds of flight data.
  • Plot results in near real time:
from pyspark.sql.functions import col, date_format, unix_timestamp, window
from pyspark.sql.types import StructType

spark.conf.set("spark.sql.shuffle.partitions", "8")

flightSchema = (StructType()
  .add("FlightNumber", "integer")
  .add("DepartureTime", "string")
  .add("Delay", "double")
  .add("Airline", "string")
)
streamingDF = (spark.readStream
  .schema(flightSchema)
  .csv(DummyDataGenerator.streamDirectory)
  .withColumn("DepartureTime", unix_timestamp("DepartureTime", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp"))
  .withWatermark("DepartureTime", "5 minute")
  .groupBy( window("DepartureTime", "15 seconds"), "Airline" )
  .avg("Delay")
  .select(col("window.start").alias("Start"), "Airline", col("avg(delay)").alias("Average Delay"))
  .orderBy("start", "Airline")
  .select(date_format("start", "HH:mm:ss").alias("Time"), "Airline", "Average Delay")
)
display(streamingDF)

Can use GraphFrames to analyze networks (install it from PyPI):

  • Shortest path (between airports)
  • Page rank (which airports are the most important hubs)
  • Connected components (connected groups of friends on Facebook)

Sketches of the first and third appear after the PageRank example below.

Page rank:

from pyspark.sql.functions import col, concat_ws, round
from graphframes import GraphFrame

flightVerticesDF = (spark.read
  .option("header", True)
  .option("delimiter", "\t")
  .csv("dbfs:/mnt/training/asa/airport-codes/airport-codes.txt")
  .withColumnRenamed("IATA", "id"))

flightEdgesDF = (spark.table("Databricks.AirlineFlight")
  .withColumnRenamed("Origin", "src")
  .withColumnRenamed("Dest", "dst"))

flightGF = GraphFrame(flightVerticesDF, flightEdgesDF)
pageRankDF = flightGF.pageRank(tol=0.05)

resultsDF = (pageRankDF.vertices
  .select(concat_ws(", ", col("city"), col("state")).alias("Location"),
          round(col("pagerank"), 1).alias("Rank"))
  .orderBy(col("pagerank").desc()))

display(resultsDF)
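The other two algorithms from the list above, sketched against the same flightGF graph. This is not part of the course notebook; "SFO" and "JFK" are just example landmark airports and the checkpoint path is arbitrary.

# Shortest paths: hop counts from every airport to the given landmark airports.
shortestDF = flightGF.shortestPaths(landmarks=["SFO", "JFK"])
display(shortestDF.select("id", "distances"))

# Connected components: requires a checkpoint directory to be set first.
spark.sparkContext.setCheckpointDir("dbfs:/tmp/graphframes-checkpoints")
componentsDF = flightGF.connectedComponents()
display(componentsDF.select("id", "component"))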

Spark on Databricks can perform 10-2000x faster than Hadoop MapReduce, depending on the workload.

  • Learn about visualizations here
  • Learn about Databricks File System here
  • Learn about the Machine Learning Library here
  • See the community spark packages here
  • Learn about streaming data here
  • See an example using GraphFrame here and here

01: Getting Started

Create a new cluster:

  • Click the Clusters button in the sidebar.
  • Click the Create Cluster button.
  • Cluster Name : RB
  • Select the cluster type. We recommend the latest runtime (4.0 or newer) and Scala 2.11.
  • Create cluster

DBU pricing here

Create a new notebook:

  • Click the Home button in the sidebar.
  • Right-click on your home folder.
  • Select Create.
  • Select Notebook.
  • Name your notebook First Notebook.
  • Set the language to Python.
  • Select the cluster to which to attach this Notebook.

Run:

1 + 1

Run the cell by clicking the run icon and selecting Run Cell.

You can click the Attached button to detach it or reattach it to a cluster. You can also open the Spark UI and view the Driver's log files.

Summary

  • Create notebooks by clicking the down arrow on a folder and selecting the Create Notebook option.
  • Import notebooks by clicking the down arrow on a folder and selecting the Import option.
  • Attach to a Spark cluster by selecting the Attached/Detached option directly below the notebook title.
  • Create clusters using the Clusters button on the left sidebar.

Q: How do you create a Notebook?

A: Sign into Databricks, select the Home icon from the sidebar, right-click your home-folder, select Create, and then Notebook. In the Create Notebook dialog, specify the name of your notebook and the default programming language.

Q: How do you create a cluster?

A: Select the Clusters icon on the sidebar, click the Create Cluster button, specify the specific settings for your cluster and then click Create Cluster.

Q: How do you attach a notebook to a cluster?

A: If you run a command while detached, you may be prompted to connect to a cluster. To connect to a specific cluster, open the cluster menu by clicking the Attached/Detached menu item and then selecting your desired cluster.

Q: Are there additional docs I can reference to find my way around Databricks?

A: See Getting Started with Databricks.

Q: Where can I learn more about the cluster configuration options?

A: See Spark Clusters on Databricks.

Q: Can I import formats other than .dbc files?

A: Yes, see Importing Notebooks.

02: Querying Files

RDDs (Resilient Distributed Datasets)

  • Resilient: They are fault tolerant, so if part of your operation fails, Spark quickly recovers the lost computation.
  • Distributed: RDDs are distributed across networked machines known as a cluster.
  • DataFrame: A data structure where data is organized into named columns, like a table in a relational database, but with richer optimizations under the hood.

Spark uses the Catalyst Optimizer to determine the best way to optimize your code.

You do not need to worry about the order of chained operations; the Catalyst Optimizer typically produces the same plan either way:

df.select(...).orderBy(...).filter(...)  or df.filter(...).select(...).orderBy(...) 
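You can check this yourself with explain(), which prints the plan Spark will actually run. A quick sketch, reusing the people-10m file that is read just below; the plans should come out essentially the same.

peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

# Filter written after the sort...
peopleDF.select("firstName", "gender").orderBy("firstName").filter("gender = 'F'").explain()
# ...and before it: compare the two printed plans.
peopleDF.select("firstName", "gender").filter("gender = 'F'").orderBy("firstName").explain()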

Read file(s):

peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

Show each field's name and type, and whether the column is nullable (the default is true):

peopleDF.printSchema()
# or
peopleDF.dtypes

Which women were born after 1990? The command below shows the first 1,000 rows:

from pyspark.sql.functions import year
display(
  peopleDF 
    .select("firstName", "middleName", "lastName", "birthDate", "gender") 
    .filter("gender = 'F'") 
    .filter(year("birthDate") > "1990")
)
# Without display() you can use peopleDF.show(), which prints a plain-text table; display() renders an HTML table

Spark provides many built-in column functions (pyspark.sql.functions).

Keep just the year of birthDate:

display(
  peopleDF.select("firstName","middleName","lastName",year("birthDate").alias('birthYear'),"salary") 
    .filter(year("birthDate") > "1990") 
    .filter("gender = 'F' ")
)

Visualization

How many women were named Mary in each year?

marysDF = (peopleDF.select(year("birthDate").alias("birthYear")) 
  .filter("firstName = 'Mary' ") 
  .filter("gender = 'F' ") 
  .orderBy("birthYear") 
  .groupBy("birthYear") 
  .count()
)

Use display(), then click the Bar Chart button -> Plot Options... -> remove '<id>' from Keys and drag in 'birthYear' instead -> Apply.

To change the colors: Plot Options -> drag 'firstName' to Series groupings -> Apply.
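The display call the plotting steps above assume:

display(marysDF)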

Compare the popularity of two names after 1990:

from pyspark.sql.functions import col # col() makes it easy to reference a column in a conditional expression
dordonDF = (peopleDF 
  .select(year("birthDate").alias("birthYear"), "firstName") 
  .filter((col("firstName") == 'Donna') | (col("firstName") == 'Dorothy')) 
  .filter("gender == 'F' ") 
  .filter(year("birthDate") > 1990) 
  .orderBy("birthYear") 
  .groupBy("birthYear", "firstName") 
  .count()
)

Temporary Views

You can register a DataFrame as a temporary view so you can run SQL against it.

Create a temporary view called "People10M":

peopleDF.createOrReplaceTempView("People10M")
display(spark.sql("SELECT * FROM  People10M WHERE firstName = 'Donna' "))

Note: In Databricks, %sql is the equivalent of display() combined with spark.sql():

%sql
SELECT * FROM People10M WHERE firstName = 'Donna'


womenBornAfter1990DF = (peopleDF 
  .select("firstName", "middleName", "lastName",year("birthDate").alias("birthYear"), "salary") 
  .filter(year("birthDate") > 1990) 
  .filter("gender = 'F' ") 
)

womenBornAfter1990DF.createOrReplaceTempView("womenBornAfter1990")

%sql
SELECT count(*) FROM womenBornAfter1990 where firstName = 'Mary'

Create a DataFrame called top10FemaleFirstNamesDF that contains the 10 most common female first names

%sql
SELECT firstName, COUNT(firstName) AS total
FROM People10M
WHERE gender = 'F'
GROUP BY firstName
ORDER BY total DESC
LIMIT 10

from pyspark.sql.functions import col, count, desc

top10FemaleFirstNamesDF = (peopleDF
  .select("firstName")
  .filter("gender == 'F'")
  .groupBy("firstName")
  .agg(count(col("firstName")).alias("total"))
  .orderBy(desc("total"), "firstName")
  .limit(10)
)

Convert the DataFrame into a temporary view and display the contents

top10FemaleFirstNamesDF.createOrReplaceTempView("Top10FemaleFirstNames")
resultsDF = spark.sql("SELECT * FROM Top10FemaleFirstNames ORDER BY firstName")
display(resultsDF)

You can see the modules here

03: Aggregations and Joins

See here for a list of functions.

Average:

peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")

from pyspark.sql.functions import avg
avgSalaryDF = peopleDF.select(avg("salary").alias("averageSalary"))

avgSalaryDF.show()

Round the average:

from pyspark.sql.functions import round
roundedAvgSalaryDF = avgSalaryDF.select(round("averageSalary").alias("roundedAverageSalary"))

roundedAvgSalaryDF.show()

Min and max:

from pyspark.sql.functions import min, max
salaryDF = peopleDF.select(max("salary").alias("max"), min("salary").alias("min"), round(avg("salary")).alias("averageSalary"))

salaryDF.show()

Joining

You have to rename one of the columns you are joining on to avoid ambiguous column references.

The magic command %fs invokes a filesystem command (the full form is dbutils.fs, e.g. %fs ls file:/foobar is equivalent to dbutils.fs.ls("file:/foobar")).

%fs ls dbfs:/mnt/training/ssn/names-1880-2016.parquet/
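The equivalent call through dbutils directly, using the same path as above:

display(dbutils.fs.ls("dbfs:/mnt/training/ssn/names-1880-2016.parquet/"))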

Count distinct names

peopleDistinctNamesDF = peopleDF.select("firstName").distinct()
peopleDistinctNamesDF.count()

Read the data

ssaDF = spark.read.parquet("/mnt/training/ssn/names-1880-2016.parquet/")
display(ssaDF)

Rename a column for joining

ssaDistinctNamesDF = ssaDF.select("firstName").withColumnRenamed("firstName",'ssaFirstName').distinct()

Count distinct names

ssaDistinctNamesDF.count()

Join the two data frames

from pyspark.sql.functions import col
joinedDF = peopleDistinctNamesDF.join(ssaDistinctNamesDF, col("firstName") == col("ssaFirstName"))
joinedDF.count()

Convert all the negative salaries to positive ones, then sort by salary and take the first 20 rows:

from pyspark.sql.functions import abs
peopleWithFixedSalariesDF = peopleDF.select("firstName","middleName","lastName","gender","birthDate","ssn",abs(col("salary")).alias('salary'))
peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.select("*").orderBy("salary").limit(20)

Filter out salaries < 20,000 and categorize salaries into $10k groups:

peopleWithFixedSalaries20KDF = peopleWithFixedSalariesDF.select("*").withColumn("salary10k", round(col("salary")/10000)).filter("salary >= 20000")
below20K = peopleWithFixedSalaries20KDF.filter("salary < 20000").count()

# to test
from pyspark.sql.functions import count
results = (peopleWithFixedSalaries20KDF 
  .select("salary10k") 
  .groupBy("salary10k") 
  .agg(count("*").alias("total")) 
  .orderBy("salary10k") 
  .limit(5) 
  .collect()
)

Count the number of females named Caren who were born before March 1980:

carensDF = (peopleDF 
  .filter("birthDate < '1980-03-01' ") 
  .filter("firstName = 'Caren' ") 
  .filter("gender = 'F' ") 
  .agg(count("*").alias("total"))
)

#from pyspark.sql.functions import to_timestamp
#carensDF = (peopleDF
#  .select("firstName", "gender", "birthDate")
#  .filter("firstName == 'Caren'")
#  .filter("gender == 'F'")
#  .filter("birthDate < to_timestamp('1980-03-01')")
#  .agg(count(col('firstName')).alias('total'))
#)
display(carensDF)

More info about DataFrames here and the Cost Based Optimizer here

03b: Aggregations and Joins

Find the most popular first name for girls in 1885, 1915, 1945, 1975, and 2005:

ssaDF = spark.read.parquet("/mnt/training/ssn/names-1880-2016.parquet")
ssaDF.printSchema()
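One possible approach, sketched rather than the official solution; it assumes the schema printed above has firstName, gender, year, and total columns.

from pyspark.sql.functions import col, rank, sum as sqlsum
from pyspark.sql.window import Window

# Rank names within each year by total births, then keep the top name per year.
byYear = Window.partitionBy("year").orderBy(col("births").desc())

topGirlNamesDF = (ssaDF
  .filter("gender = 'F'")
  .filter(col("year").isin(1885, 1915, 1945, 1975, 2005))
  .groupBy("year", "firstName")
  .agg(sqlsum("total").alias("births"))
  .withColumn("rank", rank().over(byYear))
  .filter("rank = 1")
  .orderBy("year")
)
display(topGirlNamesDF)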

04: Accessing Data

The DBFS (Databricks File System) is the built-in, Azure-Blob-backed alternative to HDFS (Hadoop Distributed File System).

A parquet "file" is actually a collection of files stored in a single directory.

ipGeocodeDF = spark.read.parquet("/mnt/training/ip-geocode.parquet")
ipGeocodeDF.printSchema()

You can also read other file formats, e.g. CSV.

Take a peek at the data:

%fs head /mnt/training/bikeSharing/data-001/day.csv --maxBytes=492

bikeSharingDayDF = (spark
  .read                                                # Call the read method returning a DataFrame
  .option("inferSchema","true")                        # Option to tell Spark to infer the schema
  .option("header","true")                             # Option telling Spark that the file has a header
  .csv("/mnt/training/bikeSharing/data-001/day.csv"))  # Option telling Spark where the file is

bikeSharingDayDF.show(10)

display(bikeSharingDayDF)

Download data here.

Click the Data button on the left -> databricks -> Add Data -> Upload File -> browse.

It says the file was uploaded to /FileStore/tables/state_income-9f7c5.csv.

Go back to the notebook and run:

stateIncomeDF = (spark
  .read                                                # Call the read method returning a DataFrame
  .option("inferSchema","true")                        # Option to tell Spark to infer the schema
  .option("header","true")                             # Option telling Spark that the file has a header
  .csv("/FileStore/tables/state_income-9f7c5.csv"))    # Option telling Spark where the file is

stateIncomeDF.show(10)

Microsoft Azure provides cloud file storage in the form of the Blob Store. Files are stored in "blobs".

See what's already mounted in the DBFS

%fs mounts

Azure provides you with a secure way to create and share access keys for your Azure Blob Store without compromising your account keys. See here.

You can create your own mount by going to the Azure portal -> Storage accounts -> Create an account -> Shared access signature -> Generate SAS -> copy the Blob service SAS URL -> use the URL in the mount operation.

Create a mount called /mnt/temp-training by running:

SasURL = "https://dbtraineastus2.blob.core.windows.net/?sv=2017-07-29&ss=b&srt=sco&sp=rl&se=2023-04-19T06:32:30Z&st=2018-04-18T22:32:30Z&spr=https&sig=BB%2FQzc0XHAH%2FarDQhKcpu49feb7llv3ZjnfViuI9IWo%3D"
indQuestionMark = SasURL.index('?')
SasKey = SasURL[indQuestionMark:len(SasURL)]
StorageAccount = "dbtraineastus2"
ContainerName = "training"
MountPoint = "/mnt/temp-training"

dbutils.fs.mount(
  source = "wasbs://%s@%s.blob.core.windows.net/" % (ContainerName, StorageAccount),
  mount_point = MountPoint,
  extra_configs = {"fs.azure.sas.%s.%s.blob.core.windows.net" % (ContainerName, StorageAccount) : "%s" % SasKey}
)

05: Querying-JSON

JSON is a common file format used in big data applications and in data lakes. It is useful when:

  • The schema or structure changes over time.
  • You need nested fields, like an array or an array of arrays.
  • You don't know yet how you're going to use your data.

The data is made up of JSON files with a single JSON object on each line of the file; each object corresponds to a row in the table. Each row represents a blog post on the Databricks blog.

databricksBlogDF = spark.read.option("inferSchema","true").option("header","true").json("/mnt/training/databricks-blog.json")

databricksBlogDF.printSchema()

display(databricksBlogDF.select("authors","categories","dates","content"))

Think of nested data as columns within columns.

datesDF = databricksBlogDF.select("dates")
display(datesDF)

Pull out a specific subfield with . (object) notation.

display(databricksBlogDF.select("dates.createdOn", "dates.publishedOn"))

Create a DataFrame, databricksBlog2DF that contains the original columns plus the new publishedOn column obtained from flattening the dates column.

from pyspark.sql.functions import col
databricksBlog2DF = databricksBlogDF.withColumn("publishedOn",col("dates.publishedOn"))

databricksBlog2DF.printSchema()

Convert the string to timestamps: cast dates.publishedOn to a timestamp data type and "flatten" the dates.publishedOn column to just publishedOn.

from pyspark.sql.functions import date_format, year, col
display(databricksBlogDF.select("title", date_format("dates.publishedOn", "yyyy-MM-dd").alias("publishedOn")))

# Variant that keeps the link and restricts to 2013:
# display(databricksBlogDF
#   .select("title", date_format("dates.publishedOn", "yyyy-MM-dd").alias("publishedOn"), "link")
#   .filter(year(col("publishedOn")) == 2013)
#   .orderBy(col("publishedOn")))

Create another DataFrame, databricksBlog2DF that contains the original columns plus the new publishedOn column obtained from flattening the dates column.

databricksBlog2DF = databricksBlogDF.withColumn("publishedOn", date_format("dates.publishedOn","yyyy-MM-dd")) 
display(databricksBlog2DF)

databricksBlog2DF.printSchema()

Obtain only articles published in 2013:

from pyspark.sql.functions import to_date, year, col
          
resultDF = (databricksBlog2DF.select("title", to_date(col("publishedOn"), "yyyy-MM-dd").alias('date'), "link") 
  .filter(year(col("publishedOn")) == '2013') 
  .orderBy(col("publishedOn"))
)

display(resultDF)

Array columns: you can use size(...) on an array column.

from pyspark.sql.functions import size
display(databricksBlogDF.select(size("authors"),"authors")) # Size of each row. e.g. may be two authors for a blog post.

Obtain the 0th element of the array

display(databricksBlogDF.select(col("authors")[0].alias("primaryAuthor"))) # Get first author of each row.

The explode function splits an array column into multiple rows. This expands the number of rows and duplicates the values in the other columns.

from pyspark.sql.functions import explode
display(databricksBlogDF.select("title","authors",explode(col("authors")).alias("author"), "link"))

Restrict the output to articles that have multiple authors, then sort by the title:

databricksBlog2DF = (databricksBlogDF 
  .select("title","authors",explode(col("authors")).alias("author"), "link") 
  .filter(size(col("authors")) > 1) 
  .orderBy("title")
)

display(databricksBlog2DF)

Find the blogs written by Michael Armbrust:

tmpDF = (databricksBlogDF
  .select("title","authors",explode(col("authors")).alias("author"), "link",
          to_date(col("dates.publishedOn"), "yyyy-MM-dd").alias('date'))
  .filter(col("author") == 'Michael Armbrust')
  .orderBy(col("date"))                    
)
articlesByMichaelDF = tmpDF.select("title")

You can use array_contains, which returns a boolean column indicating whether the array in each row contains the given value:

from pyspark.sql.functions import array_contains
tmpDF2 = databricksBlogDF.select(array_contains(col("authors"), "Michael Armbrust"))
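A variant (not from the course) that uses it directly in a filter; it yields the same titles as the explode-based query above, with one row per article:

articlesByMichael2DF = (databricksBlogDF
  .filter(array_contains(col("authors"), "Michael Armbrust"))
  .select("title"))
display(articlesByMichael2DF)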

Grab the unique categories:

uniqueCategoriesDF = databricksBlogDF.select(explode(col("categories")).alias("category")).distinct().orderBy("category")

Count each time a category appears:

from pyspark.sql.functions import count
totalArticlesByCategoryDF = (databricksBlogDF
  .select(explode(col("categories")).alias("category"))
  .groupBy("category")
  .agg(count("category").alias("total"))
  .orderBy("category")
)
#display(totalArticlesByCategoryDF)
#from pyspark.sql.functions import count
#totalArticlesByCategoryDF = (databricksBlogDF 
#  .select(explode(col("categories")).alias("category")) 
#  .groupBy("category") 
#  .agg(count("*").alias("total")) 
#  .orderBy("category") 
#)

See more about Spark SQL here

06: Querying Data Lakes with DataFrames

A data lake is:

  • Storage repository that cheaply stores a vast amount of raw data in its native format.
  • Consists of current and historical data dumps in various formats including XML, JSON, CSV, Parquet, etc.
  • May contain operational relational databases with live transactional data.

List the crime data

%fs ls /mnt/training/crime-data-2016

Create a DataFrame for each file (these can have different columns and different naming styles, e.g. caps vs. lower case):

crimeDataNewYorkDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-New-York-2016.parquet")

crimeDataBostonDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-Boston-2016.parquet")

Anyone can contribute information to the data lake, and data lakes scale to store arbitrarily large and diverse data.

from pyspark.sql.functions import lower, upper, month, col

# | is a logical OR
homicidesNewYorkDF = (crimeDataNewYorkDF 
  .select(month(col("reportDate")).alias("month"), col("offenseDescription").alias("offense")) 
  .filter(lower(col("offenseDescription")).contains("murder") | lower(col("offenseDescription")).contains("homicide"))
)

display(homicidesNewYorkDF)

homicidesBostonDF = (crimeDataBostonDF 
  .select("month", col("OFFENSE_CODE_GROUP").alias("offense")) 
  .filter(lower(col("OFFENSE_CODE_GROUP")).contains("homicide"))
)

display(homicidesBostonDF)

The structure is now identical

display(homicidesNewYorkDF.limit(5))
display(homicidesBostonDF.limit(5))

Combine them by using a union

homicidesBostonAndNewYorkDF = homicidesNewYorkDF.union(homicidesBostonDF)

See the number of homicides per month:

display(homicidesBostonAndNewYorkDF.select("month").orderBy("month").groupBy("month").count())

Do the same for Chicago

crimeDataChicagoDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-Chicago-2016.parquet")

display(crimeDataChicagoDF.select("primaryType").distinct())

homicidesChicagoDF = (crimeDataChicagoDF
  .select(month(col("date")).alias("month"), col("primaryType").alias("offense"))
  .filter(lower(col("primaryType")).contains("murder") |
          lower(col("primaryType")).contains("homicide"))
)

allHomicidesDF = homicidesNewYorkDF.union(homicidesBostonDF).union(homicidesChicagoDF)

homicidesByMonthDF  = allHomicidesDF.select("month").orderBy("month").groupBy("month").count()
#homicidesByMonthDF = allHomicidesDF.select("month").groupBy("month").agg(count('*').alias("homicides")).orderBy("month")
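Display the result:

display(homicidesByMonthDF)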

07: Capstone Project

crimeDataLosAngelesDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-Los-Angeles-2016.parquet")

crimeDataPhiladelphiaDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-Philadelphia-2016.parquet")

crimeDataDallasDF = spark.read.parquet("/mnt/training/crime-data-2016/Crime-Data-Dallas-2016.parquet")

Find how many robberies occurred in each city:

from pyspark.sql.functions import col, lower
robberyLosAngelesDF = (crimeDataLosAngelesDF.select(lower(col("crimeCodeDescription")))
    .filter("crimeCodeDescription = 'ROBBERY'")
)
display(robberyLosAngelesDF)

robberyPhiladelphiaDF  = (crimeDataPhiladelphiaDF.select(lower(col("ucr_general_description")))
    .filter("ucr_general_description = 'ROBBERY'")
)
display(robberyPhiladelphiaDF)

robberyDallasDF = (crimeDataDallasDF.select(lower(col("typeOfIncident")))
    .filter(lower(col("typeOfIncident")).contains("robbery "))
)
# robberyDallasDF  = crimeDataDallasDF.filter(lower(col("typeOfIncident")).startswith("robbery"))
display(robberyDallasDF)

Count the number of robberies per month:

from pyspark.sql.functions import month, col, count
robberiesByMonthLosAngelesDF = (crimeDataLosAngelesDF.select(month(col("timeOccurred")).alias("month"),
                                                             lower(col("crimeCodeDescription")).alias("crime"))
  .filter("crimeCodeDescription = 'ROBBERY'")
  .orderBy("month")
  .groupBy("month")
  .agg(count(col('crime')).alias("robberies"))
)
display(robberiesByMonthLosAngelesDF)
#robberiesByMonthLosAngelesDF = (robberyLosAngelesDF 
#  .select(month(col("timeOccurred")).alias("month")) 
#  .groupBy("month") 
#  .count().withColumnRenamed("count", "robberies") 
#  .orderBy("month")
#)

robberiesByMonthPhiladelphiaDF = (crimeDataPhiladelphiaDF.select(month(col("dispatch_date_time")).alias("month"),
                                                             lower(col("ucr_general_description")).alias("crime"))
  .filter("ucr_general_description = 'ROBBERY'")
  .orderBy("month")
  .groupBy("month")
  .agg(count(col('crime')).alias("robberies"))
)
display(robberiesByMonthPhiladelphiaDF)

robberiesByMonthDallasDF = (crimeDataDallasDF.select(month(col("startingDateTime")).alias("month"),
                                                             lower(col("typeOfIncident")).alias("crime"))
  .filter(lower(col("typeOfIncident")).contains("robbery "))
  .orderBy("month")
  .groupBy("month")
  .agg(count(col('crime')).alias("robberies"))
)
display(robberiesByMonthDallasDF)

Combine all three robberies-per-month DataFrames into one. In creating it, add a new column called city:

from pyspark.sql.functions import lit

combinedRobberiesByMonthDF = (robberiesByMonthLosAngelesDF.withColumn("city", lit("Los Angeles"))
  .union(robberiesByMonthPhiladelphiaDF.withColumn("city", lit("Philadelphia")))
  .union(robberiesByMonthDallasDF.withColumn("city", lit("Dallas")))                              
)
display(combinedRobberiesByMonthDF)

Join the data with city population size to normalize the data

cityDataDF = spark.read.parquet("dbfs:/mnt/training/City-Data.parquet").withColumnRenamed("city", "cities")
display(cityDataDF)
tmpDF = combinedRobberiesByMonthDF.join(cityDataDF, col("city") == col("cities"))
tmpDF2 = tmpDF.select("*").withColumn("robberyRate", col("robberies")/col("estPopulation2016"))
robberyRatesByCityDF = tmpDF2.select("city", "month", "robberyRate")
display(robberyRatesByCityDF)

#robberyRatesByCityDF = (combinedRobberiesByMonthDF 
#  .select("month", "robberies", "city") 
#  .join(cityDataDF, combinedRobberiesByMonthDF.city == cityDataDF.cities) 
#  .withColumn("robberyRate", col("robberies")/col("estPopulation2016"))
#)

Includes

Stream generator

class DummyDataGenerator:
  # username is shared from the Scala cell below via the Spark conf it sets.
  username = spark.conf.get("com.databricks.training.username")
  streamDirectory = "dbfs:/tmp/{}/new-flights".format(username)

None # suppress output
%scala

import scala.util.Random
import java.io._
import java.time._

// Notebook #2 has to set this to 8, we are setting
// it to 200 to "restore" the default behavior.
spark.conf.set("spark.sql.shuffle.partitions", 200)

// Make the username available to all other languages.
// "WARNING: use of the "current" username is unpredictable
// when multiple users are collaborating and should be replaced
// with the notebook ID instead.
val username = com.databricks.logging.AttributionContext.current.tags(com.databricks.logging.BaseTagDefinitions.TAG_USER);
spark.conf.set("com.databricks.training.username", username)

object DummyDataGenerator extends Runnable {
  var runner : Thread = null;
  val className = getClass().getName()
  val streamDirectory = s"dbfs:/tmp/$username/new-flights"
  val airlines = Array( ("American", 0.15), ("Delta", 0.17), ("Frontier", 0.19), ("Hawaiian", 0.21), ("JetBlue", 0.25), ("United", 0.30) )

  val rand = new Random(System.currentTimeMillis())
  var maxDuration = 3 * 60 * 1000 // default to three minutes

  def clean() {
    System.out.println("Removing old files for dummy data generator.")
    dbutils.fs.rm(streamDirectory, true)
    if (dbutils.fs.mkdirs(streamDirectory) == false) {
      throw new RuntimeException("Unable to create temp directory.")
    }
  }

  def run() {
    val date = LocalDate.now()
    val start = System.currentTimeMillis()

    while (System.currentTimeMillis() - start < maxDuration) {
      try {
        val dir = s"/dbfs/tmp/$username/new-flights"
        val tempFile = File.createTempFile("flights-", "", new File(dir)).getAbsolutePath()+".csv"
        val writer = new PrintWriter(tempFile)

        for (airline <- airlines) {
          val flightNumber = rand.nextInt(1000)+1000
          val departureTime = LocalDateTime.now().plusHours(-7)
          val (name, odds) = airline
          val test = rand.nextDouble()

          val delay = if (test < odds)
            rand.nextInt(60)+(30*odds)
            else rand.nextInt(10)-5

          println(s"- Flight #$flightNumber by $name at $departureTime delayed $delay minutes")
          writer.println(s""" "$flightNumber","$departureTime","$delay","$name" """.trim)
        }
        writer.close()

        // wait a couple of seconds
        Thread.sleep(rand.nextInt(5000))

      } catch {
        case e: Exception => {
          printf("* Processing failure: %s%n", e.getMessage())
          return;
        }
      }
    }
    println("No more flights!")
  }

  def start(minutes:Int = 5) {
    maxDuration = minutes * 60 * 1000

    if (runner != null) {
      println("Stopping dummy data generator.")
      runner.interrupt();
      runner.join();
    }
    println(s"Running dummy data generator for $minutes minutes.")
    runner = new Thread(this);
    runner.start();
  }

  def stop() {
    start(0)
  }
}

DummyDataGenerator.clean()

displayHTML("Imported streaming logic...") // suppress output