- ./bin/spark-shell #Launch Scala shell
- ./bin/pyspark #Launch Python shell
- val #f = sc.textFile("#file") #Read file in Scala
- #f = sc.textFile("#file") #Read file in Python
- val #value = sc.parallelize(#data) #Create RDD
- sc #SparkContext, created automatically in the shell
- import org.apache.spark.SparkContext #Import class in Scala
- import org.apache.spark.SparkContext._ #Import class in Scala
- import org.apache.spark.SparkConf #Import class in Scala
- val conf = new SparkConf().setAppName(#appName).setMaster(#master URL e.g. local[16]).set("spark.executor.memory", "1g") #Set 1GB for each executor
- val sc = new SparkContext(conf)
- from pyspark import SparkContext, SparkConf #Import class in Python
- conf = SparkConf().setAppName(#appName).setMaster(#master URL e.g. local[16])
- sc = SparkContext(conf=conf)
- #file.filter(lambda line: "#search" in line).count() #Count lines containing the search term
- #file.map(lambda line: len(line.split())).reduce(lambda a,b: a if (a > b) else b) #Find the largest number of words in a line
- #file.map(lambda line: len(line.split())).reduce(max) #Same, using the built-in max
- #function = #file.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b) #Word count
- val #function = #file.map(#n => #n.length).reduce((a,b) => a + b) #Total length of all lines
- val #function = #file.filter(_.startsWith("#search")).map(_.split("\t")).map(n => n(1)) #Extract the second tab-separated field of matching lines
- val #function = #file.filter(_.startsWith("#search item 1")).map(_.split("\t")).map(n => n(1)).filter(_.contains("#search item 2")).count() #Count lines matching both filters
- #function.collect().foreach(println) #Print each element of the result
- #function.cache() #Use cache
- #function.toDebugString #View DAG transformation (Read from bottom to top)
- ./bin/spark-submit \
- --class org.apache.spark.examples.SparkPi \ #Application main class
- --master #local[#8] \ #Run locally on 8 cores
- /path/to/#examples.jar \ #File located in the path will be passed to SparkPi application
- #arg value #Argument(s) passed to the application
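The commands above can also be packaged into a small standalone application and run with spark-submit. A minimal Scala sketch, assuming Spark is on the classpath; the app name, file path, and search term are placeholder values:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone application combining the shell commands above
object WordCountApp {
  def main(args: Array[String]): Unit = {
    // Run locally on 4 cores; "WordCountApp" is an illustrative app name
    val conf = new SparkConf().setAppName("WordCountApp").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // "data.txt" and "spark" are placeholder inputs
    val file = sc.textFile("data.txt")

    // Count lines containing a search term
    val matches = file.filter(line => line.contains("spark")).count()
    println(s"Lines containing 'spark': $matches")

    // Classic word count: split, pair each word with 1, sum counts by key
    val counts = file.flatMap(_.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    counts.collect().foreach(println)

    sc.stop()
  }
}
```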
Spark SQL
- val sc: SparkContext #An existing SparkContext (Scala)
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- import sqlContext.createSchemaRDD
- case class #Table(#col1: String, #col2: Int) #Define a known schema with a case class
- val #file = sc.textFile("#dir").map(_.split(",")).map(#p => #Table(#p(0), #p(1).trim.toInt))
- #file.registerTempTable("#table")
- val #search = sqlContext.sql("SELECT #var FROM #table WHERE #condition")
- #search.map(#t => "" + #t(0)).collect().foreach(println)
- val schemaString = "#col1 #col2" #Specify the schema programmatically when it is not known in advance
- import org.apache.spark.sql._ #Import Spark SQL data types and Row
- val schema = StructType(schemaString.split(" ").map(#fieldName => StructField(#fieldName, StringType, true)))
- val #rowRDD = #file.map(_.split(",")).map(#p => Row(#p(0), #p(1).trim))
- val #SchemaRDD = sqlContext.applySchema(#rowRDD, schema)
- #SchemaRDD.registerTempTable("#table")
- val #search = sqlContext.sql("SELECT #var FROM #table WHERE #condition")
- #search.map(#t => "" + #t(0)).collect().foreach(println)
- from pyspark.sql import SQLContext #Python
- sqlContext = SQLContext(sc)
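Putting the Spark SQL steps together, a minimal Scala sketch assuming the Spark 1.x SQLContext/SchemaRDD API used above; the Person case class, people.txt, and the column names are placeholders:

```scala
import org.apache.spark.sql.SQLContext

// Hypothetical schema: a name column and an age column
case class Person(name: String, age: Int)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD // implicit RDD-to-SchemaRDD conversion (Spark 1.x)

// "people.txt" holds comma-separated rows such as "Ann,32"
val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))

people.registerTempTable("people")

// Query the temporary table and print the first column of each result row
val teens = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teens.map(t => "Name: " + t(0)).collect().foreach(println)
```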
Spark Streaming
- import org.apache.spark._
- import org.apache.spark.streaming._
- import org.apache.spark.streaming.StreamingContext._
- val #conf = new SparkConf().setMaster("#local[#2]").setAppName("#appName")
- val #ssc = new StreamingContext(#conf, Seconds(#1))
- val #lines = ssc.socketTextStream("#localhost", #9999)
- val #words = lines.flatMap(_.split(" "))
- val #pairs = #words.map(#word => (#word, 1))
- val #wordCounts = pairs.reduceByKey(#_ + #_)
- #wordCounts.print()
- #ssc.start() #Start computation
- #ssc.awaitTermination() #Wait for the computation to terminate
- nc -lk #9999 #Run netcat in one terminal to start the data stream
- In another terminal, run the application:
- ./bin/run-example streaming.#NetworkWordCount #localhost #9999
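Assembled into one Scala sketch, using the same host, port, and batch interval as the lines above:

```scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // implicits for pair DStreams on older Spark

// Two local threads: one receives the socket stream, one processes it
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Text received from the socket, one line per record
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words and count each word per 1-second batch
val wordCounts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
wordCounts.print()

ssc.start()            // start the computation
ssc.awaitTermination() // wait for it to terminate
```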
Tuples
- val pair = ('a', 'b') #Scala
- pair = ('a', 'b') #Python
- Tuple2<Character, Character> pair = new Tuple2<>('a', 'b'); #Java (scala.Tuple2)
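Tuples are what pair (key/value) RDD operations such as reduceByKey work on. A short Scala sketch, assuming sc is an existing SparkContext as in the shell examples above:

```scala
// A tuple groups two values of possibly different types
val pair = ("spark", 1) // Tuple2[String, Int]
println(pair._1)        // first element: "spark"
println(pair._2)        // second element: 1

// A pair RDD is simply an RDD of tuples, so key/value operations apply
val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
               .reduceByKey(_ + _)
counts.collect().foreach(println) // (a,2), (b,1)
```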