Big Data Tutorial: Spark
Today, Spark is being adopted by major players like Amazon, eBay, and Yahoo! Many organizations run Spark on clusters with thousands of nodes. According to the Spark FAQ, the largest known cluster has over 8000 nodes. Indeed, Spark is a technology well worth taking note of and learning about.
This tutorial provides an introduction to Spark and practical knowledge of how to use it. It contains information from the Apache Spark website as well as the book Learning Spark - Lightning-Fast Big Data Analysis.
Spark on Hadoop: Spark is a framework for performing general data analytics on a distributed computing cluster such as Hadoop. It provides in-memory computation, which increases the speed of data processing compared with MapReduce. It runs on top of an existing Hadoop cluster and accesses the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from HDFS, Flume, Kafka, and Twitter. Refer to Big Data Tutorial 1 to learn more about Hadoop.
What is Apache Spark? An Introduction
Spark is an Apache project advertised as “lightning-fast cluster computing”. It has a thriving open-source community and is the most active Apache project at the moment.
Spark provides a faster and more general data processing platform. Spark lets you run programs up to 100x faster in memory, or 10x faster on disk, than Hadoop MapReduce. Last year, Spark overtook Hadoop by completing the 100 TB Daytona GraySort contest 3x faster on one-tenth the number of machines, and it also became the fastest open-source engine for sorting a petabyte.
Spark also makes it possible to write code more quickly, as you have over 80 high-level operators at your disposal. To demonstrate this, let’s have a look at the “Hello World!” of Big Data: the Word Count example. Written in Java for MapReduce, it has around 50 lines of code, whereas in Spark (and Scala) you can do it as simply as this:
sparkContext.textFile("hdfs://...").flatMap(line => line.split(" "))
.map(word => (word, 1)).reduceByKey(_ + _)
.saveAsTextFile("hdfs://...")
Q) Is there any point in learning MapReduce, then?
A) Yes, for the following reasons:
MapReduce is a paradigm used by many big data tools, including Spark. So, understanding the MapReduce paradigm and how to convert a problem into a series of MR tasks is very important.
When the data grows beyond what can fit into the memory on your cluster, the Hadoop MapReduce paradigm is still very relevant.
Almost every other tool, such as Hive or Pig, converts its queries into MapReduce phases. If you understand MapReduce, you will be able to optimize your queries better.
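To make the paradigm concrete, here is a minimal sketch of the map, shuffle, and reduce phases of Word Count in plain Python. It runs on a single machine and is neither Hadoop nor Spark code; the function names map_phase, shuffle_phase, and reduce_phase are illustrative assumptions, not part of any library.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every line."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def shuffle_phase(pairs):
    """Shuffle: group all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Reduce: collapse each key's list of values into a single count."""
    return {key: sum(values) for key, values in groups.items()}


def word_count(lines):
    # Chain the three phases, as a MapReduce job would.
    return reduce_phase(shuffle_phase(map_phase(lines)))


counts = word_count(["to be or not", "to be"])
print(counts)
```

In a real cluster the map and reduce phases run on many nodes and the shuffle moves data over the network; the sketch only shows how a problem is decomposed into those phases.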
Q) When do you use Apache Spark? What are the benefits of Spark over MapReduce?
A) The benefits of Spark are:
Spark is really fast. According to the project's claims, it runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk. It makes effective use of RAM to produce faster results.
In the MapReduce paradigm, you write many MapReduce tasks and then tie them together using Oozie or a shell script. This mechanism is very time-consuming, and each MapReduce task has high latency.
Quite often, translating the output of one MR job into the input of another MR job requires writing additional code, because Oozie may not suffice.
In Spark, you can do essentially everything from a single application or console (the PySpark or Scala console) and get the results immediately. Switching between running something on a cluster and doing something locally is fairly easy and straightforward. This also means less context switching for the developer and higher productivity.
Spark is roughly equivalent to MapReduce and Oozie put together.
Additional key features of Spark include:
It currently provides APIs in Scala, Java, Python, and R, with support for other languages on the way.
It integrates well with the Hadoop ecosystem and data sources (HDFS, Amazon S3, Hive, HBase, Cassandra, etc.).
It can run on clusters managed by Hadoop YARN or Apache Mesos, and can also run standalone. On Peel, it's managed by YARN.
The Spark core is complemented by a set of powerful, higher-level libraries that can be seamlessly used in the same application. These libraries currently include SparkSQL, Spark Streaming, MLlib (for machine learning), and GraphX.
Spark Core
Spark Core is the base engine for large-scale parallel and distributed data processing. It is responsible for:
memory management and fault recovery
scheduling, distributing and monitoring jobs on a cluster
interacting with storage systems
Spark introduces the concept of an RDD (Resilient Distributed Dataset).
Q) What is an RDD (Resilient Distributed Dataset)?
A) An RDD is a representation of data located on a network which is:
Immutable - You can operate on the RDD to produce another RDD, but you can’t alter it.
Partitioned / Parallel - The data in an RDD is operated on in parallel. Any operation on an RDD is distributed across multiple nodes.
Resilient - If one of the nodes hosting a partition fails, another node takes over its data.
An RDD does not store the data itself, but rather the location of the data and the computation that produces it.
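The three properties can be illustrated with a toy class in plain Python. This is a hypothetical sketch, not Spark's implementation: the name ToyRDD and its methods are assumptions made for illustration. The object holds only a recipe for computing each partition, so map returns a new object instead of altering the old one, and any partition can be rebuilt on demand.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: it records how to compute each partition."""

    def __init__(self, num_partitions, compute):
        self.num_partitions = num_partitions
        self._compute = compute  # function: partition index -> list of elements

    @classmethod
    def from_list(cls, data, num_partitions):
        # Assign every num_partitions-th element to the same partition.
        def compute(i):
            return data[i::num_partitions]
        return cls(num_partitions, compute)

    def map(self, func):
        # Immutable: return a new ToyRDD whose recipe wraps the parent's recipe.
        parent = self
        return ToyRDD(self.num_partitions,
                      lambda i: [func(x) for x in parent._compute(i)])

    def compute_partition(self, i):
        # Resilient: any "node" can rebuild partition i from the recipe alone.
        return self._compute(i)

    def collect(self):
        # Gather all partitions into one local list.
        out = []
        for i in range(self.num_partitions):
            out.extend(self.compute_partition(i))
        return out


nums = ToyRDD.from_list([1, 2, 3, 4], num_partitions=2)
squares = nums.map(lambda x: x * x)

# Simulate losing partition 1 and rebuilding it from the recorded computation.
rebuilt = squares.compute_partition(1)
print(rebuilt)
```

Here nums is left unchanged by the map call, and the "lost" partition is recomputed from the source data and the recorded function rather than restored from a stored copy.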
Main Primitives - RDDs support two types of operations: Transformations and Actions
Q) What are Transformations?
A) Transformations are functions that are applied to an RDD (Resilient Distributed Dataset). A transformation results in another RDD. A transformation is not executed until an action follows (lazy execution).
Examples of transformations are:
map(func) - applies the function passed to it to each element of the RDD, resulting in a new RDD.
filter(func) - creates a new RDD by picking the elements of the current RDD for which the function returns true.
coalesce(numPartitions) - reduces the number of partitions in the RDD to numPartitions.
Q) What are Actions?
A) An action brings data from the RDD back to the local machine. Executing an action triggers all the previously defined transformations. Examples of actions are:
reduce(func) - applies the function passed to it repeatedly until only one value is left. The function should take two arguments and return one value.
take(n) - brings n values from the RDD back to the local node.
collect() - returns all elements of the dataset as an array.
Transformations in Spark are “lazy”, meaning that they do not compute their results right away. Instead, they just “remember” the operation to be performed and the dataset (e.g., file) on which the operation is to be performed. The transformations are only actually computed when an action is called, and the result is returned to the driver program. This design enables Spark to run more efficiently, as it can optimize the DAG of transformations we have created. For example, if a big file was transformed in various ways and passed to the first() action, Spark would only process and return the result for the first line, rather than do the work for the entire file.
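Python's built-in lazy iterators behave in a loosely similar way, which makes the idea easy to try without a cluster. The sketch below is an analogy only, not Spark code; the names expensive_transform and processed are made up for illustration. It records which lines were actually transformed when only the first result is requested.

```python
processed = []  # records which lines were actually transformed


def expensive_transform(line):
    """Stand-in for a costly per-line computation."""
    processed.append(line)
    return line.upper()


lines = ["first line", "second line", "third line"]

# "Transformation": builds a lazy pipeline; nothing has run yet.
pipeline = map(expensive_transform, lines)
ran_before_action = list(processed)

# "Action" analogous to asking for the first element:
# only one line is pulled through the pipeline.
first_result = next(pipeline)

print(ran_before_action)
print(first_result)
print(processed)
```

Defining the pipeline does no work, and requesting the first element transforms only the first line, leaving the other two untouched.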
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist or cache method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it.
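The difference between recomputing and caching can be sketched with a small counter in plain Python. This is a hypothetical illustration, not Spark's persist or cache implementation; the class LazyDataset and its compute_count attribute are assumptions introduced for the example.

```python
class LazyDataset:
    """Hypothetical sketch: recomputes on every action unless cache() was called."""

    def __init__(self, source, func):
        self._source = source
        self._func = func
        self._cache_enabled = False
        self._cached = None
        self.compute_count = 0  # how many times the full computation ran

    def cache(self):
        # Only marks the dataset; the data is stored on the first action.
        self._cache_enabled = True
        return self

    def _materialize(self):
        if self._cache_enabled and self._cached is not None:
            return self._cached
        self.compute_count += 1
        result = [self._func(x) for x in self._source]
        if self._cache_enabled:
            self._cached = result
        return result

    def count(self):
        return len(self._materialize())

    def total(self):
        return sum(self._materialize())


uncached = LazyDataset([1, 2, 3], lambda x: x * x)
uncached.count()
uncached.total()

cached = LazyDataset([1, 2, 3], lambda x: x * x).cache()
cached.count()
cached.total()

print(uncached.compute_count, cached.compute_count)
```

Two actions on the uncached dataset run the computation twice, while the cached one computes once and reuses the stored elements for the second action.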
Hands-on Exercise Using PySpark
Set up a Spark Standalone Cluster and open a new interactive Jupyter notebook. A notebook containing all of the code below can be found at /scratch/work/public/spark/spark_examples.ipynb.
Initializing SparkContext:
SparkContext represents the connection to a Spark execution environment. In fact, the main entry point to Spark functionality is SparkContext. You have to create a Spark context before using Spark features and services in your application. A Spark context can be used to create RDDs, access Spark services, and run jobs.
# Initialize the SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkStandaloneTest").getOrCreate()
sc = spark.sparkContext
You may see a "WARNING: An illegal reflective access operation has occurred" message, but the code should still run.
Creating RDD
# Turn a collection into an RDD
sc.parallelize([1, 2, 3])
# Load text file from storage on Greene
sc.textFile("/scratch/<net_id>/path/to/file.txt")
Basic Transformations:
nums = sc.parallelize([1, 2, 3])
# Pass each element through a function
squares = nums.map(lambda x: x*x)
squares.collect() # => [1, 4, 9]
# Keep elements passing a predicate
evens = squares.filter(lambda x: x%2 == 0)
evens.collect() # => [4]
# Map each element to zero or more others
ranges = nums.flatMap(lambda x: range(x))
ranges.collect() # => [0, 0, 1, 0, 1, 2]
Basic Actions:
nums = sc.parallelize([1, 2, 3])
# Retrieve RDD contents as a local collection
nums.collect() # => [1, 2, 3]
# Return first K elements
nums.take(2) # => [1, 2]
# Count number of elements
nums.count() # => 3
# Merge elements with an associative function
nums.reduce(lambda x, y: x + y) # => 6
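The reduce action expects an associative function because the elements are spread over partitions and each partition is reduced separately before the partial results are merged. The plain-Python sketch below imitates that two-stage process; partitioned_reduce is a hypothetical helper, not a Spark API. It also shows how a non-associative function such as subtraction gives results that depend on how the data happens to be partitioned.

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


def add(x, y):
    return x + y


def subtract(x, y):
    return x - y


# Addition is associative: the partitioning does not matter.
sum_a = partitioned_reduce([[1, 2], [3]], add)
sum_b = partitioned_reduce([[1], [2, 3]], add)

# Subtraction is not associative: the answer changes with the partitioning.
diff_a = partitioned_reduce([[1, 2], [3]], subtract)
diff_b = partitioned_reduce([[1], [2, 3]], subtract)

print(sum_a, sum_b)
print(diff_a, diff_b)
```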
Working with Key-Value Pairs:
pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])
# reduceByKey also automatically implements combiners on the map side
reduced_pets = pets.reduceByKey(lambda x,y: x + y)
reduced_pets.collect() # => [('dog', 1), ('cat', 3)]
sorted_pets = pets.sortByKey()
sorted_pets.collect() # => [('cat', 1), ('cat', 2), ('dog', 1)]
# groupByKey generates iterables of all the key's associated values
grouped_pets = pets.groupByKey()
grouped_pets.collect() # => [('dog', <ResultIterable>), ('cat', <ResultIterable>)]
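A map-side combiner merges values that share a key inside each partition before anything is sent across the network, so fewer records need to be shuffled. The following plain-Python sketch illustrates the idea under simplified assumptions: combine_locally and reduce_by_key are hypothetical helpers, partitions are ordinary lists, and the "shuffle" is just list concatenation.

```python
def combine_locally(pairs, func):
    """Merge the values of each key within one collection of (key, value) pairs."""
    combined = {}
    for key, value in pairs:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())


def reduce_by_key(partitions, func):
    # Step 1: combine within each partition before any data is "shuffled".
    shuffled = [pair for part in partitions
                for pair in combine_locally(part, func)]
    # Step 2: merge the partial results that share a key.
    return dict(combine_locally(shuffled, func)), len(shuffled)


partitions = [[("cat", 1), ("cat", 2), ("dog", 1)], [("cat", 4)]]
result, shuffled_records = reduce_by_key(partitions, lambda x, y: x + y)
raw_records = sum(len(part) for part in partitions)

print(result)
print(shuffled_records, raw_records)
```

In this toy run three records cross the simulated shuffle instead of four. Grouping without a combiner would have to move every raw record, which is one reason a reduce-style operation is often preferred over grouping when only an aggregate is needed.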
Other Operations: Joins and Grouping
page_visits = sc.parallelize([
    ("index.html", "1.2.3.4"),
    ("about.html", "3.4.5.6"),
    ("index.html", "1.3.3.7")])
page_names = sc.parallelize([
    ("index.html", "Home"),
    ("about.html", "About")])
page_visits.join(page_names).collect()
# => [('about.html', ('3.4.5.6', 'About')),
#     ('index.html', ('1.2.3.4', 'Home')),
#     ('index.html', ('1.3.3.7', 'Home'))]
cogroup = page_visits.cogroup(page_names).collect()
# Convert the grouped iterables to lists so that the values are readable
[(x, tuple(map(list, y))) for x, y in sorted(cogroup)]
# => [('about.html', (['3.4.5.6'], ['About'])),
#     ('index.html', (['1.2.3.4', '1.3.3.7'], ['Home']))]
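The semantics of join and cogroup can be reproduced on ordinary Python lists, which may help when reasoning about the shape of the results. This is a single-machine sketch with hypothetical helper names (inner_join, cogroup_lists); it says nothing about how Spark distributes the work.

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]


def cogroup_lists(left, right):
    """For each key, collect the left values and the right values separately."""
    keys = sorted({k for k, _ in left} | {k for k, _ in right})
    return [(k, ([v for lk, v in left if lk == k],
                 [v for rk, v in right if rk == k]))
            for k in keys]


visits = [("index.html", "1.2.3.4"),
          ("about.html", "3.4.5.6"),
          ("index.html", "1.3.3.7")]
names = [("index.html", "Home"), ("about.html", "About")]

joined = sorted(inner_join(visits, names))
grouped = cogroup_lists(visits, names)

print(joined)
print(grouped)
```

join produces one output pair per matching combination of values, whereas cogroup produces one entry per key holding all values from each side.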
Input/Output
# Write RDD to a text file
# Elements will be saved in string format
# The output will automatically be sharded into multiple files within the 'output' directory
nums.saveAsTextFile("/scratch/<net_id>/path/to/output")
# Read RDD from a saved text file
readnums = sc.textFile("/scratch/<net_id>/path/to/output/part-*")
readnums.collect() # => ['1', '2', '3']