learn pyspark
Learn to program with PySpark, focusing mostly on loading, querying and transforming data.
A test CSV file looks like this (randomly generated, not real data):
first_name,last_name,gender,address,city,state,post,phone1,phone2,email
Rebbecca,Cool,F,171 E 24th St,Leith,TAS,7315,03-8888-1234,0458-000-111,rebbecca.cool@cool.com
Stevie,Hot,M,22222 Acoma St,Proston,QLD,4613,07-9999-5678,0497-111-000,stevie.hot@hot.com
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import broadcast
from pyspark.sql import Row
file_path = r'D:\programming\spark\test_data.csv'
spark = SparkSession.builder.getOrCreate()
#read in a csv file
df = spark.read.load(file_path, format="csv", sep=",", inferSchema="true", header="true")
df.limit(10).show()
df.printSchema()
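#inferSchema makes spark take an extra pass over the file to guess column types
#for larger files an explicit schema avoids that pass
#a minimal sketch; the types below are assumptions based on the sample data above
from pyspark.sql.types import StructType, StructField
schema = StructType([
    StructField('first_name', StringType()),
    StructField('last_name', StringType()),
    StructField('gender', StringType()),
    StructField('address', StringType()),
    StructField('city', StringType()),
    StructField('state', StringType()),
    StructField('post', IntegerType()),
    StructField('phone1', StringType()),
    StructField('phone2', StringType()),
    StructField('email', StringType()),
])
df_explicit = spark.read.csv(file_path, sep=',', header=True, schema=schema)
df_explicit.printSchema()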
#select some columns and sort
df2 = df.select('post', 'first_name')
df2 = df2.sort('first_name')
df2.show()
#sort in descending order
df2 = df2.sort(F.desc('first_name'))
df2.show()
#select rows
df3 = df2.filter(~df2.first_name.startswith('Z')) #rows not starting with Z
df3.show()
#convert data type
df4 = df3.withColumn('post', F.col('post').cast(IntegerType()))
df4.show()
#group by
df5 = df4.groupBy(["first_name"]).agg(F.min("post").alias('min post'), F.max("post").alias('max post'))
df5.show()
#build a postcode-to-state lookup table
df6 = df.select('post', 'state').groupBy(['post']).agg(F.min('state').alias('state'))
df6.show()
#add a derived column
df6 = df6.withColumn("state full name", F.regexp_replace('state', 'QLD', 'Queensland'))
df6.filter(df6.state == 'QLD').show()
#join data frames
#join df2 to df6 on post code to get the state name for each row in df2
df7 = df2.join(df6, ['post'], how='inner')
df7.show()
#now there might be a performance problem
#df2 could potentially be huge if it were, say, a list of all customers
#while df6 is tiny, as it is only a list of postcodes with state names
#in some cases the above join may fail or take forever because too much data movement (shuffling) is involved
#a trick is to broadcast the 'tiny' table, df6 in this case, to every node in the cluster
#so df2 can join against a local copy of df6
#this can greatly improve performance when joining a huge table to a tiny one on a multi-node cluster
df7 = df2.join(broadcast(df6), ['post'], how='inner')
df7.show()
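#as a side note, spark can also broadcast small tables automatically
#the size cutoff is controlled by the spark.sql.autoBroadcastJoinThreshold config (in bytes, default 10MB)
#a minimal sketch; the 50MB value is just an example
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 50 * 1024 * 1024)
#setting it to -1 turns automatic broadcasting off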
#pyspark also supports a lot of sql syntax
#just register the data frames as temp views
#then query the temp views using sql
df2.createOrReplaceTempView('main_table')
df6.createOrReplaceTempView('lookup_table')
df8 = spark.sql('select * from main_table a join lookup_table b on a.post=b.post where left(first_name,1) = "A"')
df8.show()
#user defined function (UDF)
#spark supports wrapping a plain python function as a UDF
def get_state_name(value):
    if value == 'QLD':
        return 'Queensland'
    elif value == 'NSW':
        return 'New South Wales'
    else:
        return value
#convert to a UDF by passing in the python function and its return type
state_name = F.udf(get_state_name, StringType())
df9 = df8.withColumn("state name", state_name("state"))
df9.show()
#sometimes using a UDF or SQL is not performant enough
#another option is to use the RDD (resilient distributed dataset) API directly
#here is a row-wise function to apply to an RDD
def rowwise_function(row):
    # convert the Row to a python dictionary
    row_dict = row.asDict()
    # do something to the row
    row_dict['post'] = '0000'
    # convert the dict back to a Row
    return Row(**row_dict)
# convert dataframe to RDD first
rdd = df2.rdd
# apply function to RDD
rdd = rdd.map(lambda row: rowwise_function(row))
# Convert RDD Back to DataFrame
df10 = rdd.toDF()
df10.show()
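#for comparison, the same change expressed in the dataframe api usually performs better,
#because rows do not have to be serialized to python and back
df10_df = df2.withColumn('post', F.lit('0000'))
df10_df.show()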
There are many other functions that can be used from Spark,
e.g. pandas UDFs through a decorator,
e.g. window functions, like partition by, lag, rank, pivot, etc.
Both are sketched below.
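A minimal sketch of a pandas UDF (Spark 3.x style with type hints; requires the pyarrow package). It redoes the get_state_name mapping, but on whole pandas Series batches instead of row by row:
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf(StringType())
def state_name_pandas(states: pd.Series) -> pd.Series:
    #operates on a batch of values at once, typically much faster than a plain python UDF
    return states.replace({'QLD': 'Queensland', 'NSW': 'New South Wales'})
df.select('state', state_name_pandas('state').alias('state name')).show()
And a sketch of a window function: rank rows by first_name within each state (column names come from the test data above):
from pyspark.sql.window import Window
w = Window.partitionBy('state').orderBy('first_name')
df.withColumn('rank in state', F.rank().over(w)).select('state', 'first_name', 'rank in state').show()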