Spark

Installation

Install on Windows

Download https://www.apache.org/dyn/closer.lua/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

Move it to Documents and untar it using the Anaconda prompt

cd Documents

tar zxvf spark-3.0.0-bin-hadoop3.2.tgz

Create an environment variable called SPARK_HOME using the environment variables tool and point it to ...\spark-3.0.0-bin-hadoop3.2

Install the Hadoop winutils binaries from https://github.com/cdarlint/winutils/tree/master/hadoop-3.2.0/bin ; download them, move them to Documents, and unzip. Create an environment variable called HADOOP_HOME and point it to the folder whose bin subfolder contains winutils.exe (Spark looks for %HADOOP_HOME%\bin\winutils.exe)
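Both variables can also be set per-session from Python before calling findspark.init() (see Create a session below). A minimal sketch, assuming the install locations under Documents described above:

import os

os.environ["SPARK_HOME"] = r"C:\Users\USERNAME\Documents\spark-3.0.0-bin-hadoop3.2"  # assumed install path
os.environ["HADOOP_HOME"] = r"C:\Users\USERNAME\Documents\hadoop-3.2.0"  # assumed folder containing bin\winutils.exe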

conda create -n spark python=3.7

conda activate spark

conda install -c conda-forge findspark pyspark jupyter

python

Create a session

import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('App').getOrCreate()

sc = spark.sparkContext

sc.getConf().getAll()

df = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
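Quick sanity checks on the new DataFrame:

df.printSchema()
df.show()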

Download Oracle driver

https://www.oracle.com/database/technologies/jdbcdriver-ucp-downloads.html

Create an environment variable called CLASSPATH pointing to .../ojdbc6.jar

https://download.oracle.com/otn_software/nt/instantclient/19600/instantclient-jdbc-windows.x64-19.6.0.0.0dbru.zip

You can also specify the jar when starting pyspark

pyspark --jars orai18n.jar
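Once the driver jar is available, Oracle tables can be read over JDBC. A minimal sketch; the host, port, service name, credentials, and table below are hypothetical placeholders:

df = (spark.read.format("jdbc")
      .option("url", "jdbc:oracle:thin:@//HOST:1521/SERVICE")
      .option("dbtable", "SCHEMA.TABLE")
      .option("user", "USER")
      .option("password", "PASSWORD")
      .option("driver", "oracle.jdbc.driver.OracleDriver")
      .load())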

Configuration

Use Arrow to make toPandas() much faster (in Spark 3.0 the config key was renamed to spark.sql.execution.arrow.pyspark.enabled; the older name below still works but is deprecated)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()

IO

Read

Read parquet (with schema)

df = spark.read.parquet("/DIRECTORY/*")


from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField('COL_NUM', IntegerType(), True), StructField('COL_STRING', StringType(), True)])

spark.read.schema(schema).parquet("/DIRECTORY/*")

Register a parquet file as a table

%sql
CREATE TABLE IF NOT EXISTS TABLE_NAME
USING parquet
OPTIONS (
  path "/mnt/FOLDER/FILE.parquet"
)
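Outside a %sql cell, the same statement can be run with spark.sql:

spark.sql("""
CREATE TABLE IF NOT EXISTS TABLE_NAME
USING parquet
OPTIONS (
  path "/mnt/FOLDER/FILE.parquet"
)
""")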

Register a csv file as an unmanaged table

spark.sql("DROP DATABASE IF EXISTS learn_spark_db CASCADE")

spark.sql("CREATE DATABASE learn_spark_db")

spark.sql("USE learn_spark_db")

spark.sql("CREATE TABLE us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING) USING csv OPTIONS (path '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')")

Register a DataFrame as a SQL table (view)

df.registerTempTable("TABLE_NAME")  # deprecated since Spark 2.0; use createOrReplaceTempView (below)

df.createOrReplaceTempView("TABLE_NAME")

Read Table

df = spark.sql("SELECT * FROM DATABASE.TABLE")
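Equivalently, a registered table or view can be loaded directly:

df = spark.table("DATABASE.TABLE")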

Write

Save as Table (parquet format) (with bucketing)

df.write.saveAsTable("DATABASE.TABLE")


df.write.format("parquet").mode("overwrite").option("path", "DIRECTORY/FILE.parquet").saveAsTable("DATABASE.TABLE")


df.write.format("parquet").bucketBy(8, "UUID").mode("overwrite").saveAsTable("DATABASE.TABLE")

Save as Delta Table (append and merge schema)

df.write.format("delta").save("DIRECTORY")


df.write.format("delta").mode("append").option("mergeSchema", "true").save("DIRECTORY")
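To load the Delta table back from the same directory (assuming Delta Lake is available on the cluster):

df = spark.read.format("delta").load("DIRECTORY")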

Save as parquet (overwrite with snappy compression) (save as a single partition and rename it)

df.write.parquet("DIRECTORY/FILE.parquet")


df.write.mode("overwrite").option("compression", "snappy").parquet("DIRECTORY/FILE.parquet")


df.repartition(1).write.mode("overwrite").parquet("DIRECTORY/FILE.parquet")

mv DIRECTORY/FILE.parquet/*.parquet DIRECTORY/FILE.parquet/FILE.parquet

Save as csv

df.coalesce(1).write.mode("overwrite").format("csv").option("header", "true").save("/mnt/FOLDER/FILE.csv")

SQL

import pyspark.sql.functions as F

Time functions

YEAR(DATE_COL)

QUARTER(DATE_COL)

MONTH(DATE_COL)

DATE_SUB(NEXT_DAY(current_date(), 'FR'), 7) -- Date of last Friday
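The same expressions with the DataFrame API, using the F alias imported above (df and DATE_COL are placeholders):

df.select(F.year("DATE_COL"), F.quarter("DATE_COL"), F.month("DATE_COL"))

df.select(F.date_sub(F.next_day(F.current_date(), "FR"), 7).alias("last_friday"))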

Count distinct

SELECT COUNT(DISTINCT COL) FROM TABLE
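DataFrame equivalent of the distinct count:

df.select(F.countDistinct("COL"))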

Utils

List databases

spark.catalog.listDatabases()

List tables in a database

spark.catalog.listTables(dbName="DATABASE")

List columns of a table

spark.catalog.listColumns("TABLE")