Code used to reproduce the large-graph bug. The first snippet is a modified GraphX loader method (edgeListFileIntWithoutDedupe2) that reads a tab-separated edge list without deduplication, keeping only edges whose vertex IDs are below 2,500,000,000. PopulateCc2 loads the graph with that method and canonicalizes it (the connected-components stage is commented out), and NumEdgesAndVertices counts the filtered edges and vertices before handing off to PopulateCc2.
def edgeListFileIntWithoutDedupe2(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    minEdgePartitions: Int = 1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  : Graph[Int, Int] =
{
  val startTime = System.currentTimeMillis
  // Parse the edge data table directly into edge partitions
  val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions) //.persist(StorageLevel.MEMORY_AND_DISK_SER)
  //val lines = dupeRemoval(linesRaw).persist(StorageLevel.MEMORY_AND_DISK_SER)
  //linesRaw.unpersist()
  val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[Int, Int]
    iter.foreach { line =>
      if (!line.isEmpty && line(0) != '#') {
        val lineArray = line.split("\\t")
        if (lineArray.length >= 2) {
          val srcId = lineArray(0).toLong
          val dstId = lineArray(1).toLong
          // Skip edges whose endpoints have IDs of 2.5 billion or more.
          if (srcId < 2500000000L && dstId < 2500000000L) {
            // Third column, if present, is the edge attribute; otherwise 1.
            val edgeAttr: Int =
              if (lineArray.length >= 3) lineArray(2).toInt
              else 1
            if (canonicalOrientation && srcId > dstId) {
              builder.add(dstId, srcId, edgeAttr)
            } else {
              builder.add(srcId, dstId, edgeAttr)
            }
          }
        } else {
          logWarning("Invalid line: " + line)
        }
      }
    }
    Iterator((pid, builder.toEdgePartition))
  }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
  edges.count() // force materialization of the edge partitions
  //lines.unpersist()
  logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
  GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
    vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFileIntWithoutDedupe2
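
The method above references GraphX internals (EdgePartitionBuilder, GraphImpl) and the logWarning/logInfo calls from Spark's Logging trait, so it is assumed to be compiled as an extra method of org.apache.spark.graphx.GraphLoader inside a Spark 1.x source tree. A minimal sketch of the surrounding file it would need (package paths as in Spark 1.x; the dupeRemoval helper mentioned in the commented-out line is not defined anywhere in this snippet, so the one-liner below is purely a hypothetical stand-in):

package org.apache.spark.graphx

import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object GraphLoader extends Logging {

  // Hypothetical stand-in for the dupeRemoval helper referenced above;
  // a plain distinct() over the raw lines is one way it could have worked.
  private def dupeRemoval(lines: RDD[String]): RDD[String] = lines.distinct()

  // The stock edgeListFile plus edgeListFileIntWithoutDedupe2 (above) go here.
}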
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

object PopulateCc2 {

  // Drop self-loops (edges whose source and destination are the same vertex).
  def removeSelfCycle(g: Graph[Int, Int]): Graph[Int, Int] = {
    g.subgraph(epred = ed => ed.srcId != ed.dstId)
  }

  // Render each triplet of a connected-components graph as a tab-separated line.
  def getEdges(graph: Graph[VertexId, Int]): RDD[String] = {
    val edges = graph.triplets.map { triplet =>
      "%d\t%d\t%d\t%d".format(triplet.srcId, triplet.dstId, triplet.attr, triplet.srcAttr)
    }
    edges
  }

  def process(edgePath: String, noParts: Int) {
    val conf = new SparkConf().setAppName("PopulateCc2")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.4")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
      //.set("spark.shuffle.consolidateFiles", "true")
      //.set("spark.akka.frameSize", "20")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
      .set("spark.shuffle.memoryFraction", "0.3")
      //.set("spark.eventLog.enabled", "true")
      //.set("spark.eventLog.compress", "true")
      //.set("spark.eventLog.dir", "file:///vol/spark-events")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      //.set("spark.shuffle.manager", "sort")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
    val sc = new SparkContext(conf)
    // Load with the custom no-dedupe loader, then canonicalize the partitioning.
    val g1 = GraphLoader.edgeListFileIntWithoutDedupe2(sc, edgePath, true, noParts,
        StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
      .partitionBy(PartitionStrategy.CanonicalRandomVertexCut).cache
    // Collapse duplicate edges (keeping the first attribute) and drop self-loops.
    val g2: Graph[Int, Int] = g1.groupEdges((a, b) => a) //.persist(StorageLevel.MEMORY_ONLY_SER)
    val g: Graph[Int, Int] = removeSelfCycle(g2) //.persist(StorageLevel.MEMORY_ONLY_SER)
    g.cache
    println("before cc numVertices=%s ; numEdges=%s ; edgePath=%s ; ".format(g.numVertices, g.numEdges, edgePath))
    /*
    val cc = g.connectedComponents()
    cc.cache
    println("after cc numVertices=%s".format(cc.numVertices))
    DBUtils.unpersistGraph(g)
    val edges = getEdges(cc)
    edges.persist(StorageLevel.MEMORY_AND_DISK_SER)
    edges.saveAsTextFile(edgesWithCcPath)
    */
    sc.stop
  }

  def main(args: Array[String]): Unit = {
    process(args(0), 1024)
  }
}
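
The connected-components stage inside process is commented out, and it refers to DBUtils and edgesWithCcPath, neither of which is defined in this snippet. A minimal sketch of that stage in runnable form, added alongside getEdges in PopulateCc2, assuming edgesWithCcPath is simply an output path passed in and substituting plain unpersist calls for DBUtils.unpersistGraph:

  // Hypothetical re-enabled CC stage; edgesWithCcPath is an assumed output path.
  def runCc(g: Graph[Int, Int], edgesWithCcPath: String): Unit = {
    val cc = g.connectedComponents()
    cc.cache
    println("after cc numVertices=%s".format(cc.numVertices))
    // Stand-in for DBUtils.unpersistGraph(g): release the cached input graph.
    g.unpersistVertices(blocking = false)
    g.edges.unpersist(blocking = false)
    val edges = getEdges(cc)
    edges.persist(StorageLevel.MEMORY_AND_DISK_SER)
    edges.saveAsTextFile(edgesWithCcPath)
  }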
package org.apache.spark.graphx

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import scala.collection.mutable

object NumEdgesAndVertices {

  def process(noParts: Int, inEdgesPath: String, out: String = "tmp"): Unit = {
    val conf = new SparkConf().setAppName("NumEdgesAndVertices")
      .set("spark.executor.extraJavaOptions", " -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ")
      .set("spark.storage.memoryFraction", "0.3")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.akka.timeout", "300")
      .set("spark.core.connection.ack.wait.timeout", "1800")
      .set("spark.akka.askTimeout", "60")
      .set("spark.storage.blockManagerTimeoutIntervalMs", "180000")
      .set("spark.akka.frameSize", "200")
    val sc = new SparkContext(conf)
    val in = sc.textFile(inEdgesPath, noParts).coalesce(noParts)
    println("numEdges=%s".format(in.count))
    // Keep only edges whose endpoints both have IDs below 2.5 billion,
    // mirroring the filter in edgeListFileIntWithoutDedupe2.
    val filteredEdges = in
      .filter { line =>
        val lineArray = line.split("\\t")
        lineArray(0).toLong < 2500000000L && lineArray(1).toLong < 2500000000L
      }.persist(StorageLevel.MEMORY_AND_DISK_SER)
    // Count the distinct vertex IDs appearing in the filtered edge list.
    val nodes = filteredEdges.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator(lineArray(0).toLong, lineArray(1).toLong)
    }.distinct(noParts)
    println("numNodes=%s ; noEdges=%s, path=%s".format(nodes.count, filteredEdges.count, inEdgesPath))
    sc.stop()
    PopulateCc2.process(inEdgesPath, noParts)
    /*
    in.flatMap { line =>
      val lineArray = line.split("\\t")
      Iterator((lineArray(0).toLong, lineArray(3).toLong), (lineArray(1).toLong, lineArray(3).toLong))
    }.distinct(noParts)
      .groupByKey(noParts).map { v =>
        (v._2.size, v._1)
      }.filter { v =>
        v._1 > 1
      }.groupByKey(noParts).map { v =>
        (v._1, v._2.size)
      }.sortByKey(false).saveAsTextFile(out)
    */
  }

  def main(args: Array[String]): Unit = {
    process(2048, args(0), args(1))
  }
}
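
For comparison with the custom loader, the equivalent load through the public GraphLoader API would look roughly as below; the stock edgeListFile splits on whitespace, always sets the edge attribute to 1, and applies no vertex-ID cap, so it is not a drop-in replacement. This assumes a Spark 1.x release whose edgeListFile already accepts the two storage-level arguments:

    val g1 = GraphLoader.edgeListFile(sc, edgePath, true, noParts,
        StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
      .partitionBy(PartitionStrategy.CanonicalRandomVertexCut).cache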