@jexp
Created October 13, 2015 20:49
DBPedia in Neo4j -> Read from Neo4j -> Run PageRank (5 iterations) -> Write back to Neo4j
// running Spark on a large single machine
// 6 workers with 12G RAM each -> 72G total, plus 8G for the driver -> 80G RAM in total
// the machine has 6 physical CPUs
// the jar contains just AnormCypher + its dependencies
neo@neo:/tmp/spark$ bin/spark-shell --jars ../neo4j/target/scala-2.10/Neo4j-Spark-Demo-assembly-1.0.jar --driver-memory 8G --executor-memory 12G --master local[6]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
SQL context available as sqlContext.
import org.anormcypher._
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
var start = System.currentTimeMillis
val total = 100000000
val batch = total/1000000
// batch: Int = 100
// read 100 batches of 1M relationship rows each, with skip <window> limit 1M,
// because otherwise Spark runs into OOM keeping 100M rows at once
val links = sc.range(0,batch).repartition(batch).mapPartitionsWithIndex( (i,p) => {
  val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1) as from, id(p2) as to skip {skip} limit 1000000"
  p.flatMap( skip => {
    Cypher(q).on("skip"->skip*1000000).apply()(dbConn).map(row =>
      (row[Int]("from").toLong,row[Int]("to").toLong)
    )
  })
})
// links: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34
links.cache
// res0: links.type = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:34
links.count
// res1: Long = 100000000 - 100M entries
(System.currentTimeMillis - start)/1000/60
// res2: Long = 9 minutes to read 100M rows in 100 batches
// would be faster as a single big read but spark chokes on that
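// for illustration only (not part of the original run): the unbatched alternative,
// pulling all 100M rows through one Cypher call -- this is the variant that
// runs out of memory, hence the 100 skip/limit batches above
// val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
// val allRows = Cypher("MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1) as from, id(p2) as to")
//                 .apply()(dbConn)
//                 .map(row => (row[Int]("from").toLong, row[Int]("to").toLong)).toSeq
// val linksUnbatched = sc.parallelize(allRows)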
start = System.currentTimeMillis
// convert the tuples into Edges
val edges = links.map( l => Edge(l._1,l._2, None))
// edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[None.type]] = MapPartitionsRDD[7] at map at <console>:36
// create GraphX Graph
val g = Graph.fromEdges(edges,"none")
// g: org.apache.spark.graphx.Graph[String,None.type] = org.apache.spark.graphx.impl.GraphImpl@58d5bdb0
// alternative pageRank invocation
// val v = g.pageRank(.0001).vertices
// run 5 iterations of pagerank
val v = PageRank.run(g, 5).vertices
// v: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[92] at RDD at VertexRDD.scala:57
(System.currentTimeMillis - start)/1000/60
// res3: Long = 122 minutes to run pagerank on GraphX on 100M edges
start = System.currentTimeMillis
// partition the results into 1000 partitions to write data back concurrently to Neo4j
val res = v.repartition(total/100000).mapPartitions( part => {
  val localConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  val updateStmt = Cypher("UNWIND {updates} as update MATCH (p) where id(p) = update.id SET p.pagerank = update.rank")
  // materialize the partition first, otherwise the iterator is exhausted by the update
  // and part.size would report 0 afterwards
  val updates = part.map( vp => Map("id"->vp._1.toLong, "rank" -> vp._2.toDouble)).toSeq
  updateStmt.on("updates"->updates).execute()(localConn)
  Iterator(updates.size)
})
// res: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[103] at mapPartitions at <console>:42
res.count
// res4: Long = 1000
(System.currentTimeMillis - start)/1000/60
// res5: Long = 7 minutes to write page-rank data back to Neo4j

jexp commented Oct 13, 2015

  • DBPedia (created with https://github.com/mirkonasato/graphipedia) in Neo4j: 125M links, 11M pages
  • I can't create the Neo4jREST connection and the queries outside of the worker calls, because Spark can't serialize them (see the sketch below)
  • Pulling all the data from Neo4j in one go would be faster, but Spark chokes on that
  • PageRank runs really long: 122 minutes, i.e. about 2 hours
  • Writing back 1,000 batches with 10k updates per batch should be way faster; I think this is a Spark limitation

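A minimal sketch of the serialization point above (same AnormCypher calls as in the gist, with a placeholder body): a connection created on the driver gets captured by the task closure and can't be serialized, so each partition has to open its own connection inside the task.

// does NOT work: the REST connection lives on the driver, gets captured by the
// closure and shipped to the executors -> task serialization fails
// val dbConn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
// val counts = links.mapPartitions( p => { Cypher("...").apply()(dbConn); Iterator(p.size) })

// works: each partition creates its own connection on the worker, so nothing
// non-serializable crosses the driver/executor boundary
val counts = links.mapPartitions( p => {
  val conn = Neo4jREST("localhost", 9474, "/db/data/", "neo4j", "test")
  // ... run the Cypher reads/writes here, as in the gist above ...
  Iterator(p.size)
})
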
Note: if I run this in Neo4j as an extension, it takes 60-90 seconds, including reading the data, computing PageRank and writing the ranks back transactionally. :)


kavipadm commented Jul 4, 2017

Hi,

I am trying to run this code on Spark; however, I am receiving the error:
org.anormcypher.Neo4jREST.type does not take parameters
Could you please help me out with this?

Thanks and regards,
Kavita
