Code used to reproduce a large-graph bug in Spark GraphX
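For reference, the patched loader below consumes a tab-separated edge list: one edge per line as srcId<TAB>dstId, with an optional integer edge attribute in a third column. Empty lines and lines starting with '#' are skipped, and any edge touching a vertex id at or above 2500000000 is dropped. A minimal sample input (vertex ids are hypothetical):

    # comments are ignored
    1	2
    2	3	5
    3	1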
def edgeListFileIntWithoutDedupe2(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    minEdgePartitions: Int = 1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  : Graph[Int, Int] =
{
  val startTime = System.currentTimeMillis
  // Parse the edge data table directly into edge partitions.
  val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions) //.persist(StorageLevel.MEMORY_AND_DISK_SER)
  //val lines = dupeRemoval(linesRaw).persist(StorageLevel.MEMORY_AND_DISK_SER)
  //linesRaw.unpersist()
  val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[Int, Int]
    iter.foreach { line =>
      // Skip blank lines and '#' comment lines.
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\t")
        if (lineArray.length >= 2) {
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          // Drop edges whose endpoints exceed the 2.5B vertex-id cap.
          if (srcId < 2500000000L && dstId < 2500000000L) {
            val edgeAttr: Int =
              if (lineArray.length >= 3) lineArray(2).toInt
              else 1
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, edgeAttr)
            } else {
              builder.add(srcId, dstId, edgeAttr)
            }
          }
        } else {
          logWarning("Invalid line: " + line)
        }
      }
    }
    Iterator((pid, builder.toEdgePartition))
  }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
  edges.count()
  //lines.unpersist()
  logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
  GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
    vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFileIntWithoutDedupe2
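Note that this method references GraphX internals (EdgePartitionBuilder, GraphImpl, and the logWarning/logInfo methods from Spark's Logging trait), and PopulateCc2 below calls it as GraphLoader.edgeListFileIntWithoutDedupe2, so the original setup presumably compiled it into a patched GraphLoader inside a Spark 1.x source tree. A minimal placement sketch, with the surrounding object and imports mirroring Spark's own GraphLoader.scala:

    package org.apache.spark.graphx

    import org.apache.spark.{Logging, SparkContext}
    import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
    import org.apache.spark.storage.StorageLevel

    object GraphLoader extends Logging {
      // ... Spark's stock edgeListFile ...

      // edgeListFileIntWithoutDedupe2 as defined above
    }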
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

object PopulateCc2 {
  // Drop self-loops (edges whose source and destination are the same vertex).
  def removeSelfCycle(g: Graph[Int, Int]): Graph[Int, Int] = {
    g.subgraph(epred = ed => ed.srcId != ed.dstId)
  }

  // Render each triplet as "src \t dst \t edgeAttr \t srcAttr".
  def getEdges(graph: Graph[VertexId, Int]): RDD[String] = {
    val edges = graph.triplets.map { triplet =>
      "%d\t%d\t%d\t%d".format(triplet.srcId, triplet.dstId, triplet.attr, triplet.srcAttr)
    }
    edges
  }

  def process(edgePath: String, noParts: Int): Unit = {
    val conf = new SparkConf().setAppName("PopulateCc2")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.4")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      //.set("spark.shuffle.consolidateFiles", "true")
      //.set("spark.akka.frameSize", "20")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
      .set("spark.shuffle.memoryFraction", "0.3")
      //.set("spark.eventLog.enabled", "true")
      //.set("spark.eventLog.compress", "true")
      //.set("spark.eventLog.dir", "file:///vol/spark-events")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      //.set("spark.shuffle.manager", "sort")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
    val sc = new SparkContext(conf)
    // Load with the patched loader (canonical orientation on), then repartition
    // so duplicate edges are co-located before groupEdges merges them.
    val g1 = GraphLoader.edgeListFileIntWithoutDedupe2(sc, edgePath, true, noParts,
        StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
      .partitionBy(PartitionStrategy.CanonicalRandomVertexCut).cache
    val g2: Graph[Int, Int] = g1.groupEdges((a, b) => a) //.persist(StorageLevel.MEMORY_ONLY_SER)
    val g: Graph[Int, Int] = removeSelfCycle(g2) //.persist(StorageLevel.MEMORY_ONLY_SER)
    g.cache
    println("before cc numVertices=%s ; numEdges=%s ; edgePath=%s ; ".format(g.numVertices, g.numEdges, edgePath))
    /*
    val cc = g.connectedComponents()
    cc.cache
    println("after cc numVertices=%s".format(cc.numVertices))
    DBUtils.unpersistGraph(g)
    val edges = getEdges(cc)
    edges.persist(StorageLevel.MEMORY_AND_DISK_SER)
    edges.saveAsTextFile(edgesWithCcPath)
    */
    sc.stop
  }

  def main(args: Array[String]): Unit = {
    process(args(0), 1024)
  }
}
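As a side note, groupEdges only merges duplicate edges that already share a partition, which is why partitionBy precedes it above. A minimal, self-contained sketch of the same cleanup steps on hypothetical toy data (local master; object name invented for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    object CleanupSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("CleanupSketch").setMaster("local[2]"))
        // Two copies of 1->2, a self-loop 3->3, and a single 2->3.
        val raw = sc.parallelize(Seq((1L, 2L), (1L, 2L), (3L, 3L), (2L, 3L)))
        val g = Graph.fromEdgeTuples(raw, defaultValue = 1)
          .partitionBy(PartitionStrategy.CanonicalRandomVertexCut) // co-locate duplicates
          .groupEdges((a, b) => a)                                 // keep one edge per duplicate pair
          .subgraph(epred = e => e.srcId != e.dstId)               // drop self-loops
        println("edges after cleanup = %d".format(g.numEdges))     // expect 2
        sc.stop()
      }
    }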
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import scala.collection.mutable

object NumEdgesAndVertices {
  def process(noParts: Int, inEdgesPath: String, out: String = "tmp"): Unit = {
    val conf = new SparkConf().setAppName("NumEdgesAndVertices")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.3")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
      .set("spark.akka.frameSize", "200")
    val sc = new SparkContext(conf)
    val in = sc.textFile(inEdgesPath, noParts).coalesce(noParts)
    println("numEdges=%s".format(in.count))
    // Keep only edges whose endpoints fall under the same 2.5B vertex-id cap
    // that the patched loader applies.
    val filteredEdges = in
      .filter { line =>
        val lineArray = line.split("\\t")
        lineArray(0).toLong < 2500000000L && lineArray(1).toLong < 2500000000L
      }.persist(StorageLevel.MEMORY_AND_DISK_SER)
    val nodes = filteredEdges.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator(lineArray(0).toLong, lineArray(1).toLong)
    }.distinct(noParts)
    println("numNodes=%s ; noEdges=%s, path=%s".format(nodes.count, filteredEdges.count, inEdgesPath))
    // Stop this context; PopulateCc2.process creates a fresh SparkContext of its own.
    sc.stop()
    PopulateCc2.process(inEdgesPath, noParts)
    /*
    in.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator((lineArray(0).toLong, lineArray(3).toLong), (lineArray(1).toLong, lineArray(3).toLong))
    }.distinct(noParts)
      .groupByKey(noParts).map { v =>
        (v._2.size, v._1)
      }.filter { v =>
        v._1 > 1
      }.groupByKey(noParts).map { v =>
        (v._1, v._2.size)
      }.sortByKey(false).saveAsTextFile(out)
    */
  }

  def main(args: Array[String]): Unit = {
    process(2048, args(0), args(1))
  }
}
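For completeness, a sketch of how the reproduction driver might be launched (assembly jar name, master URL, and paths are hypothetical). NumEdgesAndVertices prints the raw and filtered edge/vertex counts, then hands the same edge list to PopulateCc2.process:

    spark-submit \
      --class org.apache.spark.graphx.NumEdgesAndVertices \
      --master spark://master:7077 \
      graphx-bug-repro-assembly.jar \
      hdfs:///data/edges /tmp/num-edges-out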