Skip to content

Instantly share code, notes, and snippets.

@ankurdave
Last active August 29, 2015 14:08
Show Gist options
  • Save ankurdave/dc9f684b880d53ee162c to your computer and use it in GitHub Desktop.
Save ankurdave/dc9f684b880d53ee162c to your computer and use it in GitHub Desktop.
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark._
/** Evaluates `f` exactly once, prints "<desc>: <elapsed> ms", and returns the result of `f`.
  *
  * Elapsed time is measured with `System.nanoTime`, which is meant for interval timing,
  * rather than `System.currentTimeMillis`, which tracks the wall clock and can jump if the
  * system clock is adjusted mid-measurement.
  *
  * @param desc label printed before the elapsed time
  * @param f    the (by-name) computation to time
  * @return     the value produced by `f`
  */
def time[A](desc: String)(f: => A): A = {
  val start = System.nanoTime
  val result = f
  val elapsedMs = (System.nanoTime - start) / 1000000L
  println(s"$desc: $elapsedMs ms")
  result
}
// Accumulates (srcId, dstId, attr) edges for a single edge partition.
val builder = new FreshEdgePartitionBuilder[Double, Double]
val path = "/Users/ankurdave/Downloads/uk-2007-05-coalesced-part-00137"
// Parse each non-empty line not starting with '#' as whitespace-separated "src dst" and add it
// to `builder` with edge attribute 1.0. `edges` is Unit: the work is the side effect on `builder`.
// The Source is closed in `finally` so the file handle is released even if a line fails to parse.
val edges = time("load edges") {
  val source = scala.io.Source.fromFile(path)
  try {
    source.getLines.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\s+")
        if (lineArray.length < 2) {
          throw new Exception("Invalid line: " + line)
        }
        val srcId = lineArray(0).toLong
        val dstId = lineArray(1).toLong
        builder.add(srcId, dstId, 1.0)
      }
    }
  } finally {
    source.close()
  }
}
// Seal everything added to `builder` into an immutable EdgePartition (timed on its own).
val edgePartition: EdgePartition[Double, Double] =
  time("finalize edge partition")(builder.toEdgePartition)
// Give every vertex id present in the partition's global2local index the attribute 1.0.
val tripletPartition: EdgePartition[Double, Double] =
  time("upgrade edge partition") {
    val allOnes = edgePartition.global2local.iterator.map { case (vid, _) => (vid, 1.0) }
    edgePartition.updateVertices(allOnes)
  }
/** Emits exactly one message per triplet, addressed to the destination vertex:
  * the source attribute multiplied by the edge attribute.
  */
def mapFunc(e: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
  val message = e.srcAttr * e.attr
  Iterator.single((e.dstId, message))
}
/** Merges two messages bound for the same vertex by adding them. */
def reduceFunc(a: Double, b: Double): Double = a + b
// Timed pre-aggregation over `tripletPartition`. Each edge is turned into a triplet and passed
// to mapFunc, and messages are combined per local vertex id with reduceFunc. `bitset` records
// which local ids have received a message, so the first message initializes the slot rather
// than being reduced against the array's default 0.0.
time("preAgg") {
  val numVertices = tripletPartition.vertexAttrs.length
  val aggregates = new Array[Double](numVertices)
  val bitset = new org.apache.spark.util.collection.BitSet(numVertices)
  val numEdges = tripletPartition.srcIds.length
  // One triplet object is reused (fields overwritten) for every edge instead of allocating per edge.
  // NOTE: dstAttr is never set here; this mapFunc reads only srcAttr, attr and dstId.
  val edge = new EdgeTriplet[Double, Double]
  var i = 0
  while (i < numEdges) {
    edge.localSrcId = tripletPartition.localSrcIds(i)
    edge.localDstId = tripletPartition.localDstIds(i)
    edge.srcId = tripletPartition.local2global(edge.localSrcId)
    edge.dstId = tripletPartition.local2global(edge.localDstId)
    edge.attr = tripletPartition.data(i)
    edge.srcAttr = tripletPartition.vertexAttrs(edge.localSrcId)
    mapFunc(edge).foreach { kv =>
      val globalId = kv._1
      // Messages may only target the edge's own endpoints; anything not equal to srcId is
      // attributed to the destination's local id.
      val localId = if (globalId == edge.srcId) edge.localSrcId else edge.localDstId
      val msg = kv._2
      if (bitset.get(localId)) {
        aggregates(localId) = reduceFunc(aggregates(localId), msg)
      } else {
        aggregates(localId) = msg
        bitset.set(localId)
      }
    }
    i += 1
  }
  // Drain (globalId, aggregate) pairs so that producing them is part of the measurement; the
  // pairs themselves are discarded. The local ids come from `tripletPartition`'s index, so they
  // are mapped back through the same partition's local2global.
  bitset.iterator
    .map { localId => (tripletPartition.local2global(localId), aggregates(localId)) }
    .foreach(_ => ())
}
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark._
/** Evaluates `f` exactly once, prints "<desc>: <elapsed> ms", and returns the result of `f`.
  *
  * Elapsed time is measured with `System.nanoTime`, which is meant for interval timing,
  * rather than `System.currentTimeMillis`, which tracks the wall clock and can jump if the
  * system clock is adjusted mid-measurement.
  *
  * @param desc label printed before the elapsed time
  * @param f    the (by-name) computation to time
  * @return     the value produced by `f`
  */
