Spark
Apache Spark is a cluster-aware distributed processing framework capable of aggregating many CPU cores and large amounts of memory. It provides a means of processing large datasets that cannot fit into memory on a single node, while using all available CPU cores. Spark supports Python, Scala, and Java applications.
Refer to https://spark.apache.org/
RDD
The power of Spark comes not only from distributed computing but also from the RDD (Resilient Distributed Dataset). An RDD allows Spark to read a dataset in chunks distributed across all compute nodes. An RDD acts much like a DataFrame in Python's Pandas and has similar functions for manipulation.
The following link explains RDDs and many other aspects of Spark:
https://www.sciencedirect.com/topics/computer-science/resilient-distributed-dataset
SQL Context
The SQL context in PySpark provides functionality similar to SQL in a database, allowing queries across distributed data. (In current versions of Spark this functionality is reached through the SparkSession entry point rather than the older SQLContext.) The example code below shows how to use SQL in PySpark.
https://www.machinelearningplus.com/pyspark/run-sql-queries-with-pyspark/
Using Spark
Spark is usually associated with Hadoop and HDFS. In our case we are using the standalone distribution and shared scratch space on Panasas. To incorporate Spark into your environment and shell scripts, load the module:
module load Utils/spark/3.5.1
Doing this provides access to the Spark tools, the ability to submit jobs, and the necessary language libraries.
The most common way of submitting a job is with spark-submit. Here is an example using only the local node's resources:
spark-submit --master='local[*]' spark_title.py /scratch/mewan/title.basics.par
The --master specification says to use all cores on the local node. The next argument is the program to run, in this case a PySpark script. The final argument is a file path passed to that script. Below is a simple script that sets up a Spark context in Python.
Run the script below as:
spark-submit --master='local[*]' spark_hello.py
#!/usr/bin/env python
import time
import socket

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Running SQL Queries in PySpark") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")  # change this to WARN for less verbosity

print('======== START ========')
start = time.time()
print("Hello from", socket.gethostname())
print("======== ELAPSED TIME:", time.time() - start)

spark.stop()
SQL example
Below is a code sample showing how to read a Parquet file and perform a query. This example uses the title basics dataset from IMDb (https://datasets.imdbws.com/). The Parquet file format provides better access and query performance than row-oriented file formats (https://www.databricks.com/glossary/what-is-parquet).
Save the example below as spark_sql.py; the submit command line on Coeus would then be:
spark-submit --master='local[*]' spark_sql.py /scratch/mewan/title.basics.par
#!/usr/bin/env python
import sys
import time
import socket

from pyspark.sql import SparkSession

if len(sys.argv) < 2:
    print("Need file path", file=sys.stderr)
    sys.exit(1)
filen = sys.argv[1]

spark = SparkSession.builder \
    .appName("Running SQL Queries in PySpark") \
    .getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")

print('======== START ========')
print(socket.gethostname())

print("======== EXTRACT PARQUET ")
start = time.time()
# Parquet files are self-describing, so no header, separator, or schema
# options are needed (those apply to the CSV reader).
df = spark.read.parquet(filen)
print("======== EXTRACT PARQUET ELAPSED TIME:", time.time() - start)

print(df.columns)
print(df.count())

print("======== QUERY DATA ")
df.select("*").where("primaryTitle LIKE '%Far Cry%'").show(5)
print("======== ELAPSED TIME:", time.time() - start)

spark.stop()
Spark on Coeus cluster
To run Spark on the cluster you will need a master server and workers on cluster nodes. Below are some example start scripts for each.
#!/bin/bash
module add Utils/spark/3.5.1
host=$(hostname -f)
port=7077
echo "Starting master as spark://$host:$port"
start-master.sh --host $host -p $port
This assumes you started the master on login1.coeus.rc.pdx.edu. The worker start script below connects to that master; you will want to call it from an sbatch script (also below) so workers start on the allocated nodes.
#!/bin/bash
module add Utils/spark/3.5.1
hostname
# Keep the worker in the foreground so srun does not exit immediately.
export SPARK_NO_DAEMONIZE=1
start-worker.sh -c 20 -m 48G spark://login1.cluster:7077
#!/bin/bash
#SBATCH --job-name spark_worker
#SBATCH --nodes 4
#SBATCH --tasks-per-node 1
#SBATCH --cpus-per-task 20
# The %j variable includes the job number. Useful for multiple runs
#SBATCH --output simple_%j.txt # Send the standard output to simple_<job ID>.txt
#SBATCH --error simple_%j.err # Send the error output to simple_<job ID>.err
srun /scratch/mewan/spark_examples/start-spark-worker