Skip to content

Instantly share code, notes, and snippets.

@ankurdave
Created October 28, 2014 19:32
Show Gist options
  • Save ankurdave/0394d47809297eea76ff to your computer and use it in GitHub Desktop.
Modified version of GraphLoader for Spark 1.0 that allows setting the edge storage level
package org.apache.spark.graphx
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.storage.StorageLevel
/**
 * Modified version of GraphLoader for Spark 1.0 that loads a graph from an
 * edge-list file and lets the caller choose the storage level used to
 * persist the edge partitions.
 */
object MyGraphLoader extends Logging {

  /**
   * Loads a graph from an edge list file where each line contains two
   * whitespace-separated integer vertex IDs (source, destination). Empty
   * lines and lines starting with `#` are skipped as comments. Every edge
   * and every vertex gets the attribute value 1.
   *
   * @param sc the SparkContext used to read the file
   * @param path path to the edge-list file (any URI `sc.textFile` accepts)
   * @param canonicalOrientation if true, store each edge with srcId < dstId
   *        by swapping the endpoints when necessary
   * @param minEdgePartitions the number of edge partitions to coalesce to
   * @param storageLevel the storage level at which the parsed edge
   *        partitions are persisted
   * @return a Graph[Int, Int] whose edges are the parsed lines
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1,
      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
    : Graph[Int, Int] = {
    val startTime = System.currentTimeMillis

    // Parse the edge data table directly into edge partitions.
    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
    val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[Int, Int]
      iter.foreach { line =>
        if (!line.isEmpty && line(0) != '#') {
          val lineArray = line.split("\\s+")
          if (lineArray.length < 2) {
            // BUG FIX: the original logged the warning but then fell through
            // and dereferenced lineArray(1), throwing
            // ArrayIndexOutOfBoundsException on malformed lines. Skip the
            // bad line instead of crashing the task.
            logWarning("Invalid line: " + line)
          } else {
            val srcId = lineArray(0).toLong
            val dstId = lineArray(1).toLong
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, 1)
            } else {
              builder.add(srcId, dstId, 1)
            }
          }
        }
      }
      Iterator((pid, builder.toEdgePartition))
    }.persist(storageLevel)

    // Force materialization so the load time measured below is accurate
    // (otherwise the lazy RDD would be parsed on first downstream use).
    edges.count()
    logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

    val edgeRDD = new EdgeRDD(edges)
    // defaultVal = 1: every vertex referenced by an edge gets attribute 1.
    val vertexRDD = VertexRDD.fromEdges(edgeRDD, edgeRDD.partitions.size, defaultVal = 1)
    GraphImpl.fromExistingRDDs(vertexRDD, edgeRDD)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment