Spark
Installation
Install on Windows
Download https://www.apache.org/dyn/closer.lua/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
Move it to Documents and untar it from the Anaconda prompt
cd Documents
tar zxvf spark-3.0.0-bin-hadoop3.2.tgz
Create an environment variable called SPARK_HOME using the Environment Variables tool and point it to ...\spark-3.0.0-bin-hadoop3.2
Install Hadoop winutils from https://github.com/cdarlint/winutils/tree/master/hadoop-3.2.0/bin : download it, move it to Documents, and unzip it. Create an environment variable called HADOOP_HOME pointing to this folder
conda create -n spark python=3.7
conda activate spark
conda install -c conda-forge findspark pyspark jupyter
python
Create a session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('App').getOrCreate()
sc = spark.sparkContext
sc.getConf().getAll()
df = spark.createDataFrame([(2,), (5,), (5,)], ('age',))
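A quick check that the session and the DataFrame created above work:
df.show()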
Download Oracle driver
https://www.oracle.com/database/technologies/jdbcdriver-ucp-downloads.html
Create an environment variable called CLASSPATH pointing to .../ojdbc6.jar
https://download.oracle.com/otn_software/nt/instantclient/19600/instantclient-jdbc-windows.x64-19.6.0.0.0dbru.zip
You can also specify the jar when starting pyspark
pyspark --jars orai18n.jar
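With the driver on the classpath, a minimal sketch of reading an Oracle table over JDBC; the host, port, service name, credentials, and table name below are placeholders:
df = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//HOST:1521/SERVICE") \
    .option("dbtable", "SCHEMA.TABLE") \
    .option("user", "USERNAME") \
    .option("password", "PASSWORD") \
    .option("driver", "oracle.jdbc.OracleDriver") \
    .load()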
Configuration
Use Arrow to make conversion to pandas much faster
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")  # named spark.sql.execution.arrow.enabled before Spark 3.0
df.toPandas()
IO
Read
Read parquet (with schema)
df = spark.read.parquet("/DIRECTORY/*")
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField('COL_NUM', IntegerType(), True), StructField('COL_STRING', StringType(), True)])
spark.read.schema(schema).parquet("/DIRECTORY/*")
Register a parquet file as a table
%sql
CREATE TABLE IF NOT EXISTS TABLE_NAME
USING parquet
OPTIONS (
path "/mnt/FOLDER/FILE.parquet"
)
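The same registration can be done from Python instead of the %sql magic:
spark.sql("CREATE TABLE IF NOT EXISTS TABLE_NAME USING parquet OPTIONS (path '/mnt/FOLDER/FILE.parquet')")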
Register a csv file as an unmanaged table
spark.sql("DROP DATABASE IF EXISTS learn_spark_db CASCADE")
spark.sql("CREATE DATABASE learn_spark_db")
spark.sql("USE learn_spark_db")
spark.sql("CREATE TABLE us_delay_flights_tbl (date STRING, delay INT, distance INT, origin STRING, destination STRING) USING csv OPTIONS (path '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')")
Register a DataFrame as a sql table (view)
df.createOrReplaceTempView("TABLE_NAME")
df.registerTempTable("TABLE_NAME")  # deprecated since Spark 2.0; prefer createOrReplaceTempView
Read Table
df = spark.sql("SELECT * FROM DATABASE.TABLE")
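Equivalent without embedding SQL:
df = spark.table("DATABASE.TABLE")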
Write
Save as Table (parquet format) (with bucketing)
df.write.saveAsTable("DATABASE.TABLE")
df.write.format("parquet").mode("overwrite").option("path", "DIRECTORY/FILE.parquet").saveAsTable("DATABASE.TABLE")
df.write.format("parquet").bucketBy(8, "UUID").mode("overwrite").saveAsTable("DATABASE.TABLE")
Save as Delta Table (append and merge schema)
df.write.format("delta").save("DIRECTORY")
df.write.format("delta").mode("append").option("mergeSchema", "true").save("DIRECTORY")
Save as parquet (overwrite with snappy compression) (save as a single partition and rename it)
df.write.parquet("DIRECTORY/FILE.parquet")
df.write.mode("overwrite").option("compression", "snappy").parquet("DIRECTORY/FILE.parquet")
df.repartition(1).write.mode("overwrite").parquet("DIRECTORY/FILE.parquet")
mv DIRECTORY/FILE.parquet/*.parquet DIRECTORY/FILE.parquet/FILE.parquet
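On Databricks (the /mnt paths elsewhere in these notes suggest it), a sketch of the same rename with dbutils; the part-file name is generated, so look it up first:
part = [f.path for f in dbutils.fs.ls("DIRECTORY/FILE.parquet") if f.path.endswith(".parquet")][0]
dbutils.fs.mv(part, "DIRECTORY/FILE.parquet/FILE.parquet")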
Save as csv
df.coalesce(1).write.mode("overwrite").option("header", "true").csv("/mnt/FOLDER/FILE.csv")
SQL
import pyspark.sql.functions as F
Time functions
YEAR(DATE_COL)
QUARTER(DATE_COL)
MONTH(DATE_COL)
DATE_SUB(NEXT_DAY(current_date(), 'FR'), 7) -- Date of last Friday
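The same functions exist in the DataFrame API via the pyspark.sql.functions import above (a sketch assuming a DATE_COL column):
df.withColumn("year", F.year("DATE_COL"))
df.withColumn("last_friday", F.date_sub(F.next_day(F.current_date(), "FR"), 7))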
Count distinct
SELECT COUNT(DISTINCT COL) FROM TABLE
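The DataFrame equivalent, using the F import above:
df.select(F.countDistinct("COL"))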
Utils
List databases
spark.catalog.listDatabases()
List tables in a database
spark.catalog.listTables(dbName="DATABASE")
List columns of a table
spark.catalog.listColumns("TABLE")
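These catalog calls return lists of named objects; e.g. to get just the table names:
[t.name for t in spark.catalog.listTables(dbName="DATABASE")]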