Skip to content

Instantly share code, notes, and snippets.

@ankurdave
Last active August 29, 2015 14:09
Show Gist options
  • Save ankurdave/6955f06250614a5c8a5c to your computer and use it in GitHub Desktop.
Attempt to use graphs larger than memory in GraphX (currently fails with OOM)
# Launch the cluster
# 16 slaves, 500s wait, r3.2xlarge (61 GB RAM each) spot instances in us-east-1d.
~/repos/spark/ec2/spark-ec2 -s 16 -w 500 -k ankur-mbp-2 -i ~/.ssh/ankur-mbp-2.pem -t r3.2xlarge -z us-east-1d --spot-price=1 launch graphx-16-r3.2xlarge
# After connecting to the cluster, run the following:
# Stop the stock Spark, build the experimental edges-on-disk branch on /mnt
# (the large ephemeral disk), reusing the cluster's existing conf files.
~/spark/sbin/stop-all.sh && cd /mnt && git clone https://github.com/ankurdave/spark -b edges-on-disk && cd /mnt/spark && mkdir -p conf && cp ~/spark/conf/* conf/
# Very generous network/ack timeouts so long GC pauses and disk spills do not
# cause executors to be marked dead mid-job.
echo "spark.core.connection.ack.wait.timeout 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.storage.blockManagerSlaveTimeoutMs 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.akka.timeout 100000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.serializer org.apache.spark.serializer.KryoSerializer" >> /mnt/spark/conf/spark-defaults.conf
# Shrink the storage fraction to leave room for execution memory; raise the
# locality wait so tasks stay with their cached partitions.
echo "spark.storage.memoryFraction 0.2" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.locality.wait 10000000" >> /mnt/spark/conf/spark-defaults.conf
# Build the assembly jar, push it to every slave, and restart the cluster.
cd /mnt/spark && sbt/sbt assembly && ~/spark-ec2/copy-dir --delete /mnt/spark && /mnt/spark/sbin/stop-all.sh && sleep 5 && /mnt/spark/sbin/start-all.sh && sleep 10
/mnt/spark/bin/spark-shell
# Paste the following into the Spark shell:
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
import org.apache.spark.graphx.util._
import org.apache.spark.storage.StorageLevel
import scala.util.Random
/**
 * Draws one sample from a log-normal distribution with parameters `mu` and
 * `sigma`, rejection-sampling until the value falls below `maxVal`.
 *
 * @param mu     mean of the underlying normal distribution
 * @param sigma  standard deviation of the underlying normal distribution
 * @param maxVal exclusive upper bound on the returned sample
 * @param seed   RNG seed; the default -1 means "unseeded" (non-deterministic)
 * @return floor of the accepted sample, as an Int in [0, maxVal)
 */
def sampleLogNormal(
mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = {
val rand = if (seed == -1) new Random() else new Random(seed)
// NOTE: the original also computed the theoretical mean and stddev of the
// distribution (m, s) but never used them; that dead code is removed here.
// Start at maxVal so the rejection loop runs at least once.
var X: Double = maxVal
while (X >= maxVal) {
val Z = rand.nextGaussian()
// exp(mu + sigma * Z) is log-normally distributed when Z ~ N(0, 1).
X = math.exp(mu + sigma*Z)
}
math.floor(X).toInt
}
/**
 * Builds `numEdges` outgoing edges from vertex `src`, each pointing at a
 * uniformly random destination in [0, maxVertexId) with attribute 1.
 *
 * @param seed RNG seed; the default -1 means "unseeded" (non-deterministic)
 */
def generateRandomEdges(
src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = {
val rng = if (seed == -1) new Random() else new Random(seed)
val result = new Array[Edge[Int]](numEdges)
var i = 0
while (i < numEdges) {
result(i) = Edge[Int](src, rng.nextInt(maxVertexId), 1)
i += 1
}
result
}
// Derive two independent seeds from a fixed master seed so the whole graph
// is reproducible across runs.
val (seed1, seed2) = {
val seedRand = new scala.util.Random(0)
(seedRand.nextInt(), seedRand.nextInt())
}
// 100M vertices across 256 edge partitions; mu/sigma shape the log-normal
// out-degree distribution.
val numVertices = 100000000
val numEParts = 256
val mu = 4.0
val sigma = 1.3
// Assign each vertex a log-normal out-degree; XOR-ing the seed with the
// vertex id keeps per-vertex sampling deterministic and independent.
val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, numEParts).map {
src => (src, sampleLogNormal(mu, sigma, numVertices, seed = (seed1 ^ src)))
}
// Expand each (vertex, degree) pair into that many random outgoing edges.
val edges = vertices.flatMap { case (src, degree) =>
generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = (seed2 ^ src))
}
edges.count // 12660773124
// edgesOnDisk is specific to the edges-on-disk branch built above: edge
// partitions live at MEMORY_AND_DISK instead of fully in memory.
val g = Graph[Long, Int](vertices, edges, 0, edgeStorageLevel = StorageLevel.MEMORY_AND_DISK, edgesOnDisk = true)
g.edges.count
g.degrees.max()(Ordering.by(_._2)) // (37109375,19851909)
g.staticPageRank(5) // currently fails with OOM
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment