learn pyspark

Learn to program with PySpark, mostly covering loading, querying and transforming data.

The test CSV data has the following format (randomly generated, not real data).

first_name,last_name,gender,address,city,state,post,phone1,phone2,email

Rebbecca, Cool,F,171 E 24th St,Leith,TAS,7315,03-8888-1234,0458-000-111,rebbecca.cool@cool.com

Stevie, Hot,M,22222 Acoma St,Proston,QLD,4613,07-9999-5678,0497-111-000,stevie.hot@hot.com


from pyspark.sql import SparkSession

from pyspark.sql import functions as F

from pyspark.sql.types import IntegerType, StringType

from pyspark.sql.functions import broadcast

from pyspark.sql import Row



file_path = r'D:\programming\spark\test_data.csv'

spark = SparkSession.builder.getOrCreate()


#read in a csv file

df = spark.read.load(file_path,format="csv", sep=",", inferSchema="true", header="true")

df.limit(10).show()

df.printSchema()



#select some columns and sort

df2 = df.select('post', 'first_name')

df2 = df2.sort('first_name')

df2.show()


#sort in descending order

df2 = df2.sort(F.desc('first_name'))

df2.show()


#select rows

df3 = df2.filter(~df2.first_name.startswith('Z')) #keep names not starting with 'Z'

df3.show()


#convert data type

df4 = df3.withColumn('post', F.col('post').cast(IntegerType()))

df4.show()


#group by first name and aggregate the postcodes

df5 = df4.groupBy(["first_name"]).agg(F.min("post").alias('min post'), F.max("post").alias('max post'))

df5.show()


#build a postcode to state lookup table (one state per postcode)

df6 = df.select('post', 'state').groupBy(['post']).agg(F.min('state').alias('state'))

df6.show()


#add a derived column

df6 = df6.withColumn("state full name", F.regexp_replace('state', 'QLD', 'Queensland'))

df6.filter(df6.state == 'QLD').show()


#join data frames

#join df2 to df6 on post code to get the state name for each row in df2

df7 = df2.join(df6, ['post'], how='inner')

df7.show()


#now there might be a performance problem

#df2 could be huge if it were, say, a list of all customers,

#while df6 is tiny, as it is only a list of postcodes with state names

#in some cases the join above may fail or take forever because too much data movement (shuffling) is involved

#a trick is to broadcast the 'tiny' table (df6 in this case) to every node in the cluster,

#so df2 can join against a local copy of df6

#this can greatly improve performance when joining a huge table to a tiny table on a multi-node cluster

df7 = df2.join(broadcast(df6), ['post'], how='inner')

df7.show()


#pyspark also supports a lot of sql syntax

#just need to register the data frames as temp views

#then query the temp views using sql

df2.createOrReplaceTempView('main_table')

df6.createOrReplaceTempView('lookup_table')

df8 = spark.sql('select * from main_table a join lookup_table b on a.post=b.post where left(first_name,1) = "A"')

df8.show()


#user defined functions (UDF)

#spark supports wrapping a plain python function as a UDF

def get_state_name(value):

    if value == 'QLD':

        return 'Queensland'

    elif value == 'NSW':

        return 'New South Wales'

    else:

        return value

#convert it to a UDF by passing in the function and its return type

state_name = F.udf(get_state_name, StringType())

df9 = df8.withColumn("state name", state_name("state"))

df9.show()


#sometimes using a UDF or SQL is not performant enough

#another option is to work with the RDD (resilient distributed dataset) directly

#here is a row-wise function to apply to an RDD

def rowwise_function(row):

    # convert row to python dictionary:

    row_dict = row.asDict()

    # do something to the row

    row_dict['post'] = '0000'

    # convert the dict back into a Row:

    return Row(**row_dict)


# convert dataframe to RDD first

rdd = df2.rdd

# apply function to RDD

rdd = rdd.map(rowwise_function)

# Convert RDD Back to DataFrame

df10 = rdd.toDF()

df10.show()


There are many other functions that can be used from Spark, for example:

e.g. pandas UDFs defined through a decorator, sketched below
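
Below is a minimal sketch of a pandas UDF defined through the decorator, assuming Spark 3.x with pandas and pyarrow installed; the function upper_name, the new column name and df11 are illustrative only.

import pandas as pd

from pyspark.sql.functions import pandas_udf

#a pandas UDF receives a whole pandas Series per batch instead of one value per call,

#which usually makes it much faster than a plain python UDF

@pandas_udf(StringType())
def upper_name(names: pd.Series) -> pd.Series:

    return names.str.upper()

df11 = df2.withColumn('first name upper', upper_name('first_name'))

df11.show()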

e.g. window functions, like partition by, lag, rank, etc., as well as pivot; see the sketch below
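
A rough sketch of window functions and pivot, reusing df2 and df from above; the window spec, the df12 name and the new column names are illustrative only. Note that pivot is exposed on a grouped dataframe rather than through a window spec.

from pyspark.sql.window import Window

#partition rows by postcode and order by first name within each partition

w = Window.partitionBy('post').orderBy('first_name')

df12 = df2.withColumn('rank in post', F.rank().over(w))

df12 = df12.withColumn('previous name', F.lag('first_name', 1).over(w))

df12.show()

#pivot: count rows per state, broken out by gender

df.groupBy('state').pivot('gender').count().show()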