PySpark
https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf
https://pages.databricks.com/rs/094-YMS-629/images/LearningSpark2.0.pdf
IO
Query Oracle
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("App") \
    .config('spark.driver.extraClassPath', r'C:\Oracle\instantclient_19_5\ojdbc8.jar') \
    .config('spark.executor.extraClassPath', r'C:\Oracle\instantclient_19_5\ojdbc8.jar') \
    .getOrCreate()
conn_string = 'jdbc:oracle:thin:USERNAME/PASSWORD@//HOST:PORT/SERVICENAME'
query = """
SELECT * FROM TABLE
"""
df = spark.read.format("jdbc") \
    .options(url=conn_string,
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="({})".format(query)) \
    .load()
# or read a whole table: dbtable="SCHEMA.TABLE"
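For a big table, the JDBC source can read in parallel; a minimal sketch, assuming a numeric column (a hypothetical ID here) to partition on:
df = spark.read.format("jdbc") \
    .options(url=conn_string,
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="SCHEMA.TABLE",
        partitionColumn="ID", # hypothetical numeric column
        lowerBound="1", upperBound="1000000", # stride range, not a filter
        numPartitions="8",
        fetchsize="10000") \
    .load()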
Convert to a pandas DataFrame
# spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # speeds up toPandas; the key was spark.sql.execution.arrow.enabled before Spark 3.0
df.toPandas()
Dtypes
http://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/sql/types/package-summary.html
See schema (dtypes)
df.printSchema()
df.schema
df.dtypes
Convert a decimal type to double
import pyspark.sql.functions as F
import pyspark.sql.types as T
df.withColumn("COL", F.col("COL").cast(T.DoubleType()))
Cast all ints to doubles
intColumns = [x.name for x in df.schema.fields if x.dataType == T.IntegerType()]
df2 = df
for c in intColumns:
    df2 = df2.withColumn(c, F.col(c).cast("double"))
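The same cast in one pass with select (a sketch, reusing intColumns from above):
df2 = df.select(*[F.col(c).cast("double").alias(c) if c in intColumns else F.col(c)
    for c in df.columns])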
Datetime
Convert to a datetime
df = df.withColumn("COL", F.col("COL").cast("timestamp"))
df.withColumn("Time", F.unix_timestamp("Time", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp")
df.withColumn("Time", F.date_format("Time", "yyyy-MM-dd")
df.withColumn("NewTime", to_timestamp(col("Time"), "MM/dd/yyyy")).drop("Time")
df.withColumn("NewTime", to_timestamp(col("Time"), "MM/dd/yyyy hh:mm:ss a")).drop("Time")
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df.withColumn("Date", F.date_format("Date", "MM/dd/yyyy")
Parse MMDDHHSS
def to_date_format_udf(d_str):
    # e.g. "02190925" -> "02/19 09:25"
    return d_str[0:2] + "/" + d_str[2:4] + " " + d_str[4:6] + ":" + d_str[6:]
spark.udf.register("to_date_format_udf", to_date_format_udf, T.StringType())
df.selectExpr("to_date_format_udf(DATE) as DATE_FORMATTED")
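The same function can also be wrapped for the DataFrame API (a sketch, same function as above):
to_date_format = F.udf(to_date_format_udf, T.StringType())
df.withColumn("DATE_FORMATTED", to_date_format(F.col("DATE")))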
Filter using current date
df.filter(F.col("DATE") < F.current_date())
Add days
df.withColumn("COL", F.date_add(F.col("DATE"), 7))
Day of week
df.withColumn("COL", F.dayofweek(F.col("DATE"))) # 1 = Sunday, 2 = Monday, ...
Querying
Select columns (distinct)
df.select("COL1", "COL2")
df.select("COL1").distinct()
Datetime
df.filter(F.col("FIELD") > 'YYYY-MM-DD')
Is in
df.filter(F.col("FIELD").isin([1, 2]))
Is not in
df.filter(~F.col("FIELD").isin([1, 2]))
Or
df.filter((F.col("firstName") == 'Donna') | (F.col("firstName") == 'Dorothy'))
Where
df.where(F.col("firstName") != "Donna")
Case statement
df= df.withColumn("QUARTILE",
F.when(F.col("PERCENTILE") < 0.25, '0-0.25')
.when((F.col("PERCENTILE") >= 0.25) & (F.col("PERCENTILE") < 0.5), '0.25-0.5')
.when((F.col("PERCENTILE") >= 0.5) & (F.col("PERCENTILE") < 0.75), '0.5-0.75')
.otherwise('0.75-1'))
Case statement as an expression
df.withColumn("status", F.expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))
Using an expanded SQL expression
df.filter(F.expr("""origin == 'SEA' AND destination == 'SFO' AND date like '01010%' AND delay > 0"""))
Dropping Nulls
df.dropna(how='any', subset=["COL"])
Dropping 0's
df.filter(F.col("COL") != 0)
Joining
df.join(df2, F.col("firstName") == F.col("ssaFirstName"))
Where not in
df.join(df2, on='COL', how='left_anti')
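Where in (the complement: keep only rows of df with a match in df2)
df.join(df2, on='COL', how='left_semi')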
Group by
Multiple aggregations
groupby_cols = ["COL1", "COL2"]
cols = ['COL3', 'COL4']
agg_expr_count = [F.count(col).alias(col+"_count") for col in cols]
agg_expr_sum = [F.sum(col).alias(col + "_sum") for col in cols]
agg_expr = [*agg_expr_count, *agg_expr_sum]
df.groupby(groupby_cols).agg(*agg_expr)
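With cols = ['COL3', 'COL4'], the star-unpacking above expands to the equivalent explicit call:
df.groupby("COL1", "COL2").agg(
    F.count("COL3").alias("COL3_count"),
    F.count("COL4").alias("COL4_count"),
    F.sum("COL3").alias("COL3_sum"),
    F.sum("COL4").alias("COL4_sum"))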
Count
df.select("COL"1, "COL2").groupBy("COL1").agg({"COL2": "count"})
Sum
df.groupby("COL1").agg(F.sum("COL2").alias("COL2_SUM"))
df.select("COL"1, "COL2").groupBy("COL1").max()#.alias("COL1")
df.select("COL"1, "COL2").groupBy("COL1").max()#.as("COL1")
Window
Cumulative sum
df.withColumn(
"CUMSUM",
F.expr(
"SUM(COL) OVER (PARTITION BY COL2 ORDER BY COL3 DESC)"
),
)
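The same running sum via the Window API (equivalent to the SQL expression above):
from pyspark.sql.window import Window
w = Window.partitionBy("COL2").orderBy(F.desc("COL3"))
df.withColumn("CUMSUM", F.sum("COL").over(w))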
Forward fill 0's
from pyspark.sql.window import Window
def ffill_cumsum(df, col=None, partition_col=None, order_col=None):
    window = Window.partitionBy(partition_col).orderBy(order_col).rowsBetween(Window.unboundedPreceding, 0)
    return df.withColumn(col, F.max(df[col]).over(window))
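Usage (hypothetical column names):
df = ffill_cumsum(df, col="CUMSUM", partition_col="ID", order_col="DATE")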
Analytics
Shape of a DataFrame
print((df.count(), len(df.columns)))
Statistical overview of a table
df.describe().show()
Max of a column as a dataframe (value)
df.agg({'COL': 'max'}).show()
df.agg({'COL': 'max'}).collect()[0]["max(COL)"]
Add a column as a sum of other columns
import pyspark.sql.functions as F
df.withColumn("NEW_COL", F.col("COL1") + F.col("COL2"))
Map a column
from itertools import chain
mapping = {'A':1, 'B':2}
mapping_expr = F.create_map([F.lit(x) for x in chain(*mapping.items())])
df.withColumn("NEW_COL", mapping_expr.getItem(F.col("COL")))
Remove $ and , from a string and cast to double
df.withColumn("price", F.translate(F.col("price"), "$,", "").cast("double"))
Update values in a column
df.withColumn("COL_UPDATED", F.when(df["COL"] > 1, 1).otherwise(df["COL"]))
Replace values in a dataframe e.g. 10 -> 20
df.replace(10, 20)
Fill null values e.g. -> 0 for COL
df.fillna(0, subset=["COL"])
Rename a column
df.withColumnRenamed("COL", "NEW_COL")
Drop a column
df.drop(*["COL1", "COL2"])
Correlation between columns
for col in df.columns:
    correlation = df.stat.corr("COL", col) # numeric columns only
    print(col, correlation)
UDF
Spark UDF
Plus one
from pyspark.sql.functions import col, rand, udf
df = (spark
    .range(0, 10 * 1000 * 1000)
    .withColumn('id', (col('id') / 1000).cast('integer')) # 10,000 distinct ids
    .withColumn('v', rand())) # 10,000,000 rows
df.cache()
df.count()
@udf("double")
def plus_one(v):
    return v + 1
df.withColumn('v', plus_one(df.v))
PySpark Cubing in SQL namespace
def cubed(s):
    return s * s * s
spark.udf.register("cubed", cubed, T.LongType())
spark.range(1, 9).createOrReplaceTempView("udf_test")
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()
Add one to an array
arrayData = [[1, (1, 2, 3)], [2, (2, 3, 4)], [3, (3, 4, 5)]]
arraySchema = (T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True)
]))
df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
df.createOrReplaceTempView("table")
def addOne(values):
    return [value + 1 for value in values]
spark.udf.register("plusOneIntPy", addOne, T.ArrayType(T.IntegerType()))
spark.sql("SELECT id, plusOneIntPy(values) AS values FROM table").show()
Transform Celsius to Fahrenheit
schema = T.StructType([T.StructField("celsius", T.ArrayType(T.IntegerType()))])
t_list = [[[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit FROM tC""").show()
Pandas UDF
Pandas Cubing
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a
cubed_udf = pandas_udf(cubed, returnType=T.LongType())
df = spark.range(1, 4)
df.select("id", cubed_udf(col("id"))).show()
Vectorize plus one
@pandas_udf('double')
def vectorized_plus_one(v):
    return v + 1
df.withColumn('v', vectorized_plus_one(df.v))
Subtract mean
from pyspark.sql.functions import PandasUDFType
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def vectorized_subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())
df.groupby('id').apply(vectorized_subtract_mean)
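GROUPED_MAP pandas UDFs are deprecated in Spark 3; the equivalent with applyInPandas:
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())
df.groupby('id').applyInPandas(subtract_mean, schema=df.schema)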
Caching
See the query plan
df.explain()
Cache
df.cache().count()
Persist (unpersist)
from pyspark import StorageLevel
df.persist(StorageLevel.DISK_ONLY).count()
df.unpersist()
See number of partitions
df.rdd.getNumPartitions()
Cache a table
%sql
CACHE TABLE DATABASE.TABLE
Machine Learning
Feature engineering
Drop null for a categorical field
df.na.drop(subset=["STR_COL"])
Impute missing values with median
from pyspark.ml.feature import Imputer
imputeCols = ["COL1", "COL2"] # numeric columns to impute
imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)
df2 = imputer.fit(df).transform(df)
Train-test split
trainDf, testDf = df.randomSplit([.8, .2], seed=42)
Evaluation
Regression metrics
from pyspark.ml.evaluation import RegressionEvaluator
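A sketch of its use, assuming a hypothetical predictions DataFrame with "prediction" and "label" columns:
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")
rmse = evaluator.evaluate(predictions)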
Visualization
Show the dataframe
display(df)
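display() is a Databricks notebook function; in plain PySpark, use:
df.show(5)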