PySpark

https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

https://pages.databricks.com/rs/094-YMS-629/images/LearningSpark2.0.pdf

IO

Query Oracle

from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .appName("App") \
    .config('spark.driver.extraClassPath', r'C:\Oracle\instantclient_19_5\ojdbc8.jar') \
    .config('spark.executor.extraClassPath', r'C:\Oracle\instantclient_19_5\ojdbc8.jar') \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

conn_string = 'jdbc:oracle:thin:USERNAME/PASSWORD@//HOST:PORT/SERVICENAME'


query = """

select * FROM TABLE

"""


df = sqlContext.read.format("jdbc") \

.options(url=conn_string,

driver="oracle.jdbc.driver.OracleDriver",

dbtable="({})".format(query)) \

# dbtable="SCHEMA.TABLE"

.load()
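
Since Spark 2.4 the JDBC source also accepts a query option, which avoids wrapping the SQL in a derived table for dbtable. A sketch reusing the connection string and query above:

df = spark.read.format("jdbc") \
    .option("url", conn_string) \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .option("query", query) \
    .load()  # use either "query" or "dbtable", not both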

Convert to a pandas DataFrame

# spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df.toPandas()

Dtypes

http://spark.apache.org/docs/3.0.0/api/java/org/apache/spark/sql/types/package-summary.html

See schema (dtypes)

df.printSchema()

df.schema

df.dtypes

Convert a decimal type to double

import pyspark.sql.functions as F
import pyspark.sql.types as T

df.withColumn("COL", F.col("COL").cast(T.DoubleType()))

Cast all integer columns to double

intColumns = [x.name for x in df.schema.fields if x.dataType == T.IntegerType()]

df2 = df
for c in intColumns:
    df2 = df2.withColumn(c, F.col(c).cast("double"))

Datetime

Convert to a datetime

df = df.withColumn("COL", F.col("COL").cast("timestamp"))


df.withColumn("Time", F.unix_timestamp("Time", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp")


df.withColumn("Time", F.date_format("Time", "yyyy-MM-dd")


df.withColumn("NewTime", to_timestamp(col("Time"), "MM/dd/yyyy")).drop("Time")


df.withColumn("NewTime", to_timestamp(col("Time"), "MM/dd/yyyy hh:mm:ss a")).drop("Time")


spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

df.withColumn("Date", F.date_format("Date", "MM/dd/yyyy")

Parse MMDDHHSS

def to_date_format_udf(d_str):
    # e.g. "02190925" -> "02/19  09:25"
    return d_str[0:2] + "/" + d_str[2:4] + "  " + d_str[4:6] + ":" + d_str[6:]

spark.udf.register("to_date_format_udf", to_date_format_udf, T.StringType())

df.selectExpr("to_date_format_udf(DATE) as DATE_FORMATTED")

Filter using current date

df.filter(F.col("DATE") < F.current_date())

Add days

df.withColumn("COL", F.date_add(F.col("DATE"), 7))

Day of week

df.withColumn("COL", F.dayofweek(F.col("DATE"))) # 1 = Sunday, 2 = Monday, ...

Querying

Select columns (distinct)

df.select("COL1", "COL2")

df.select("COL1").distinct()

Datetime

df.filter(F.col("FIELD") > 'YYYY-MM-DD')

Is in

df_recent.filter(col("FIELD").isin([1, 2]))

Is not in

df.filter(~F.col("FIELD").isin([1, 2]))

Or

df.filter((F.col("firstName") == 'Donna') | (F.col("firstName") == 'Dorothy'))

Where

df.where(F.col("firstName") != "Donna")

Case statement

df= df.withColumn("QUARTILE",

F.when(F.col("PERCENTILE") < 0.25, '0-0.25')

.when((F.col("PERCENTILE") >= 0.25) & (F.col("PERCENTILE") < 0.5), '0.25-0.5')

.when((F.col("PERCENTILE") >= 0.5) & (F.col("PERCENTILE") < 0.75), '0.5-0.75')

.otherwise('0.75-1'))

Case statement as an expression

df.withColumn("status", F.expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"))

Using an expanded SQL expression

df.filter(F.expr("""origin == 'SEA' AND destination == 'SFO' AND date like '01010%' AND delay > 0"""))

Dropping Nulls

df.dropna(how='any', subset=["COL"])

Dropping zeros

df.filter(F.col("COL") != 0)

Joining

df.join(df2, F.col("firstName") == F.col("ssaFirstName"))

Where not in

df.join(df2, on='COL', how='left_anti')

Group by

Multiple aggregations

groupby_cols = ["COL1", "COL2"]

cols = ['COL3', 'COL4']

agg_expr_count = [F.count(col).alias(col+"_count") for col in cols]

agg_expr_sum = [F.sum(col).alias(col+"_sum") for col in sum_cols]

agg_expr = [*agg_expr_count, *agg_expr_sum]

df.groupby(groupby_cols).agg(*agg_expr)

Count

df.select("COL"1, "COL2").groupBy("COL1").agg({"COL2": "count"})

Sum

df.groupby("COL1").agg(F.sum("COL2").alias("COL2_SUM"))

Max

df.select("COL1", "COL2").groupBy("COL1").max()

Window

Cumulative sum

df.withColumn(
    "CUMSUM",
    F.expr("SUM(COL) OVER (PARTITION BY COL2 ORDER BY COL3 DESC)"),
)

Forward fill 0's

from pyspark.sql.window import Window

def ffill_cumsum(df, col=None, partition_col=None, order_col=None):
    window = Window.partitionBy(partition_col).orderBy(order_col).rowsBetween(Window.unboundedPreceding, 0)
    return df.withColumn(col, F.max(df[col]).over(window))
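
A minimal usage sketch of the helper above (the column names are placeholders, matching the cumulative-sum example):

df = ffill_cumsum(df, col="CUMSUM", partition_col="COL2", order_col="COL3")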

Analytics

Shape of a DataFrame

print((df.count(), len(df.columns)))

Statistical overview of a table

df.describe()

Max of a column as a dataframe (value)

df.agg({'COL': 'max'}).show()

df.agg({'COL': 'max'}).collect()[0]["max(COL)"]

Add a column as a sum of other columns

import pyspark.sql.functions as F

df.withColumn("NEW_COL", F.col("COL1") + F.col("COL2"))

Map a column

from itertools import chain

mapping = {'A':1, 'B':2}

mapping_expr = F.create_map([F.lit(x) for x in chain(*mapping.items())])

df.withColumn("NEW_COL", mapping_expr.getItem(F.col("COL")))

Remove $ and , from a string and cast to double

df.withColumn("price", F.translate(F.col("price"), "$,", "").cast("double"))

Update values in a column

df.withColumn("COL_UPDATED", F.when(df["COL"] > 1, 1).otherwise(df["COL"]))

Replace values in a dataframe e.g. 10 -> 20

df.replace(10, 20)

Fill null values e.g. -> 0 for COL

df.fillna(0, subset=["COL"])

Rename a column

df.withColumnRenamed("COL", "NEW_COL")

Drop a column

df.drop(*["COL1", "COL2"])

Correlation between columns

for col in df.columns:  # numeric columns only
    correlation = df.stat.corr("COL", col)
    print(col, correlation)

UDF

Spark UDF

Plus one

from pyspark.sql.functions import col, rand, udf

df = (spark
      .range(0, 10 * 1000 * 1000)
      .withColumn('id', (col('id') / 1000).cast('integer'))  # 10,000 distinct ids
      .withColumn('v', rand()))  # 10,000,000 rows

df.cache()
df.count()

@udf("double")

def plus_one(v):

return v + 1

df.withColumn('v', plus_one(df.v))

PySpark Cubing in SQL namespace

def cubed(s):
    return s * s * s

spark.udf.register("cubed", cubed, T.LongType())

spark.range(1, 9).createOrReplaceTempView("udf_test")
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

Add one to an array

arrayData = [[1, (1, 2, 3)], [2, (2, 3, 4)], [3, (3, 4, 5)]]

arraySchema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True)
])

df = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
df.createOrReplaceTempView("table")

def addOne(values):
    return [value + 1 for value in values]

spark.udf.register("plusOneIntPy", addOne, T.ArrayType(T.IntegerType()))

spark.sql("SELECT id, plusOneIntPy(values) AS values FROM table").show()

Transform Celsius to Fahrenheit

schema = T.StructType([T.StructField("celsius", T.ArrayType(T.IntegerType()))])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]

t_c = spark.createDataFrame(t_list, schema)

t_c.createOrReplaceTempView("tC")

spark.sql("""SELECT celsius, transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit FROM tC""").show()

Pandas UDF

Pandas Cubing

import pandas as pd
from pyspark.sql.functions import pandas_udf

def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

cubed_udf = pandas_udf(cubed, returnType=T.LongType())

df = spark.range(1, 4)
df.select("id", cubed_udf(col("id"))).show()

Vectorize plus one

@pandas_udf('double')
def vectorized_plus_one(v: pd.Series) -> pd.Series:
    return v + 1

df.withColumn('v', vectorized_plus_one(df.v))

Subtract mean

from pyspark.sql.functions import PandasUDFType


@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def vectorized_subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(vectorized_subtract_mean)
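
In Spark 3.0+ the same grouped-map pattern can be written with applyInPandas on a plain function, avoiding the deprecated PandasUDFType decorator. A sketch assuming the same df with columns id and v:

def subtract_mean(pdf):
    # pdf is a pandas DataFrame holding one group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').applyInPandas(subtract_mean, schema=df.schema)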

Caching

See the query plan

df.explain()

Cache

df.cache().count()

Persist (and unpersist)

from pyspark import StorageLevel

df.persist(StorageLevel.DISK_ONLY).count()
df.unpersist()

See number of partitions

df.rdd.getNumPartitions()
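
Change the number of partitions

A related sketch: repartition triggers a full shuffle, while coalesce only merges existing partitions:

df = df.repartition(8)  # full shuffle into 8 partitions
df = df.coalesce(1)     # merge down to 1 partition, no full shuffle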

Cache a table

%sql

CACHE TABLE DATABASE.TABLE

Machine Learning

Feature engineering

Drop null for a categorical field

df.na.drop(subset=["STR_COL"])

Impute missing values with median

imputer = Imputer(strategy="median", inputCols=imputeCols, outputCols=imputeCols)

df2= imputer.fit(df).transform(df)

Train-test split

trainDf, testDf = df.randomSplit([.8, .2], seed=42)


Evaluation

Regression metrics

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionMetrics

from pyspark.ml.evaluation import RegressionEvaluator  # DataFrame API
from pyspark.mllib.evaluation import RegressionMetrics  # RDD API
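
A minimal sketch with the DataFrame API, assuming a predictions DataFrame predDf produced by a fitted regression model (the column names here are assumptions):

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predDf)                     # assumes "label" and "prediction" columns
r2 = evaluator.setMetricName("r2").evaluate(predDf)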

Visualization

Show the dataframe (Databricks notebooks)

display(df)
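
Outside Databricks, a common pattern (a sketch, assuming matplotlib is installed) is to aggregate or sample in Spark, pull the small result into pandas, and plot there:

import matplotlib.pyplot as plt

pdf = df.groupby("COL1").count().toPandas()  # keep the collected result small
pdf.plot(x="COL1", y="count", kind="bar")
plt.show()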