def time[A](desc: String)(f: => A): A = {
  val start = System.nanoTime
  val result = f
  val elapsedMs = (System.nanoTime - start) / 1000000L
  println(s"$desc: $elapsedMs ms")
  result
}
// Accumulates (srcId, dstId, attr) edges for a single edge partition.
val builder = new EdgePartitionBuilder[Double, Double]
val path = "/mnt/part-00005"
// Parse each non-empty line not starting with '#' as whitespace-separated "src dst" and add it
// to `builder` with edge attribute 1.0. `edges` is Unit: the work is the side effect on `builder`.
// The Source is closed in `finally` so the file handle is released even if a line fails to parse.
val edges = time("load edges") {
  val source = scala.io.Source.fromFile(path)
  try {
    source.getLines.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\s+")
        if (lineArray.length < 2) {
          throw new Exception("Invalid line: " + line)
        }
        val srcId = lineArray(0).toLong
        val dstId = lineArray(1).toLong
        builder.add(srcId, dstId, 1.0)
      }
    }
  } finally {
    source.close()
  }
}
// Seal everything added to `builder` into an immutable EdgePartition (timed on its own).
val edgePartition: EdgePartition[Double, Double] =
  time("finalize edge partition")(builder.toEdgePartition)
// Build a routing table from this edge partition's routing messages. The literals 0 and 1 are
// presumably this partition's id and the total edge-partition count — confirm against
// RoutingTablePartition.
val routingTablePartition =
  time("build routing table") {
    val routingMsgs = RoutingTablePartition.edgePartitionToMsgs(0, edgePartition)
    RoutingTablePartition.fromMsgs(1, routingMsgs)
  }
// Vertex partition built from the routing table with no explicit vertices (Iterator.empty) and
// 1.0 as the value argument — presumably the default attribute; confirm against
// ShippableVertexPartition.apply.
val vertexPartition: ShippableVertexPartition[Double] =
  time("build vertex partition")(ShippableVertexPartition(Iterator.empty, routingTablePartition, 1.0))
// Push the vertex partition's (id, attr) pairs into the edge partition.
val tripletPartition: EdgePartition[Double, Double] =
  time("upgrade edge partition")(edgePartition.updateVertices(vertexPartition.iterator))
/** Emits exactly one message per triplet, addressed to the destination vertex:
  * the source attribute multiplied by the edge attribute.
  */
def mapFunc(e: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] = {
  val message = e.srcAttr * e.attr
  Iterator.single((e.dstId, message))
}
/** Merges two messages bound for the same vertex by adding them. */
def reduceFunc(a: Double, b: Double): Double = a + b
// Timed pre-aggregation using the library path: upgrade the partition's edges to triplets,
// expand each into messages via mapFunc, then combine messages per vertex with reduceFunc.
// The flags (true, false) are passed through unchanged; presumably they select which endpoint
// attributes are populated (src yes, dst no) — confirm against EdgePartition.upgradeIterator.
val result =
  time("preAgg") {
    val triplets = tripletPartition.upgradeIterator(tripletPartition.iterator, true, false)
    val messages = triplets.flatMap(triplet => mapFunc(triplet))
    val aggregated = tripletPartition.vertices.aggregateUsingIndex(messages, reduceFunc)
    aggregated.iterator
  }
