Spark

Apache Spark is a cluster-aware distributed processing framework that can aggregate many CPU cores and large amounts of memory across nodes.  It provides a means of processing large datasets that do not fit into memory on a single node, and of using all available CPU cores.  Spark supports applications written in Python, Scala, and Java.

Refer to https://spark.apache.org/

RDD

The power of Spark comes from distributed computing, but also from the RDD (Resilient Distributed Dataset).  An RDD allows Spark to read a dataset in chunks distributed across all compute nodes.  An RDD acts much like a DataFrame in Python's Pandas and has similar functions for manipulation.

The following link explains RDD and many other aspects of Spark,
https://www.sciencedirect.com/topics/computer-science/resilient-distributed-dataset
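
As a concrete illustration, here is a minimal sketch of creating and manipulating an RDD in PySpark.  The application name and the numbers are arbitrary placeholders, not part of the examples used elsewhere on this page; the script can be run locally with spark-submit like the other examples below.

#!/usr/bin/env python

from pyspark.sql import SparkSession

# Build a SparkSession; its SparkContext is the entry point to the RDD API.
spark = SparkSession.builder.appName("RDD sketch").getOrCreate()
sc = spark.sparkContext

# Distribute a small list of numbers across the available workers as an RDD.
rdd = sc.parallelize(range(1, 11))

# Transformations (map, filter) are lazy; the action (sum) triggers the computation.
total = rdd.map(lambda x: x * x).filter(lambda x: x % 2 == 0).sum()
print("Sum of even squares:", total)

spark.stop()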

SQL Context

The SQL interface in PySpark provides functionality similar to SQL in a database, allowing queries across an RDD or DataFrame.  The example code at the link below shows how to run SQL queries in PySpark.
https://www.machinelearningplus.com/pyspark/run-sql-queries-with-pyspark/
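
For a self-contained illustration, here is a minimal sketch of registering a DataFrame as a temporary view and querying it with SQL.  The sample rows and the view name "titles" are made up for illustration; they only mimic the IMDb columns used later on this page.

#!/usr/bin/env python

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SQL sketch").getOrCreate()

# Build a tiny DataFrame and register it as a temporary view so it can be queried with SQL.
df = spark.createDataFrame(
    [("tt0000001", "Example Title One"), ("tt0000002", "Example Title Two")],
    ["tconst", "primaryTitle"])
df.createOrReplaceTempView("titles")

# Run an ordinary SQL statement against the view.
spark.sql("SELECT primaryTitle FROM titles WHERE tconst = 'tt0000002'").show()

spark.stop()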

Using Spark

Spark is usually associated with Hadoop and HDFS.  In our case we are using the standalone distribution and shared scratch space on Panasas.  To incorporate Spark into your environment and shell scripts, load the module:

module load Utils/spark/3.5.1

Loading the module provides access to the Spark tools, the ability to submit jobs, and the language libraries needed for import.

The most common way of submitting a job is with spark-submit.  Here is an example that uses only the local node's resources.

spark-submit --master='local[*]' spark_title.py /scratch/mewan/title.basics.par

The --master specification tells Spark to run on the local node only; 'local[*]' means use all available local cores.  The next argument is the program to run, in this case a PySpark script, and the final argument is a file path passed to that script.  Here is a simple example that sets up a SparkContext in a Python script.

Save the script below as spark_hello.py, then run it as,
spark-submit --master='local[*]' spark_hello.py

#!/usr/bin/env python

import sys
import time
import socket

from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the builder is the standard entry point.
spark = SparkSession.builder\
            .appName("Running SQL Queries in PySpark")\
            .getOrCreate()

# The SparkContext comes from the session.
sc = spark.sparkContext
sc.setLogLevel("INFO")  # change this to WARN for less verbosity

print('======== START ========')
start = time.time()
print("Hello from ", socket.gethostname())
print("======== ELAPSED TIME: ", time.time() - start)

SQL example

Below is a code sample showing how to read a Parquet file and perform a query.  This example uses the title.basics dataset from IMDb (https://datasets.imdbws.com/).  The Parquet file format provides better access and query performance than other common file formats (https://www.databricks.com/glossary/what-is-parquet).

Save the example below as spark_sql.py; the submit command line on Coeus would then be,
spark-submit --master='local[*]' spark_sql.py /scratch/mewan/title.basics.par

#!/usr/bin/env python

import sys
import time
import socket
import shutil
from os import path

# from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

# The Parquet file path is passed on the spark-submit command line.
if len(sys.argv) < 2:
    print("Need file path", file=sys.stderr)
    sys.exit(1)
filen = sys.argv[1]

spark = SparkSession.builder\
            .appName("Running SQL Queries in PySpark")\
            .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

print('======== START ========')
print(socket.gethostname())

print("======== EXTRACT PARQUET ")
start = time.time()
# Parquet files carry their own schema, so no header, separator, or
# schema-inference options are needed here.
df = spark.read.parquet(filen)
print("======== EXTRACT PARQUET ELAPSED TIME: ", time.time() - start)

print(df.columns)
print(df.count())

print("======== QUERY DATA ")
# SQL-style filtering on the DataFrame; show the first 5 matching rows.
df.select("*").where("primaryTitle LIKE '%Far Cry%'").show(5)
print("======== ELAPSED TIME: ", time.time() - start)


Spark on Coeus cluster

To run Spark on the cluster you will need a master server and workers on the cluster nodes.  Below are example start scripts for each.

#!/bin/bash

module add Utils/spark/3.5.1

# Start the standalone master on this host's fully qualified name.
host=$(hostname -f)
port=7077

echo "Starting master as spark://$host:$port"
start-master.sh --host "$host" -p "$port"

The worker script below assumes the master was started on login1.coeus.rc.pdx.edu.  You will want to call the worker script from an sbatch script like the one shown after it.

#!/bin/bash

module add Utils/spark/3.5.1

hostname

# Keep the worker in the foreground so srun does not return immediately.
export SPARK_NO_DAEMONIZE=1

# Offer 20 cores and 48G of memory to the master started on login1.
start-worker.sh -c 20 -m 48G spark://login1.cluster:7077

#!/bin/bash

#SBATCH --job-name spark_worker
#SBATCH --nodes 4
#SBATCH --tasks-per-node 1
#SBATCH --cpus-per-task 20
# The %j variable includes the job number.  Useful for multiple runs
#SBATCH --output simple_%j.txt        # Send the standard output to simple_<job ID>.txt
#SBATCH --error simple_%j.err         # Send the error output to simple_<job ID>.err

srun /scratch/mewan/spark_examples/start-spark-worker
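
Once the master and workers are running, jobs can be submitted against the cluster rather than the local node by pointing --master at the standalone master's URL.  For example, assuming the master from the scripts above is listening on login1.cluster:7077,

spark-submit --master='spark://login1.cluster:7077' spark_sql.py /scratch/mewan/title.basics.par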