Spark

There are currently two types of RDDs:
a) parallelized collections, which take an existing Scala collection and run functions on it in parallel
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
One important parameter for parallel collections is the number of slices to cut the dataset into. Spark will run one task for each slice. Typically you want 2-4 slices for each CPU in your cluster (see the sketch below).
b) Hadoop datasets, which run functions on each record of a file in the Hadoop distributed file system or any other storage system supported by Hadoop (Amazon S3, Hypertable, HBase, etc.).
These can be built from text files, SequenceFiles, and any other Hadoop InputFormat.

Both types of RDDs can be operated on through the same methods.
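
A minimal sketch showing both kinds of RDD side by side (the file path and slice count here are placeholders, not values from these notes):

val distData = sc.parallelize(Array(1, 2, 3, 4, 5), 10)   // parallelized collection, cut into 10 slices
val lines = sc.textFile("/path/to/some/file.txt")          // Hadoop dataset: any path Hadoop can read

distData.map(_ * 2).count()    // the same operations work on both kinds of RDD
lines.map(_.length).count()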

Spark supports two types of shared variables: 
a) broadcast variables, which can be used to cache a value in memory on all nodes, 
b) accumulators, which are variables that are only “added” to, such as counters and sums.

For accumulators, updates made on the remote machines are propagated back to the driver program (ordinary variables captured in a closure are copied to each task, and updates to those copies are not sent back).
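
A short sketch of both, following the standard broadcast/accumulator API (the values are just examples):

val broadcastVar = sc.broadcast(Array(1, 2, 3))   // read-only value cached on every node
broadcastVar.value                                // Array(1, 2, 3)

val accum = sc.accumulator(0)                     // tasks can only add to it; the driver reads it
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value                                       // 10  (in a standalone job, import spark.SparkContext._ for the implicits)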

RDDs support two types of operations:
a) transformations, which create a new dataset from an existing one, and
map is a transformation that passes each dataset element through a function and returns a new distributed dataset representing the results
b) actions, which return a value to the driver program after running a computation on the dataset
reduce is an action that aggregates all the elements of the dataset using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

The transformations are only computed when an action requires a result to be returned to the driver program.
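
A small sketch of this laziness (the file path is a placeholder):

val lengths = sc.textFile("/path/to/some/file.txt").map(line => line.length)   // transformation: nothing runs yet
val total = lengths.reduce((a, b) => a + b)                                    // action: the whole pipeline runs now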

By default, each transformed RDD is recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting datasets on disk, or replicating them across the cluster.

Each RDD can be stored (cached) using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or even replicate it across nodes.
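
A hedged sketch, assuming the spark.storage.StorageLevel constants of this release (the exact constant names differ slightly between early versions; paths are placeholders):

import spark.storage.StorageLevel

val logs = sc.textFile("/path/to/some/file.txt")
logs.cache()    // default level: deserialized objects in memory

// An RDD's storage level can only be assigned once, so pick an explicit
// level instead of cache() when you want something else, e.g. disk only:
val other = sc.textFile("/path/to/another/file.txt").persist(StorageLevel.DISK_ONLY)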



Things to consider about the data:
a) Volume (the size of the data)
b) Complexity (how clean is it, how is it shaped?)
c) Velocity (how fast it comes to you)
d) Variety (it comes from all over the place)

The trade-off is between:
a) Time
b) Quality
c) Money
To estimate memory usage in Java: cache a million objects, look at them, and estimate how big they are using reflection; also let the user provide hints about the sizes.
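
A rough sketch of that reflection idea (this is not Spark's actual estimator; RoughSizeEstimator, the 16-byte header and the per-field sizes are assumptions for a 64-bit JVM):

import java.lang.reflect.Modifier

// Very rough per-object size estimate: walk the object's declared fields
// with reflection and add up assumed sizes.
object RoughSizeEstimator {
  private val primitiveSizes = Map[Class[_], Long](
    classOf[Boolean] -> 1, classOf[Byte] -> 1, classOf[Char] -> 2, classOf[Short] -> 2,
    classOf[Int] -> 4, classOf[Float] -> 4, classOf[Long] -> 8, classOf[Double] -> 8)

  def estimate(obj: AnyRef): Long = {
    val fields = obj.getClass.getDeclaredFields.filterNot(f => Modifier.isStatic(f.getModifiers))
    // 16 bytes assumed for header/alignment; references counted shallowly as 8 bytes.
    16L + fields.map(f => primitiveSizes.getOrElse(f.getType, 8L)).sum
  }

  // Estimate a whole cached dataset from a small sample of its objects.
  def estimateTotal(sample: Seq[AnyRef], totalCount: Long): Long =
    if (sample.isEmpty) 0L else sample.map(estimate).sum / sample.size * totalCount
}

A real estimator would also follow references and handle arrays; this only shows the shape of the idea.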

$ spark
scala> sc    // the Spark context
scala> System.getenv("PATH")    // read an environment variable
scala> val file = sc.textFile("file/url/goes/here")
scala> file.first   // prints the first record (the first line)




$ cd spark-0.7.0
$ sbt/sbt package
$ ./spark-shell
scala> val textFile = sc.textFile("README.md")
scala> textFile.count() // Number of items in this RDD
scala> textFile.first() // First item in this RDD
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) // number of words in the longest line; the same can be written with Java's Math library:
scala> import java.lang.Math
scala>  textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) //map-reduce style
scala> wordCounts.collect() //collect action
scala> linesWithSpark.cache() //cache content to memory
It may seem silly to use Spark to explore and cache a 30-line text file. The interesting part is that these same functions can be used on very large data sets, even when they are striped across tens or hundreds of nodes. 
scala> linesWithSpark.count()   // the first count after cache() computes and caches the RDD
scala> linesWithSpark.count()   // running it again should be much faster now

A Standalone Job in Scala

Now say we wanted to write a standalone job using the Spark API. We will walk through a simple job in both Scala (with sbt) and Java (with maven). If you are using other build systems, consider using the Spark assembly JAR described in the developer guide.

We’ll create a very simple Spark job in Scala. So simple, in fact, that it’s named SimpleJob.scala:

/*** SimpleJob.scala ***/
import spark.SparkContext
import SparkContext._

object SimpleJob extends Application {
  val logFile = "/var/log/syslog" // Should be some file on your system
  val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME",
    List("target/scala-2.9.2/simple-project_2.9.2-1.0.jar"))
  val logData = sc.textFile(logFile, 2).cache()
  val numAs = logData.filter(line => line.contains("a")).count()
  val numBs = logData.filter(line => line.contains("b")).count()
  println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
}

This job simply counts the number of lines containing ‘a’ and the number containing ‘b’ in a system log file. Unlike the earlier examples with the Spark shell, which initializes its own SparkContext, we initialize a SparkContext as part of the job. We pass the SparkContext constructor four arguments: the type of scheduler we want to use (in this case, a local scheduler), a name for the job, the directory where Spark is installed, and a list of JAR files containing the job’s code. The final two arguments are needed in a distributed setting, where Spark is running across several nodes, so we include them for completeness. Spark will automatically ship the JAR files you list to the slave nodes.

This file depends on the Spark API, so we’ll also include an sbt configuration file, simple.sbt which explains that Spark is a dependency. This file also adds two repositories which host Spark dependencies:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.9.2"

libraryDependencies += "org.spark-project" %% "spark-core" % "0.7.0"

resolvers ++= Seq(
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Spray Repository" at "http://repo.spray.cc/")

Of course, for sbt to work correctly, we’ll need to lay out SimpleJob.scala and simple.sbt according to the typical directory structure. Once that is in place, we can create a JAR package containing the job’s code, then use sbt run to execute our example job.

$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleJob.scala

$ sbt package            # make jar file ./target/scala-2.9.2/simple-project_2.9.2-1.0.jar
$ sbt run
...
Lines with a: 8422, Lines with b: 1836

This example only runs the job locally; for a tutorial on running jobs across several machines, see the Standalone Mode documentation, and consider using a distributed input source, such as HDFS.
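
A hedged sketch of what would change for a cluster run (the master URL and HDFS path are placeholders, not values from these notes):

val sc = new SparkContext("spark://master-host:7077", "Simple Job", "$YOUR_SPARK_HOME",
  List("target/scala-2.9.2/simple-project_2.9.2-1.0.jar"))
val logData = sc.textFile("hdfs://namenode-host:9000/path/to/logfile").cache()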

To turn the sbt project into an Eclipse project:

In the main directory of the sbt project (where the .sbt file is located):
$ cd project
$ vim plugins.sbt
Put the following line in it:
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.2")
Save and close the file.
$ cd ..                # back to the root of the project
$ sbt eclipse
The project is ready. Import it into Eclipse and run it!