#include <cstdio>
#include <cstdlib>
#include <unordered_map>
#include <vector>
#include <ctime>
// Usage:
// g++ -Wall -O3 mrTriplets-localids.cpp -o mrTriplets-localids
// ./mrTriplets-localids 14167504 < ~/Downloads/uk-2007-05-coalesced-part-00137
//
// Reads NUM_EDGES (src, dst) pairs of integer vertex ids from stdin and remaps each global id
// to a dense local id. It then times one pass that adds src_attr * edge_attr into the
// destination's slot of a dense array. All vertex and edge attributes are 1.0.
int main(int argc, char* argv[]) {
  if (argc < 2) {
    fprintf(stderr, "Usage: mrTriplets-localids NUM_EDGES < EDGE_FILE\n");
    return EXIT_FAILURE;
  }
  const int num_edges = atoi(argv[1]);
  if (num_edges <= 0) {
    fprintf(stderr, "NUM_EDGES must be a positive integer, got '%s'\n", argv[1]);
    return EXIT_FAILURE;
  }
  std::vector<int> srcIds(num_edges);
  std::vector<int> dstIds(num_edges);
  std::vector<double> attrs(num_edges, 1.0);

  // Global vertex id -> dense local id in [0, voffset), assigned in order of first appearance.
  std::unordered_map<long, int> vertex_index;
  int voffset = 0;
  // Returns the local id for `global_id`, assigning the next free id on first sight.
  // A single emplace both looks up and inserts, avoiding a find + operator[] double lookup.
  auto local_id = [&](long global_id) -> int {
    auto inserted = vertex_index.emplace(global_id, voffset);
    if (inserted.second) {
      voffset++;
    }
    return inserted.first->second;
  };

  printf("Loading...\n");
  for (int i = 0; i < num_edges; i++) {
    long src, dst;
    // Fail on short or malformed input instead of reusing stale (or uninitialized) src/dst.
    if (scanf("%ld\t%ld", &src, &dst) != 2) {
      fprintf(stderr, "Failed to read edge %d of %d\n", i, num_edges);
      return EXIT_FAILURE;
    }
    // Separate statements keep the src-before-dst id assignment order.
    srcIds[i] = local_id(src);
    dstIds[i] = local_id(dst);
  }

  std::vector<double> vertex_attrs(voffset, 1.0);
  std::vector<double> vertex_preagg(voffset, 0.0);
  printf("Scanning...\n");
  clock_t start_time = clock();
  for (int i = 0; i < num_edges; i++) {
    vertex_preagg[dstIds[i]] += vertex_attrs[srcIds[i]] * attrs[i];
  }
  clock_t end_time = clock();
  printf("Scanned %d edges in %f seconds\n",
         num_edges, (end_time - start_time) / static_cast<double>(CLOCKS_PER_SEC));

  // Read the aggregates afterwards. Otherwise the timed loop has no observable effect and an
  // optimizing compiler is free to drop it.
  double checksum = 0.0;
  for (double v : vertex_preagg) {
    checksum += v;
  }
  printf("Checksum: %f\n", checksum);
  return 0;
}
#include <cstdio>
#include <cstdlib>
#include <unordered_map>
#include <vector>
#include <ctime>
// Usage:
// g++ -Wall -O3 mrTriplets.cpp -o mrTriplets
// ./mrTriplets 14167504 < ~/Downloads/uk-2007-05-coalesced-part-00137
//
// Reads NUM_EDGES (src, dst) pairs of integer vertex ids from stdin, keeping global ids as-is
// with hash maps for vertex attributes. It then times one pass that adds src_attr * edge_attr
// into a per-destination hash-map aggregate. All vertex and edge attributes are 1.0.
int main(int argc, char* argv[]) {
  if (argc < 2) {
    fprintf(stderr, "Usage: mrTriplets NUM_EDGES < EDGE_FILE\n");
    return EXIT_FAILURE;
  }
  const int num_edges = atoi(argv[1]);
  if (num_edges <= 0) {
    fprintf(stderr, "NUM_EDGES must be a positive integer, got '%s'\n", argv[1]);
    return EXIT_FAILURE;
  }
  std::vector<long> srcIds(num_edges);
  std::vector<long> dstIds(num_edges);
  std::vector<double> attrs(num_edges, 1.0);
  std::unordered_map<long, double> vertices;  // vertex id -> attribute
  std::unordered_map<long, double> preagg;    // vertex id -> aggregated incoming messages

  printf("Loading...\n");
  for (int i = 0; i < num_edges; i++) {
    long src, dst;
    // Fail on short or malformed input instead of reusing stale (or uninitialized) src/dst.
    if (scanf("%ld\t%ld", &src, &dst) != 2) {
      fprintf(stderr, "Failed to read edge %d of %d\n", i, num_edges);
      return EXIT_FAILURE;
    }
    srcIds[i] = src;
    dstIds[i] = dst;
    vertices[src] = 1.0;
    vertices[dst] = 1.0;
  }

  printf("Scanning...\n");
  clock_t start_time = clock();
  for (int i = 0; i < num_edges; i++) {
    double msg = vertices[srcIds[i]] * attrs[i];
    // Aggregate into `preagg`, not `vertices`. Writing into `vertices` would change the source
    // attributes that later edges read, so the result would depend on edge order.
    std::unordered_map<long, double>::iterator it = preagg.find(dstIds[i]);
    if (it == preagg.end()) {  // New key
      preagg.emplace(dstIds[i], msg);
    } else {  // Existing key: reuse the iterator instead of a second lookup
      it->second += msg;
    }
  }
  clock_t end_time = clock();
  printf("Scanned %d edges in %f seconds\n",
         num_edges, (end_time - start_time) / static_cast<double>(CLOCKS_PER_SEC));

  // Read the aggregates afterwards. Otherwise the timed loop has no observable effect and an
  // optimizing compiler is free to drop it.
  double checksum = 0.0;
  for (const auto& kv : preagg) {
    checksum += kv.second;
  }
  printf("Checksum: %f\n", checksum);
  return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment