Apache Spark
Apache Spark [4] is an open source, parallel data processing framework that complements Apache Hadoop to make it easy to develop fast, unified Big Data applications combining batch, streaming, and interactive analytics on all your data.
SSH to hpcdata1 (the login node for the Hadoop cluster) to run Spark under Cloudera Hadoop (you need to request a Hadoop account explicitly). To run Spark stand-alone in the Rider cluster, check the available versions and load the module first:
module spider spark
module load spark
Interactive Job
Run Spark in interactive shell mode. You will get the Scala prompt "scala>"
spark-shell
Create data
scala> val data = 1 to 10000
Create an RDD (Resilient Distributed Dataset), a fault-tolerant collection of elements that can be operated on in parallel, based on that data.
scala> val distData = sc.parallelize(data)
Use filter to select values less than 10
scala> distData.filter(_<10).collect()
output:
...
15/05/19 15:23:24 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 1.324873 s
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
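Other actions work on the same RDD in the same way; for example, the values can be summed with reduce (an illustrative extra step, not part of the original walkthrough; the sum of 1 through 10000 is 50005000):
scala> distData.reduce(_ + _)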
Exit
scala> :quit
SPARK STAND-ALONE CLI (in Rider cluster)
Copy the directory "spark-standalone" from /usr/local/doc/SPARK/spark-standalone/, change into the copied "spark-standalone" directory, and find the Scala and Python files along with a Slurm job file
cp -r /usr/local/doc/SPARK/spark-standalone/ .
cd spark-standalone
Request a compute node with 16gb of memory, 2 tasks and 2 cores/task
srun -n 2 -c 2 --mem=16gb --pty bash
Load spark module
module load spark/3.2.0
RUN
spark-shell --executor-cores 4 --executor-memory 5G -i pi-estimation.scala   # Scala
spark-submit --executor-cores 4 --executor-memory 5G pi-estimation.py        # Python
output:
Pi is roughly 3.1464
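The pi-estimation script is among the files copied above; the underlying idea is a standard Monte Carlo estimate. A minimal sketch of that idea in Scala (illustrative only, not the provided script; the sample size n is an arbitrary choice, and sc is the SparkContext available in spark-shell):
val n = 1000000
val inside = sc.parallelize(1 to n).map { _ =>
  val x = math.random                    // random point in the unit square
  val y = math.random
  if (x * x + y * y <= 1.0) 1 else 0     // 1 if the point falls inside the quarter circle
}.reduce(_ + _)
println(s"Pi is roughly ${4.0 * inside / n}")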
SPARK STAND-ALONE Batch Job (in Rider)
Submit the Spark Python job and check the output in slurm-<jobID>.out
sbatch job.slurm
SPARK RAPIDS INTEGRATION (in Rider)
The RAPIDS Accelerator for Apache Spark leverages GPUs to accelerate processing via the RAPIDS libraries.
Open the Markov Desktop with one GPU and about 6-8 GB of memory.
Load the cuda/11.2 and spark/3.2.0 modules.
Run spark-shell with the RAPIDS plugin enabled:
spark-shell --master local --num-executors 1 --conf spark.rapids.sql.concurrentGpuTasks=1 --driver-memory 10g --conf spark.rapids.memory.pinnedPool.size=2G --conf spark.locality.wait=0s --conf spark.sql.files.maxPartitionBytes=512m --conf spark.plugins=com.nvidia.spark.SQLPlugin --jars ${SPARK_RAPIDS_PLUGIN_JAR}
Create two RDDs (Resilient Distributed Datasets) and convert them to DataFrames (the needed implicits are already imported in spark-shell)
val df = sc.makeRDD(1 to 10000000, 6).toDF    // sc is the SparkContext; numSlices = 6, i.e. 6 partitions
val df2 = sc.makeRDD(1 to 10000000, 6).toDF   // .toDF converts the RDD to a DataFrame (DF)
Join the two DataFrames on their key columns; rows whose keys have no match are dropped from both sides (an inner join). Here every value in 1 to 10000000 matches exactly one value on the other side, so the count is 10000000.
df.select( $"value" as "a").join(df2.select($"value" as "b"), $"a" === $"b").count
Open Firefox and browse to http://localhost:4040/ for the Spark UI. Check the SQL tab and click the link in the description, which shows the GPU operations (see image below).
Batch Job (in hpcdata1 - Hadoop Cluster)
In this how-to [6], you’ll learn how to write, compile, and run a simple Spark program written in Scala on CDH 5. The full code for the example is hosted at https://github.com/sryza/simplesparkapp.
Clone or copy the repository
git clone https://github.com/sryza/simplesparkapp.git
or
cp -r /home/sxg125/hadoop-projects/spark/simplesparkapp .
Change to the directory simplesparkapp
cd simplesparkapp
Set the MAVEN path:
export PATH=$PATH:/home/share/apache-maven-3.2.2/bin/
Compile:
mvn package
output:
...
Building jar: /home/sxg125/hadoop-projects/spark/simplesparkapp/target/sparkwordcount-0.0.1-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 04:22 min
[INFO] Finished at: 2015-05-20T11:37:16-04:00
[INFO] Final Memory: 32M/874M
[INFO] ----------------------------------------------
The JAR is created in the target directory.
Running Job
Create an input directory in HDFS (see above) and put the inputfile.txt there.
hadoop fs -put data/inputfile.txt /user/<user>/projects/spark/input
Submit the job; the final argument (2) is passed to the program, and "> output" redirects the printed result to the file output
spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --master yarn target/sparkwordcount-0.0.1-SNAPSHOT.jar /user/<user>/projects/spark/input/inputfile.txt 2 > output
View the output
cat output
output:
(p,2), (t,2), (b,1), (h,1), (n,4), (f,1), (v,1), (r,2), (l,1), (e,6), (a,4), (i,1), (u,1), (o,2), (c,1)
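The pairs in the output are per-character counts: roughly, the program counts word occurrences in the input file, keeps words whose count reaches the threshold given on the command line (2 above), and then counts the characters of the remaining words. A minimal sketch of that logic (illustrative only; see the repository above for the actual source, where args are the application arguments passed after the JAR):
val threshold = args(1).toInt                           // e.g. 2
val words = sc.textFile(args(0)).flatMap(_.split(" "))  // split the input file into words
val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)
val frequent = wordCounts.filter(_._2 >= threshold).map(_._1)
val charCounts = frequent.flatMap(_.toCharArray).map(c => (c, 1)).reduceByKey(_ + _)
System.out.println(charCounts.collect().mkString(", "))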