Code used to reproduce a large-graph bug in Spark GraphX.
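For reference, a minimal sketch of how the entry point at the bottom of this gist is driven (a sketch only: the Spark/GraphX 1.x packaging and both paths below are assumptions, not values from the original code):

// Hypothetical invocation of NumEdgesAndVertices.main, which runs
// process(2048, args(0), args(1)) and then PopulateCc2.process on the same edge list.
// Both paths are placeholders.
org.apache.spark.graphx.NumEdgesAndVertices.main(Array(
  "hdfs:///data/edges.tsv",  // args(0): tab-separated "src<TAB>dst[<TAB>attr]" edge list
  "hdfs:///tmp/out"          // args(1): output path (only used by a commented-out block)
))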
// Variant of GraphLoader.edgeListFile, added inside org.apache.spark.graphx.GraphLoader
// (it is called below as GraphLoader.edgeListFileIntWithoutDedupe2). It loads a
// tab-separated edge list into a Graph[Int, Int], skipping the commented-out
// dupeRemoval step and dropping any edge whose endpoint id is >= 2500000000L.
def edgeListFileIntWithoutDedupe2(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    minEdgePartitions: Int = 1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  : Graph[Int, Int] = {
  val startTime = System.currentTimeMillis
  // Parse the edge data table directly into edge partitions
  val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions) //.persist(StorageLevel.MEMORY_AND_DISK_SER)
  //val lines = dupeRemoval(linesRaw).persist(StorageLevel.MEMORY_AND_DISK_SER)
  //linesRaw.unpersist()
  val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[Int, Int]
    iter.foreach { line =>
      // Skip blank lines and '#' comment lines.
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\t")
        if (lineArray.length >= 2) {
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          // Silently drop edges whose endpoint ids are at or above the 2.5e9 cutoff.
          if (srcId < 2500000000L && dstId < 2500000000L) {
            // Third column, if present, is the edge attribute; default to 1.
            val edgeAttr: Int =
              if (lineArray.length >= 3) lineArray(2).toInt
              else 1
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, edgeAttr)
            } else {
              builder.add(srcId, dstId, edgeAttr)
            }
          }
        } else {
          logWarning("Invalid line: " + line)
        }
      }
    }
    Iterator((pid, builder.toEdgePartition))
  }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
  edges.count()
  //lines.unpersist()
  logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
  GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
    vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFileIntWithoutDedupe2
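The loader above expects tab-separated lines of the form src<TAB>dst[<TAB>intAttr]; blank lines and lines beginning with '#' are skipped, and edges with an endpoint id at or above 2500000000L are silently dropped. A small local sanity check of those parsing rules (a sketch only; it re-implements the filtering inline rather than calling the loader itself):

// Mirrors the line-level filtering done in edgeListFileIntWithoutDedupe2 above.
val sample = Seq(
  "# comment line",   // skipped: starts with '#'
  "1\t2",             // kept, edge attribute defaults to 1
  "3\t4\t7",          // kept, edge attribute = 7
  "2500000001\t5")    // dropped: src id >= 2500000000L
val kept = sample
  .filter(l => l.nonEmpty && l(0) != '#')
  .map(_.split("\\t"))
  .filter(a => a.length >= 2 && a(0).toLong < 2500000000L && a(1).toLong < 2500000000L)
println(kept.map(_.mkString(",")).mkString(" | "))  // prints: 1,2 | 3,4,7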
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

object PopulateCc2 {

  // Drop self-loops (edges whose source and destination are the same vertex).
  def removeSelfCycle(g: Graph[Int, Int]): Graph[Int, Int] = {
    g.subgraph(epred = ed => ed.srcId != ed.dstId)
  }

  // Render each triplet as "srcId \t dstId \t edgeAttr \t srcAttr" for saving as text.
  def getEdges(graph: Graph[VertexId, Int]): RDD[String] = {
    val edges = graph.triplets.map { triplet =>
      "%d\t%d\t%d\t%d".format(triplet.srcId, triplet.dstId, triplet.attr, triplet.srcAttr)
    }
    edges
  }

  def process(edgePath: String, noParts: Int) {
    val conf = new SparkConf().setAppName("PopulateCc2")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.4")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      //.set("spark.shuffle.consolidateFiles", "true")
      //.set("spark.akka.frameSize", "20")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
      .set("spark.shuffle.memoryFraction", "0.3")
      //.set("spark.eventLog.enabled", "true")
      //.set("spark.eventLog.compress", "true")
      //.set("spark.eventLog.dir", "file:///vol/spark-events")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      //.set("spark.shuffle.manager", "sort")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
    val sc = new SparkContext(conf)

    // Load the edge list with the custom loader, partition it canonically, then
    // collapse duplicate edges and remove self-loops before counting.
    val g1 = GraphLoader.edgeListFileIntWithoutDedupe2(sc, edgePath, true, noParts,
        StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
      .partitionBy(PartitionStrategy.CanonicalRandomVertexCut).cache
    val g2: Graph[Int, Int] = g1.groupEdges((a, b) => a) //.persist(StorageLevel.MEMORY_ONLY_SER)
    val g: Graph[Int, Int] = removeSelfCycle(g2) //.persist(StorageLevel.MEMORY_ONLY_SER)
    g.cache
    println("before cc numVertices=%s ; numEdges=%s ; edgePath=%s ; ".format(g.numVertices, g.numEdges, edgePath))
    /*
    val cc = g.connectedComponents()
    cc.cache
    println("after cc numVertices=%s".format(cc.numVertices))
    DBUtils.unpersistGraph(g)
    val edges = getEdges(cc)
    edges.persist(StorageLevel.MEMORY_AND_DISK_SER)
    edges.saveAsTextFile(edgesWithCcPath)
    */
    sc.stop
  }

  def main(args: Array[String]): Unit = {
    process(args(0), 1024)
  }
}
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import scala.collection.mutable

object NumEdgesAndVertices {

  def process(noParts: Int, inEdgesPath: String, out: String = "tmp"): Unit = {
    val conf = new SparkConf().setAppName("NumEdgesAndVertices")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.3")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
      .set("spark.akka.frameSize", "200")
    val sc = new SparkContext(conf)

    val in = sc.textFile(inEdgesPath, noParts).coalesce(noParts)
    println("numEdges=%s".format(in.count))

    // Keep only edges whose endpoint ids fall below the same 2.5e9 cutoff used by the loader.
    val filteredEdges = in
      .filter { line =>
        val lineArray = line.split("\\t")
        lineArray(0).toLong < 2500000000L && lineArray(1).toLong < 2500000000L
      }.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // Distinct vertex ids appearing as either endpoint of a surviving edge.
    val nodes = filteredEdges.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator(lineArray(0).toLong, lineArray(1).toLong)
    }.distinct(noParts)
    println("numNodes=%s ; noEdges=%s, path=%s".format(nodes.count, filteredEdges.count, inEdgesPath))
    sc.stop()

    // Rebuild the graph from the same edge list with the custom loader.
    PopulateCc2.process(inEdgesPath, noParts)
    /*
    in.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator((lineArray(0).toLong, lineArray(3).toLong), (lineArray(1).toLong, lineArray(3).toLong))
    }.distinct(noParts)
      .groupByKey(noParts).map { v =>
        (v._2.size, v._1)
      }.filter { v =>
        v._1 > 1
      }.groupByKey(noParts).map { v =>
        (v._1, v._2.size)
      }.sortByKey(false).saveAsTextFile(out)
    */
  }

  def main(args: Array[String]): Unit = {
    process(2048, args(0), args(1))
  }
}