Skip to content

Instantly share code, notes, and snippets.

@bxshi
Last active August 29, 2015 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bxshi/b5c0fe0ae089c75a39bd to your computer and use it in GitHub Desktop.
Save bxshi/b5c0fe0ae089c75a39bd to your computer and use it in GitHub Desktop.
SPARK-2228 NoSuchElementError, key not found
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Stand-alone GraphX driver that repeatedly rebuilds, caches, un-caches and
 * checkpoints (via object files) a tiny three-vertex cycle graph.
 *
 * The exact order of the cache / count / unpersist / save / reload calls is
 * kept as written: the script exists to exercise that sequence, so the calls
 * must not be reordered or "simplified" away.
 */
object StackOverFlow {

  /** Number of outer save-and-reload rounds. */
  private val Iterations = 2000

  /** Directory prefix under which every round writes its object files. */
  private val CheckpointDir = "./checkpoint"

  /**
   * Entry point: creates a local 4-thread SparkContext, runs the workload and
   * always stops the context afterwards, even when a Spark job fails.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HDTM")
      .setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      run(sc)
    } finally {
      // Release the context on both the success and the failure path.
      sc.stop()
    }
  }

  /**
   * Runs the rebuild / cache / unpersist / checkpoint loop and finally prints
   * the vertices and edges of the last graph.
   *
   * @param sc the active SparkContext used to create and reload the RDDs
   */
  private def run(sc: SparkContext): Unit = {
    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))

    // `g` is deliberately a var: each step replaces it with a freshly built graph.
    var g: Graph[VertexId, Long] = Graph(v, e)
    val vertexIds = Seq(0L, 1L, 2L)

    for (i <- 1 to Iterations) {
      // One rebuild per vertex id; the id itself is not used by the body.
      vertexIds.foreach { _ =>
        println("generate new graph")
        val prevG = g
        g = Graph(g.vertices, g.edges)
        g.cache()
        g.edges.cache()
        // Force materialisation of the new graph before dropping the old one.
        println(g.vertices.count() + g.edges.count())
        println("uncache vertices")
        prevG.unpersistVertices(blocking = false)
        println("uncache edges")
        prevG.edges.unpersist(blocking = false)
      }

      val vertexPath = s"$CheckpointDir/g_v_$i"
      val edgePath = s"$CheckpointDir/g_e_$i"

      // Write the current graph out, drop it from the cache, then rebuild it
      // from the files that were just written.
      g.vertices.saveAsObjectFile(vertexPath)
      g.edges.saveAsObjectFile(edgePath)
      g.vertices.unpersist(blocking = false)
      g.edges.unpersist(blocking = false)
      g = Graph(
        sc.objectFile[(VertexId, Long)](vertexPath),
        sc.objectFile[Edge[Long]](edgePath)
      )
      g.cache()
      g.edges.cache()
      println(g.vertices.count() + g.edges.count())
      println(s" iter $i finished")
    }

    println(g.vertices.collect().mkString(" "))
    println(g.edges.collect().mkString(" "))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment