
@ankurdave
ankurdave / gist:4a17596669b36be06100
Last active February 20, 2017 01:09
Spark code to find distances to reachable source vertices using GraphX
// Spark code to find distances to reachable source vertices using GraphX.
// See http://apache-spark-user-list.1001560.n3.nabble.com/counting-degrees-graphx-td6370.html
import org.apache.spark.graphx._
import scala.collection.immutable.Map
val vertexArray = Array(
  (1L, ("101", "x")),
  (2L, ("102", "y")),
  (3L, ("103", "y")),
import scala.language.higherKinds
// Common interface of RDD and DStream. Note the Coll type parameter, which will either be RDD or DStream.
trait DistributedCollection[A, Coll[_]] {
  def map[B](f: A => B): Coll[B]
}

class RDD[A](val x: A) extends DistributedCollection[A, RDD] {
  def map[B](f: A => B): RDD[B] = new RDD(f(x))
}
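To illustrate how the abstraction above is meant to be used, here is a small hedged extension: a toy DStream-like class implementing the same trait (not Spark's real DStream), plus a function written once against the common interface.

// Toy stand-in for DStream, implementing the same interface as the RDD above.
class DStream[A](val x: A) extends DistributedCollection[A, DStream] {
  def map[B](f: A => B): DStream[B] = new DStream(f(x))
}

// Code written against DistributedCollection works on either concrete collection.
def increment[Coll[_]](coll: DistributedCollection[Int, Coll]): Coll[Int] =
  coll.map(_ + 1)

val r: RDD[Int] = increment(new RDD(41))        // RDD wrapping 42
val d: DStream[Int] = increment(new DStream(1)) // DStream wrapping 2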
@ankurdave
ankurdave / gist:bb96ea237700f5cd670c
Last active August 29, 2015 14:04
sbt/sbt assembly fails on EC2 at apache/spark@284771e
ankurdave@ankur-mbp-2:~/repos/spark$ ~/repos/spark/ec2/spark-ec2 -s 1 -k ankur-mbp-2 -i ~/.ssh/ankur-mbp-2.pem -t m3.medium -z us-east-1e launch test-cluster
Setting up security groups...
Creating security group test-cluster-master
Creating security group test-cluster-slaves
Searching for existing cluster test-cluster...
Spark AMI: ami-5bb18832
Launching instances...
Launched 1 slaves in us-east-1e, regid = r-39c8dd48
Launched master in us-east-1e, regid = r-79cedb08
Waiting for instances to start up...
// Finds the connected component containing a particular vertex.
// In response to http://apache-spark-developers-list.1001551.n3.nabble.com/GraphX-some-vertex-with-specific-edge-td8436.html
import org.apache.spark.graphx._
// Construct the graph in the above example
val edges = sc.parallelize(List(
  Edge(1L, 2L, "e1"), Edge(2L, 3L, "e1"), Edge(3L, 4L, "e1")))
val g: Graph[Int, String] = Graph.fromEdges(edges, 0)
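The preview stops after building the graph; a minimal sketch of the remaining step, running connectedComponents and filtering on the component of one target vertex (vertex 1L here is a placeholder):

// Label every vertex with the smallest vertex ID in its connected component.
val cc = g.connectedComponents()

// Find the component ID of the vertex of interest, then keep only its component.
val targetComponent = cc.vertices.filter { case (id, _) => id == 1L }.first()._2
val componentVertices = cc.vertices.filter { case (_, comp) => comp == targetComponent }
componentVertices.collect().foreach(println)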
@ankurdave
ankurdave / spark-env.sh
Created September 19, 2014 22:43
Spark v2 configuration
#!/usr/bin/env bash
SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark"
export SPARK_JAVA_OPTS
export SPARK_MEM=58g
# Standalone cluster options
export SPARK_MASTER_OPTS=""
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_CORES=8
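For reference, the scratch-directory setting above can also be supplied as an ordinary Spark property rather than through SPARK_JAVA_OPTS; a minimal sketch for a standalone application (the application name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the same spark.local.dir setting expressed through SparkConf.
val conf = new SparkConf()
  .setAppName("local-dir-example")
  .set("spark.local.dir", "/mnt/spark,/mnt2/spark")
val sc = new SparkContext(conf)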
import org.apache.spark._
import org.apache.spark.graphx._
val triplets = sc.textFile(path).flatMap { line =>
  if (!line.isEmpty && line(0) != '#') {
    val lineArray = line.split("\\s+")
    if (lineArray.length < 2) {
      None
    } else {
      val t = new EdgeTriplet[String, String]
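The preview cuts off while filling in the EdgeTriplet; a minimal sketch of the same parsing shape that produces plain Edges and builds a graph from them (path is assumed to be defined, and the attribute strings are placeholders):

// Sketch: parse a whitespace-separated edge list, skipping blank and '#' comment lines.
val parsedEdges = sc.textFile(path).flatMap { line =>
  if (!line.isEmpty && line(0) != '#') {
    val lineArray = line.split("\\s+")
    if (lineArray.length < 2) None
    else Some(Edge(lineArray(0).toLong, lineArray(1).toLong, "edge"))
  } else {
    None
  }
}
val parsedGraph = Graph.fromEdges(parsedEdges, defaultValue = "vertex")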
@ankurdave
ankurdave / MyGraphLoader.scala
Created October 28, 2014 19:32
Modified version of GraphLoader for Spark 1.0 that allows setting the edge storage level
package org.apache.spark.graphx
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
import org.apache.spark.storage.StorageLevel
object MyGraphLoader extends Logging {
  def edgeListFile(
      sc: SparkContext,
      path: String,
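The signature is truncated above; a hedged sketch of how a loader like this would presumably be called. The edgeStorageLevel parameter name is an assumption based on the gist's description, not the visible code, and the path is a placeholder.

import org.apache.spark.storage.StorageLevel

// Hypothetical call: load an edge list while allowing edge partitions to spill to disk.
val loadedGraph = MyGraphLoader.edgeListFile(
  sc,
  "hdfs:///data/edges.txt",                        // placeholder path
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK) // assumed parameter name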
import org.apache.spark.graphx._
import org.apache.spark.graphx.impl._
import org.apache.spark._
def time[A](desc: String)(f: => A): A = {
  val start = System.currentTimeMillis
  val result = f
  println(s"$desc: ${System.currentTimeMillis - start} ms")
  result
}
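A usage sketch for the timing helper above, wrapping a GraphX job in an action so the elapsed time is printed (the graph g is assumed to come from one of the earlier snippets):

// Example: measure how long it takes to compute and materialize connected components.
val numVertices = time("connected components") {
  g.connectedComponents().vertices.count()
}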
@ankurdave
ankurdave / gist:6955f06250614a5c8a5c
Last active August 29, 2015 14:09
Attempt to use graphs larger than memory in GraphX (currently fails with OOM)
# Launch the cluster
~/repos/spark/ec2/spark-ec2 -s 16 -w 500 -k ankur-mbp-2 -i ~/.ssh/ankur-mbp-2.pem -t r3.2xlarge -z us-east-1d --spot-price=1 launch graphx-16-r3.2xlarge
# After connecting to the cluster, run the following:
~/spark/sbin/stop-all.sh && cd /mnt && git clone https://github.com/ankurdave/spark -b edges-on-disk && cd /mnt/spark && mkdir -p conf && cp ~/spark/conf/* conf/
echo "spark.core.connection.ack.wait.timeout 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.storage.blockManagerSlaveTimeoutMs 100000000" >> /mnt/spark/conf/spark-defaults.conf
echo "spark.akka.timeout 100000" >> /mnt/spark/conf/spark-defaults.conf
@ankurdave
ankurdave / gist:cb89391101e4e87497ae
Last active August 29, 2015 14:10
Interface between GraphX and SampleClean
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import scala.reflect.ClassTag
// Take edges and build a graph
def a(vertices: RDD[(Long, Row)], edges: RDD[(Long, Long)]): Graph[Row, Unit] =
  Graph(vertices, edges.map(pair => Edge(pair._1, pair._2, ())))
// Run connected components on the graph
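The preview ends at that comment; a minimal sketch of what the connected-components half of the interface might look like, pairing each Row with the ID of its component (the name b is a placeholder in the same style as a above):

// Hypothetical counterpart to `a`: tag every Row with its connected-component ID.
def b(graph: Graph[Row, Unit]): RDD[(Row, VertexId)] =
  graph.outerJoinVertices(graph.connectedComponents().vertices) {
    (id, row, componentOpt) => (row, componentOpt.getOrElse(id))
  }.vertices.map(_._2)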