Introduction:
How to start the Spark interactive shell?
Step 1: Open a command prompt and go to the Spark bin directory
Step 2: Type the "spark-shell" command and press Enter
Result: The shell creates a Spark context and a SQL context, and the scala> prompt is displayed
How to exit the Spark interactive shell?
Press "Ctrl+D" to exit
Basic Transformations:
Program #1: map transformation
Scala Code:
val numbers = sc.parallelize(List(1,2,3,4,5,6,23,45,87))
val squares = numbers.map(x => x*x)
squares.collect()
Output:
Array[Int] = Array(1, 4, 9, 16, 25, 36, 529, 2025, 7569)
Explanation:
In the first line, we create an RDD named "numbers" from a list of numbers
In the second line, each element of the numbers RDD is squared, and the result is stored in another RDD, "squares"
In the third line, we collect all the elements of the squares RDD and display them
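Note that map is a transformation and is therefore lazy: the line defining squares returns immediately without touching the data, and the actual computation happens only when an action such as collect() (or take, count, reduce) is called. A minimal sketch:
val lazySquares = numbers.map(x => x*x)   //nothing is computed yet; Spark only records the lineage
lazySquares.take(3)                       //the action triggers computation; expected: Array(1, 4, 9)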
Program #2: Demo 1 on filter transformation
Scala Code:
val evenNos = numbers.filter(x => x % 2 == 0)
evenNos.collect()
Output:
Array[Int] = Array(2, 4, 6)
Explanation:
This code keeps only the even numbers from the numbers RDD; filter retains the elements for which the predicate returns true.
Program #3: Demo 2 on filter transformation
Scala Code:
numbers.filter(_ < 10).collect()
Output:
Array[Int] = Array(1, 2, 3, 4, 5, 6)
Explanation:
This code keeps the numbers that are less than 10 from the numbers RDD; the underscore is Scala shorthand for the lambda parameter.
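The two predicates from Programs #2 and #3 can also be combined in a single filter call; a small sketch on the same numbers RDD:
numbers.filter(x => x % 2 == 0 && x < 10).collect()
// expected: Array(2, 4, 6) -- every even number in this list is already below 10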
Program #4: Demo on nested map
Scala Code:
val nestedmap = numbers.map(x => 0 to x)
nestedmap.collect()
Output:
Array[scala.collection.immutable.Range.Inclusive] = Array(
Range(0, 1),
Range(0, 1, 2),
Range(0, 1, 2, 3),
Range(0, 1, 2, 3, 4),
Range(0, 1, 2, 3, 4, 5),
Range(0, 1, 2, 3, 4, 5, 6),
Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23),
Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45),
Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87))
Explanation:
This code maps each element of the numbers RDD to an inclusive range from 0 up to that element, so every element of the resulting RDD is itself a collection (a Range).
For example:
if the element is 5, then the resulting range is (0, 1, 2, 3, 4, 5)
Program #5: flatMap transformation
Scala Code:
val flatmap = numbers.flatMap(x => 0 to x)
flatmap.collect()
Output:
Array[Int] = Array(0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87)
Explanation:
This code produces a single flattened RDD of integers: flatMap merges all the per-element ranges into one collection. Compare with the output of Program #4, where each range stays a separate element.
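Conceptually, flatMap is map followed by a flattening step. As a sketch, the following two expressions should produce the same result as Program #5:
numbers.flatMap(x => 0 to x).collect()                 //flatMap directly
numbers.map(x => 0 to x).flatMap(identity).collect()   //map, then flatten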
Program #6: To display the first 4 elements of the numbers RDD
Scala Code:
val numbers = sc.parallelize(List(1,2,7,8,5,6,23,45,87))
numbers.take(4)
Output:
Array[Int] = Array(1, 2, 7, 8)
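Related actions include first(), which returns only the first element, and top(n), which returns the n largest elements in descending order; a brief sketch on the same RDD:
numbers.first()   //Int = 1
numbers.top(4)    //expected: Array(87, 45, 23, 8)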
Program #7: To display the number of elements in the numbers RDD
Scala Code:
val numbers = sc.parallelize(List(1,2,7,8,5,6,23,45,87))
numbers.count()
Output:
Long = 9
Program #8: sum of numbers in the list
Scala Code:
val numbers = sc.parallelize(List(1,2,7,8,5,6,23,45,87))
numbers.reduce({case (x,y) => x + y})
Output:
Int = 184
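The same reduction is more commonly written with the underscore shorthand; an RDD of numeric values also offers sum(), which returns a Double. A sketch:
numbers.reduce(_ + _)   //Int = 184, same as above
numbers.sum()           //Double = 184.0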
Program #9: if x = 0 return 1, else return x
Scala Code:
val rdd = sc.parallelize(Range(0, 100))   //Range(0,100) holds 0 to 99 (100 is excluded)
rdd.cache()                               //keep the RDD in memory for reuse
val rdd2 = rdd.map(x => {
  println("Value : " + x)                 //debug print; runs on the workers
  var ret = 1
  if (x != 0) ret = x
  ret
})
rdd2.collect().foreach(println)
Output:
1 1 2 3 4 ... 99 (x = 0 is mapped to 1; every other value maps to itself)
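The same logic can be written more concisely as an if expression inside map; an equivalent sketch (without the debug println):
val rdd2 = rdd.map(x => if (x == 0) 1 else x)
rdd2.collect().foreach(println)   //same output: 1 1 2 3 ... 99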
Program #10: Demo 1 on key-value pairs
Scala Code:
val departments = List(("ECE",1),("CSE",2),("EEE",3),("ECE",5),("EEE",8))
val departmentsRdd = sc.parallelize(departments)
departmentsRdd.collect()
Output:
Array[(String, Int)] = Array((ECE,1), (CSE,2), (EEE,3), (ECE,5), (EEE,8))
Program#11: Demo on reduceByKey
Scala Code:
val reducedRdd = departmentsRdd.reduceByKey(_ + _)
reducedRdd.collect()
Output:
Array[(String, Int)] = Array((EEE,11), (CSE,2), (ECE,6))
Program #12: Demo on sortByKey
Scala Code:
val sortByKeyRdd = departmentsRdd.sortByKey()
sortByKeyRdd.collect()
Output:
Array[(String, Int)] = Array((CSE,2), (ECE,1), (ECE,5), (EEE,3), (EEE,8))
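sortByKey also accepts an ascending flag; passing false sorts the keys in descending order (a sketch on the same pair RDD; the relative order of pairs that share a key is not guaranteed):
departmentsRdd.sortByKey(false).collect()
// expected: Array((EEE,3), (EEE,8), (ECE,1), (ECE,5), (CSE,2))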
Program#13: word count problem
Scala Code:
val data = sc.textFile("README.md") //Read text file
val rdd1= data.flatMap(l => l.split(" ")) //split words based on single space
val rdd2=rdd1.map(word => (word,1)) //assign each word with 1
val wc=rdd2.reduceByKey(_ + _) //sum values of same key
wc.collect()
Output:
res0: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/buildingspark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (YARN,,1), (have,1), (pre-built,1), (locally.,1), (changed,1), (locally,2), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (first,1), (documentation,3), (Configuration,1), (learning,,1), (graph,1), (Hive,2), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2)...
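A common follow-up is to list the most frequent words. One way, as a sketch, is to swap each (word, count) pair so the count becomes the key and then sort descending:
val topWords = wc.map(_.swap).sortByKey(false).take(10)   //the ten highest counts first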
Program #14: web server log analysis
Scala Code:
//Load data from a NASA web server log file
val lines = sc.textFile("E:\\spark-1.3.1-bin-hadoop2.4\\bin\\nasa_19950801.tsv")
//Keep only the lines whose first tab-separated field matches the given IP address
val split_lines = lines.filter(line => "202.236.34.35".equals(line.split("\t")(0)))
//Sum the bytes field (index 6); it is read as a string, so convert it to Int first
val bytescount = split_lines.map(line => line.split("\t")(6).toInt).reduce(_ + _)
Output:
bytescount: Int = 20791
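The key-value pattern from Programs #10-#13 applies to the log file as well; for example, counting requests per IP address (a sketch that assumes, as above, the IP is the first tab-separated field; any header line would need to be filtered out first):
val requestsPerIp = lines.map(line => (line.split("\t")(0), 1)).reduceByKey(_ + _)
requestsPerIp.take(5)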
Program#15: Demo on joins
Scala Code:
val readmedata = sc.textFile("README.md")
val wc1 = readmedata.flatMap(l => l.split(" ")).filter(_ == "Spark").map(word => (word,1)).reduceByKey(_ + _)   //count of "Spark" in README.md
val data = sc.textFile("CHANGES.txt")
val wc2 = data.flatMap(l => l.split(" ")).filter(_ == "Spark").map(word => (word,1)).reduceByKey(_ + _)   //count of "Spark" in CHANGES.txt
wc1.join(wc2).collect()   //join the two pair RDDs on the common key "Spark"
Output:
Array[(String, (Int, Int))] = Array((Spark,(13,202)))
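join is an inner join, so only keys present in both RDDs appear in the result. If a key may be missing from one side, leftOuterJoin (or rightOuterJoin/fullOuterJoin) keeps the unmatched pairs and wraps the other side's value in an Option; a sketch:
wc1.leftOuterJoin(wc2).collect()
// expected: Array((Spark,(13,Some(202))))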