Notes on Pregel Paper

The Pregel paper introduces a vertex-centric model for large-scale graph computation. Interestingly, the name Pregel comes from the river spanned by the Seven Bridges of Königsberg.

Computational Model

The system takes as input a directed graph with properties assigned to both vertices and edges. The computation consists of a sequence of iterations, called supersteps. In each superstep, a user-defined function is invoked on each vertex in parallel. This function implements the algorithm by specifying the behaviour of a single vertex V during a single superstep S. It can read messages sent to V during the previous superstep (S-1), change the state of V or of its outgoing edges, mutate the graph topology by adding or removing vertices or edges, and send messages to other vertices that will be received in the next superstep (S+1). Since all computation during a superstep is performed locally, the model is well suited to distributed computing, and synchronization is needed only between supersteps.
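To make the model concrete, here is a minimal single-machine Python sketch of the superstep loop. It is my own illustration, not the paper's implementation: vertices are plain dicts, and compute stands in for the user-defined per-vertex function.

```python
def run_pregel(vertices, compute, max_supersteps=50):
    """vertices: dict vertex_id -> {"value": ..., "out_edges": [...], "active": True}."""
    inbox = {vid: [] for vid in vertices}              # messages delivered in superstep S
    superstep = 0
    while superstep < max_supersteps:
        if not any(v["active"] for v in vertices.values()) and not any(inbox.values()):
            break                                      # every vertex halted, no mail pending
        outbox = {vid: [] for vid in vertices}         # messages for superstep S+1
        for vid, v in vertices.items():
            if v["active"] or inbox[vid]:              # an incoming message reactivates
                v["active"] = True
                compute(vid, v, inbox[vid], outbox, superstep)
        inbox = outbox                                 # synchronization barrier
        superstep += 1
    return {vid: v["value"] for vid, v in vertices.items()}
```

Here a vertex votes to halt by setting its own "active" flag to False, and sends a message by appending to the destination's outbox entry.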

When the computation starts, all vertices are in the active state. A vertex deactivates itself by voting to halt, and once deactivated it does not take part in subsequent supersteps. However, whenever a deactivated vertex receives a message, it becomes active again and participates in subsequent supersteps. The computation terminates when every vertex is in the deactivated state and no messages are in transit. The resulting state machine is shown below:

[Figure: vertex state machine]

The output of the computation is the set of values produced by the vertices.

Pregel adopts a pure message-passing model that eliminates the need for shared memory and remote reads. Messages can be delivered asynchronously, which reduces latency. Graph algorithms can also be expressed as a sequence of MapReduce jobs, but that requires passing the entire state of the graph from one stage to the next and coordinating the steps of the chained MapReduce. In contrast, Pregel keeps vertices and their outgoing edges on the machine performing the computation, and only messages are transferred across the network. Though Pregel is similar in concept to MapReduce, it comes with a natural graph API and efficient support for running iterative algorithms over the graph.

API

  1. Pregel programs are written by subclassing the predefined Vertex class.
  2. The programmer overrides the compute() method. This is the user-defined function that is invoked on each vertex (see the sketch after this list).
  3. Within compute(), the programmer can modify the vertex's value and the values of its outgoing edges, and these changes are reflected immediately. This is the only per-vertex state that persists across supersteps.
  4. All messages sent to a vertex V during superstep S are available via an iterator in superstep S+1. While each message is guaranteed to be delivered exactly once, there is no guarantee on the order of delivery.
  5. User-defined handlers are invoked when the destination vertex of a message does not exist.
  6. Combiners reduce the number of messages to be transferred by aggregating messages from different vertices (all of which are on the same machine) that share the same destination vertex. Since there is no guarantee about which messages will be aggregated, or in what order, only commutative and associative operations can be used.
  7. Aggregators capture the global state of the graph: each vertex provides a value to the aggregator during each superstep, and the system combines these values using a reduce operation (which must also be commutative and associative). The aggregated value is available to all vertices in the next superstep.
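The paper's actual API is a C++ Vertex template class; the following Python sketch (names and details are mine) mirrors its shape, using the paper's PageRank running example as the overridden compute().

```python
class Vertex:
    """Python stand-in for the paper's C++ Vertex template (illustrative only)."""
    def __init__(self, vertex_id, value, out_edges):
        self.id, self.value, self.out_edges = vertex_id, value, out_edges
        self.active = True
        self.outgoing = []                 # (destination, message) pairs for superstep S+1

    def compute(self, messages, superstep):
        raise NotImplementedError          # the one method a program overrides

    def send_message_to(self, dest, message):
        self.outgoing.append((dest, message))

    def vote_to_halt(self):
        self.active = False


class PageRankVertex(Vertex):
    """PageRank in the style of the paper's running example."""
    NUM_VERTICES = 10                      # assumed known to every vertex in this sketch

    def compute(self, messages, superstep):
        if superstep >= 1:
            self.value = 0.15 / self.NUM_VERTICES + 0.85 * sum(messages)
        if superstep < 30:                 # fixed number of iterations, as in the paper
            share = self.value / len(self.out_edges)
            for dest in self.out_edges:
                self.send_message_to(dest, share)
        else:
            self.vote_to_halt()
```

Since compute() only needs the sum of its incoming messages here, a sum combiner applies directly; and in a real run the vertex count would come from the framework (for example via an aggregator) rather than a hard-coded constant.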

Topology Mutation

Since the compute() method allows the graph topology to be modified, conflicting requests can be made in the same superstep. Two mechanisms are used to handle these conflicts:

  • Within a superstep, conflicting operations are resolved in the following partial order:
    • Edge removal
    • Vertex removal
    • Vertex addition
    • Edge addition
  • User-defined handlers are used for conflicts that cannot be resolved by the partial ordering alone, for example multiple requests to create the same vertex with different initial values. In such cases, the user defines how to resolve the conflict (see the sketch below).
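A sketch of this precedence, assuming a hypothetical graph object with one method per mutation kind:

```python
# Illustrative only: apply one superstep's mutation requests in the
# partial order above; whatever conflicts remain go to user handlers.
PRECEDENCE = ("remove_edge", "remove_vertex", "add_vertex", "add_edge")

def apply_mutations(graph, requests):
    """requests: list of (kind, payload) pairs collected during the superstep."""
    ordered = sorted(requests, key=lambda r: PRECEDENCE.index(r[0]))
    for kind, payload in ordered:
        getattr(graph, kind)(payload)   # e.g. graph.remove_edge(payload)
```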

The coordination mechanism is lazy: global mutations do not require coordination until the point at which they are applied.

Execution Steps

  • Several instances of the user program start executing on a cluster of machines. One of the instances becomes the master and the rest are the workers.
  • The master partitions the graph based on vertex ID, with each partition consisting of a set of vertices and their outgoing edges (see the sketch after this list).
  • These partitions are assigned to workers. Each worker executes the compute method on all its vertices and manages messages to and from other vertices.
  • The master instructs each worker to perform a superstep. The workers run the compute() method on each vertex and tell the master how many vertices will be active in the next superstep. This continues as long as even one vertex is active.
  • Once all the vertices become deactivated, the master may ask the workers to save their portion of the graph.
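The default partition function amounts to a hash of the vertex ID modulo the number of partitions. A one-line sketch (Python's built-in hash() is illustrative only; a real deployment needs a hash that is stable across machines):

```python
# hash(ID) mod N, the default partitioning described in the paper
def partition_of(vertex_id, num_partitions):
    return hash(vertex_id) % num_partitions
```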

Fault Tolerance

Fault tolerance is achieved through checkpointing: the master instructs the workers to save the state of the computation to persistent storage. The master issues regular "ping" messages to the workers; if a worker does not receive a ping within a specified time interval, it terminates itself, and if the master does not hear back from a worker, it marks that worker as failed. In that case, the graph partitions assigned to the failed worker are reassigned to the active workers, which then load the computation state from the last checkpoint and may repeat some supersteps.

An alternative is confined recovery, where, in addition to basic checkpointing, the workers log outgoing messages from their assigned partitions during graph loading and subsequent supersteps. Lost partitions can then be recomputed from the logged messages, and the entire system does not have to roll back.

Worker Implementation

A worker holds a mapping from vertex ID to vertex state for its portion of the graph. The vertex state comprises the vertex's current value, its outgoing edges, a queue of incoming messages, and a flag marking whether it is active. Two copies of the queue and the flag are maintained: one for the current superstep and one for the next.
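One way to express this double-buffered state in Python, with names of my own choosing:

```python
from dataclasses import dataclass, field

@dataclass
class VertexState:
    value: object
    out_edges: list
    # Double buffering: index superstep % 2 selects the copy for the current
    # superstep, (superstep + 1) % 2 the copy being filled for the next one.
    queues: tuple = field(default_factory=lambda: ([], []))
    active: list = field(default_factory=lambda: [True, True])
```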

When sending a message to another vertex, the worker checks whether the destination vertex is on the same machine. If so, it places the message directly in the receiver's queue instead of sending it over the network. If the vertex lies on a remote machine, messages are buffered and sent to the destination worker as a single network message.

If a combiner is specified, it is applied both to messages being added to the outgoing queue and to messages arriving at the incoming queue.
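A sketch of the send path under these rules; local_vertices, remote_buffers, next_queue, and owner_of are hypothetical names:

```python
def send(worker, dest, message, combiner=None):
    if dest in worker.local_vertices:                       # same machine: no network
        worker.local_vertices[dest].next_queue.append(message)
    else:
        buf = worker.remote_buffers.setdefault(worker.owner_of(dest), {})
        if combiner is not None:                            # collapse per destination
            buf[dest] = combiner(buf[dest], message) if dest in buf else message
        else:
            buf.setdefault(dest, []).append(message)        # flushed later as a single
                                                            # network message per worker
```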

Master Implementation

The master coordinates the workers by maintaining a list of currently alive workers, their addressing information, and the portion of the graph assigned to each. The size of the master's data structures depends on the number of partitions rather than on the size of the graph, so a single master suffices even for a very large graph.

The master sends the computation task to the workers and waits for their responses. If any worker fails, the master enters recovery mode, as discussed in the section on fault tolerance; otherwise, it proceeds to the next superstep. The master also runs an internal HTTP server to serve statistics about the graph and the state of the computation.

Aggregators

Workers combine all the values supplied to an aggregator by all of their vertices in a superstep into a single local value. At the end of the superstep, the workers perform a tree-based reduction over these local values and deliver the global value to the master. The tree-based reduction is preferable to pipelining through a chain of workers because it allows for more parallelism.
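A sketch of such a tree-based reduction over the workers' local values:

```python
def tree_reduce(values, combine):
    """Combine a non-empty list of per-worker partial values in O(log n) rounds."""
    while len(values) > 1:
        # Each round pairs up neighbours; the pairs can be combined in parallel.
        values = [combine(values[i], values[i + 1]) if i + 1 < len(values) else values[i]
                  for i in range(0, len(values), 2)]
    return values[0]

# e.g. tree_reduce(local_sums, lambda a, b: a + b) yields the global sum
```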

Applications/Experiments

The paper describes how to implement the PageRank, shortest paths, bipartite matching, and semi-clustering algorithms on top of Pregel. The emphasis is on showing how to think about these algorithms in a vertex-centric manner, not on implementing them on Pregel in the best possible way. Single-source shortest paths is sketched below.
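As an illustration of the vertex-centric style, here is a Python paraphrase of the paper's single-source shortest paths compute() (the original is C++; the dict layout matches the run_pregel sketch earlier, and the field names are mine):

```python
import math

def sssp_compute(vid, v, messages, outbox, superstep):
    """v["value"] holds the best-known distance (initially math.inf);
    v["out_edges"] is a list of (neighbour_id, edge_length) pairs."""
    is_source = v.get("is_source", False)
    candidate = 0 if (is_source and superstep == 0) else min(messages, default=math.inf)
    if candidate < v["value"]:
        v["value"] = candidate                     # found a shorter path: propagate it
        for neighbour, length in v["out_edges"]:
            outbox[neighbour].append(candidate + length)
    v["active"] = False                            # vote to halt; a shorter-path
                                                   # message reactivates the vertex
```

A min combiner fits naturally here, since only the smallest incoming distance matters to a vertex.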

The experiments use the single-source shortest paths algorithm with binary trees and log-normal random graphs as input. The default partitioning strategy and a naive implementation of the algorithm were used, to show that satisfactory performance can be achieved with little coding effort. The runtime increases approximately linearly with graph size.

Limitations

One obvious limitation is that the entire state of the computation resides in main memory. Secondly, Pregel is designed around sparse graphs, and performance will suffer on dense graphs where a lot of communication takes place between vertices. The paper counters this by arguing that realistic dense graphs, and algorithms with dense communication, are rare, and that communication in such cases can often be reduced using aggregators and combiners. A useful addition would be dynamic repartitioning of the graph based on message traffic, to minimize communication over the network.

Pregel's open-source implementation, Giraph, adds several features beyond the basic Pregel model, including out-of-core computation and edge-oriented input, which removes some of the original limitations. Facebook uses Giraph to analyze its social network and has scaled it to a trillion edges, demonstrating the scalability of the Pregel model itself.
