The following software is required: JDK 7+ and Maven 3.x. You can either use the
Scala/Python shell (easier) or the Java API. We will follow a modified version of this tutorial.
Running Spark-shell
The spark-shell is used for interacting directly with a Spark cluster. If the shell is started without parameters, it simply creates its own local Spark instance, which is handy for quick testing.
A better approach is to first deploy a Spark cluster locally.
Deploying Spark
To connect to your cluster with the Spark shell, you have to specify the address of your master with the option --master=.... The address can be found in the web interface.
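Assuming a standard Spark distribution, a local standalone cluster can be started as sketched below (script names and the default port can differ between Spark versions):

```shell
# Start a standalone master; its web UI (default http://localhost:8080)
# displays the spark:// master URL to connect to.
./sbin/start-master.sh

# Start a worker attached to that master (the script is called
# start-slave.sh in older Spark releases).
./sbin/start-worker.sh spark://localhost:7077

# Connect the interactive shell to the running cluster.
./bin/spark-shell --master spark://localhost:7077
```
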
In order to use GraphX, you need to perform these imports:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
Create an Array(...) of vertices of the form (id, (name, value)), where id is a Long value (1L, 2L, ...), and an Array(...) of edges of the form Edge(srcId, dstId, weight). Turn both arrays into RDDs with sc.parallelize().
Now that we have two RDDs, we can build a graph with a call to:
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
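Putting these steps together, here is a minimal sketch in the Spark shell (the vertex names and values are made up for illustration):

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Vertices of the form (id, (name, value)); ids are Longs.
val vertexArray = Array(
  (1L, ("Alice", 28)),
  (2L, ("Bob", 27)),
  (3L, ("Charlie", 65))
)

// Directed edges of the form Edge(srcId, dstId, weight).
val edgeArray = Array(
  Edge(1L, 2L, 7),
  Edge(2L, 3L, 4),
  Edge(3L, 1L, 2)
)

// Distribute both arrays as RDDs (sc is the shell's SparkContext).
val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

// Assemble the property graph from the two RDDs.
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
```
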
A graph has two attributes, vertices and edges. Display their contents using foreach and a lambda calling println, then select a subset of the vertices with filter(...).
In Spark, filter is a transformation and is therefore lazily evaluated; you need to perform an action, such as collect(), to actually compute the result. Perform the triplets
operation on your graph and discuss the result. Use it to select only the pairs of vertices where the edge value is greater than a threshold.
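A sketch of the triplets exercise, assuming the graph built above (the threshold value 4 is arbitrary):

```scala
// Each EdgeTriplet carries the edge attribute plus both endpoint attributes.
graph.triplets
  .filter(t => t.attr > 4)   // transformation: lazily evaluated
  .collect()                 // action: triggers the computation
  .foreach(t => println(s"${t.srcAttr._1} -> ${t.dstAttr._1} (weight ${t.attr})"))
```
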
We are now going to load a graph from a text file. Some real-world datasets are available here.
Loading a text file is simply done by calling sc.textFile(...).
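For example (the file paths below are placeholders; adapt them to your dataset). For plain edge-list files, GraphX also ships a dedicated loader, GraphLoader.edgeListFile:

```scala
import org.apache.spark.graphx.GraphLoader

// Generic text loading: one String per line.
val lines = sc.textFile("data/graph.txt")   // placeholder path

// For files with one "srcId dstId" pair per line, GraphX builds the
// graph directly, with default vertex and edge attributes of 1.
val loaded: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "data/edges.txt")
```
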
A graph can be saved as a text file (saveAsTextFile) or as a binary file (saveAsObjectFile).
GraphX already provides a set of standard graph algorithms, such as PageRank and connected components. Test them on your graphs.
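Both algorithms are available as methods on the graph itself; a short sketch (the PageRank tolerance is arbitrary):

```scala
// PageRank until convergence within the given tolerance;
// the resulting vertex attribute is the rank.
val ranks = graph.pageRank(0.0001).vertices
ranks.top(5)(Ordering.by(_._2)).foreach(println)

// Connected components: each vertex is labeled with the smallest
// vertex id of its component.
val cc = graph.connectedComponents().vertices
cc.collect().foreach(println)
```
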
aggregateMessages is an operation which applies a map-like function (sendMsg) to the triplets, generating messages for vertices. These messages are then aggregated using a reduce-like function (mergeMsg), creating a new VertexRDD.
def aggregateMessages[Msg: ClassTag](
    sendMsg: EdgeContext[VD, ED, Msg] => Unit,
    mergeMsg: (Msg, Msg) => Msg,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[Msg]
Experiment with aggregateMessages! By producing carefully crafted messages in sendMsg, it is possible to compute the average of all incoming values in the reduce function. Any idea how to do that?
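As a starting point, here is a simple use of aggregateMessages that sums the weights of each vertex's incoming edges (the averaging variant is left as the exercise asks):

```scala
// For every vertex, sum the weights of its incoming edges.
val inWeights: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(ctx.attr),  // sendMsg: send the edge weight to the destination
  (a, b) => a + b                  // mergeMsg: reduce all messages per vertex
)
inWeights.collect().foreach(println)
```
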
GraphX provides a Pregel-like API inspired by Google's version.
The rest of this exercise aims at understanding the Pregel API and writing a simple implementation of connected components.
def pregel[A]
    (initialMsg: A,
     maxIter: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)
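Before writing connected components yourself, it may help to study the canonical single-source shortest-paths example of pregel (adapted from the GraphX programming guide; the source vertex id is arbitrary):

```scala
val sourceId: VertexId = 1L  // arbitrary source vertex for this sketch

// Initialize distances: 0 at the source, "infinity" everywhere else.
val init = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = init.pregel(Double.PositiveInfinity)(
  // vprog: keep the smaller of the current and the received distance
  (id, dist, newDist) => math.min(dist, newDist),
  // sendMsg: propagate an improved distance along each edge
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  // mergeMsg: keep the best distance among incoming messages
  (a, b) => math.min(a, b)
)
sssp.vertices.collect().foreach(println)
```

Connected components follows the same pattern: initialize each vertex with its own id and propagate the minimum id instead of a distance.
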