Graph frameworks

GraphX

The following software is required: JDK 7+ and Maven 3.x. You can use either the Scala/Python shell (easier) or the Java API. We will follow a modified version of this tutorial.

Running Spark-shell

The spark-shell is used for interacting directly with a Spark cluster. If the shell is started without parameters, it simply creates its own local Spark instance, which is convenient for quick testing.

The better solution is to first deploy a Spark cluster locally.

Deploying Spark

  • Download a pre-built version of Spark 2+ for Hadoop, or build it from source using dev/make-distribution.sh
  • Add localhost to the conf/slaves file
  • Start your cluster with sbin/start-all.sh and check that the web interface is available at http://localhost:8080

To connect the Spark shell to your cluster, specify the address of your master with the option --master=.... The address, a URL of the form spark://host:port, is displayed in the web interface.

Manually building a graph

In order to use GraphX, you need to perform these imports:

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

  • Define an Array(...) of vertices of the form (id, (name, value)), where id is a Long value (1L, 2L, ...)
  • Define an Array(...) of edges of the form Edge(srcId, dstId, weight)
  • Turn these arrays into RDDs using sc.parallelize()

Now that we have two RDDs, we can build a graph with the call to

val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
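The steps above can be sketched as follows in the Spark shell, where `sc` is the SparkContext that the shell provides. The names, values and edge weights are made up for illustration; any data of the same shape would do.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes a spark-shell session: `sc` is the SparkContext created by the shell.
// Vertices have the form (id, (name, value)); the data itself is illustrative.
val vertexArray = Array(
  (1L, ("Alice", 28)),
  (2L, ("Bob", 27)),
  (3L, ("Charlie", 65)),
  (4L, ("David", 42))
)
// Edges have the form Edge(srcId, dstId, weight).
val edgeArray = Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 4L, 3),
  Edge(4L, 1L, 1)
)

// Distribute both arrays as RDDs.
val vertexRDD: RDD[(VertexId, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

// Vertex attribute type is (String, Int), edge attribute type is Int.
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
```

VertexId is an alias for Long in GraphX, which is why the vertex ids are written 1L, 2L and so on.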

Basic operations

A graph has two attributes, vertices and edges

  • Display the number of edges and vertices in your graph
  • Pretty-print the vertices using foreach and a lambda that calls println
  • Filter: create a new graph that keeps only the vertices with a value greater than a threshold, using filter(...). In Spark, this is a transformation and it is lazily evaluated. You need to perform an action, such as collect(), to trigger the computation
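One possible way to work through these three operations is sketched below. It assumes a spark-shell session and rebuilds a small illustrative graph; the threshold of 30 is arbitrary. The use of subgraph to obtain a new Graph is one option among several, not necessarily the one the exercise has in mind.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided); the small graph is illustrative.
val vertices = sc.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42))))
val edges = sc.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 4L, 3), Edge(4L, 1L, 1)))
val graph: Graph[(String, Int), Int] = Graph(vertices, edges)

// Number of vertices and edges (both calls are actions).
println(s"${graph.numVertices} vertices, ${graph.numEdges} edges")

// Pretty-print: collect() first so that println runs on the driver
// rather than on the executors of the cluster.
graph.vertices.collect().foreach { case (id, (name, value)) =>
  println(s"$id: $name ($value)")
}

// filter is a lazy transformation on the vertex RDD...
val threshold = 30 // arbitrary threshold chosen for this example
val kept = graph.vertices.filter { case (_, (_, value)) => value > threshold }
// ...and nothing is computed until an action such as collect() is called.
val keptNames = kept.collect().map { case (_, (name, _)) => name }

// subgraph returns a new Graph restricted to the vertices satisfying the
// predicate, together with the edges whose two endpoints were both kept.
val filtered = graph.subgraph(vpred = (id, attr) => attr._2 > threshold)
```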

Triplets

Call the triplets operation on your graph and discuss the result. Use it to select only the pairs of vertices whose connecting edge has a value greater than a threshold.
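As a sketch of what triplets gives access to, the example below prints every edge together with the attributes of its two endpoints, then keeps only the pairs joined by a heavy edge. It assumes a spark-shell session; the graph and the threshold of 3 are illustrative.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided); the small graph is illustrative.
val vertices = sc.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42))))
val edges = sc.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 4L, 3), Edge(4L, 1L, 1)))
val graph: Graph[(String, Int), Int] = Graph(vertices, edges)

// Each triplet exposes srcId/srcAttr, dstId/dstAttr and the edge attribute attr.
val described = graph.triplets.map(t =>
  s"${t.srcAttr._1} -[${t.attr}]-> ${t.dstAttr._1}").collect()
described.foreach(println)

// Keep only the (source name, destination name) pairs whose edge value
// is strictly greater than the threshold.
val threshold = 3 // arbitrary threshold chosen for this example
val heavy = graph.triplets.filter(t => t.attr > threshold)
val strongPairs = heavy.map(t => (t.srcAttr._1, t.dstAttr._1)).collect()
```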

Input Output

We are now going to load a graph from a text file. Some real-world datasets are available here.

Loading some text files is simply done by calling sc.textFile(...).

  • Load a real file and display the number of vertices and edges

A graph can be saved as a text file (saveAsTextFile) or as a binary file (saveAsObjectFile)
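The sketch below illustrates loading and saving under a few assumptions: a spark-shell session, workers that can read the driver's local filesystem (as with a cluster deployed on localhost), and a tiny generated edge list standing in for a real dataset. The file format (one "srcId dstId" pair per line, with "#" comments) is an assumption about the dataset, and the temporary paths are placeholders.

```scala
import java.nio.file.Files
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided) whose workers can read the
// local filesystem. A tiny generated file stands in for a real dataset.
val input = Files.createTempFile("edges", ".txt")
Files.write(input, "# src dst\n1 2\n2 3\n4 5\n".getBytes("UTF-8"))

// Parse the text file into (srcId, dstId) pairs, skipping comments and blank lines.
val lines = sc.textFile(input.toUri.toString)
val pairs = lines.filter(l => l.trim.nonEmpty && !l.startsWith("#")).map { l =>
  val fields = l.trim.split("\\s+")
  (fields(0).toLong, fields(1).toLong)
}

// Build the graph; every vertex receives the default attribute 1.
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(pairs, 1)
println(s"${graph.numVertices} vertices, ${graph.numEdges} edges")

// GraphLoader can read this edge-list format directly.
val loaded = GraphLoader.edgeListFile(sc, input.toUri.toString)

// Saving: the target directory must not exist yet.
val outDir = Files.createTempDirectory("graph-out").resolve("edges").toUri.toString
graph.edges.saveAsTextFile(outDir)
```

Vertices can be saved the same way through graph.vertices, and saveAsObjectFile can be used in place of saveAsTextFile for a binary format.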

Standard operations

GraphX already provides a set of standard graph algorithms such as PageRank and ConnectedComponents. Test them on your graphs.
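A minimal sketch of both algorithms, assuming a spark-shell session and a small illustrative graph with two components. The tolerance of 0.001 is an arbitrary choice.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided).
// Illustrative graph with two components: {1, 2, 3} and {4, 5}.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, 0)

// PageRank, run until the ranks change by less than the given tolerance.
val ranks = graph.pageRank(0.001).vertices
ranks.collect().sortBy(_._1).foreach { case (id, rank) =>
  println(f"$id%d: $rank%.3f")
}

// Connected components: each vertex is labelled with the smallest
// vertex id found in its component.
val components = graph.connectedComponents().vertices
val byVertex = components.collect().toMap
```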

Aggregate Messages (formerly mapReduceTriplets)

aggregateMessages is an operation that applies a map() function to the triplets, generating messages for vertices. These messages are then aggregated using a reduce() function, creating a new VertexRDD.

def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]

  • Write a map function which sends to a destination vertex the value of the source vertex
  • Write a reduce function which only keeps the largest value received
  • Execute aggregateMessages!
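One way to put these steps together is sketched below, assuming a spark-shell session and a small illustrative graph whose vertex attribute is (name, value). Passing TripletFields.Src is an optional optimisation hint telling GraphX that only the source attribute is read.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided); the small graph is illustrative.
val vertices = sc.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42))))
val edges = sc.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 4L, 3), Edge(4L, 1L, 1)))
val graph: Graph[(String, Int), Int] = Graph(vertices, edges)

val maxIncoming: VertexRDD[Int] = graph.aggregateMessages[Int](
  // map: send the value of the source vertex to the destination vertex
  ctx => ctx.sendToDst(ctx.srcAttr._2),
  // reduce: keep only the largest value received
  (a, b) => math.max(a, b),
  // only the source attribute is needed to build the message
  TripletFields.Src
)

// Vertices that receive no message (here vertex 3) are absent from the result.
val result = maxIncoming.collect().toMap
```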

By producing carefully crafted messages in the map function, it is possible to compute the average of all incoming values with the help of the reduce function. Any idea how to do that?
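One possible answer, given here as a sketch rather than the intended solution: send a (count, sum) pair as the message, add the pairs component-wise in the reduce function, and divide afterwards. The example assumes a spark-shell session and an illustrative graph.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided); the small graph is illustrative.
val vertices = sc.parallelize(Seq(
  (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42))))
val edges = sc.parallelize(Seq(
  Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 4L, 3), Edge(4L, 1L, 1)))
val graph: Graph[(String, Int), Int] = Graph(vertices, edges)

// Each message is a pair (number of values, sum of values).
val countAndSum: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  ctx => ctx.sendToDst((1, ctx.srcAttr._2.toDouble)),
  // merging two messages adds the counts and the sums separately
  (a, b) => (a._1 + b._1, a._2 + b._2)
)

// The average itself is computed once all messages have been merged.
val averages: VertexRDD[Double] =
  countAndSum.mapValues((id, value) => value._2 / value._1)
val result = averages.collect().toMap
```

The division cannot happen inside the reduce function itself, because averaging partial averages would give the wrong result when the partial groups have different sizes.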

Pregel API

GraphX provides a Pregel-like API inspired by Google's Pregel.

  • The GraphX implementation differs from the original idea. What are the differences?
  • There are 2 implementations of PageRank in GraphX, one of which uses Pregel. Compare the 2 implementations (parameters, calls, end condition...)
  • Execute the 2 versions on a large graph and compare their performance. How can you ensure the number of iterations is "enough"?

The rest of this exercise aims at understanding the Pregel API and writing a simple implementation of connected components.

  • Explain the various parameters of the pregel operator:

      def pregel[A]
          (initialMsg: A,
           maxIter: Int = Int.MaxValue,
           activeDir: EdgeDirection = EdgeDirection.Out)
          (vprog: (VertexId, VD, A) => VD,
           sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
           mergeMsg: (A, A) => A)

  • Create an initial message so that each vertex initially receives Long.MaxValue
  • What value is required for activeDir?
  • What are the three parameters of vprog? Implement a vprog that keeps the minimum of the vertex attribute and the message received
  • sendMsg will be executed for each EdgeTriplet. It should compare the values of the source and destination vertices. How do you get these values from the EdgeTriplet?
  • Implement sendMsg so that it returns an iterator over a single pair (src->dest or dest->src depending on the vertex values), or Iterator.empty if no message is required.
  • Finally, implement mergeMsg, which merges all incoming messages to a vertex. What should its behavior be?
  • Test your implementation on a simple hand-crafted graph with multiple components.
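The steps above can be assembled as in the following sketch, which is one possible solution and not a reference implementation. It assumes a spark-shell session, labels every vertex with its own id before starting, and passes the first parameter list positionally. EdgeDirection.Either is chosen here so that an edge stays active when either of its endpoints changed; the hand-crafted graph is illustrative.

```scala
import org.apache.spark.graphx._

// Assumes a spark-shell session (`sc` provided).
// Hand-crafted graph with two components: {1, 2, 3} and {4, 5}.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
val base: Graph[Int, Int] = Graph.fromEdges(edges, 0)

// Each vertex starts with its own id as its component label.
val init: Graph[VertexId, Int] = base.mapVertices((id, _) => id)

val labelled = init.pregel(Long.MaxValue, Int.MaxValue, EdgeDirection.Either)(
  // vprog: keep the minimum of the current label and the incoming message
  (id, label, msg) => math.min(label, msg),
  // sendMsg: propagate the smaller label towards the endpoint holding the larger one
  triplet =>
    if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
    else if (triplet.dstAttr < triplet.srcAttr) Iterator((triplet.srcId, triplet.dstAttr))
    else Iterator.empty,
  // mergeMsg: of all the labels sent to a vertex, only the smallest matters
  (a, b) => math.min(a, b)
)

// Every vertex ends up labelled with the smallest id of its component.
val components = labelled.vertices.collect().toMap
```

The computation stops on its own once no vertex sends a message, which happens when both endpoints of every edge carry the same label.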