Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SPARK-2228 NoSuchElementException: key not found
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}
object StackOverFlow {
  /**
   * Minimal reproduction for SPARK-2228: repeatedly rebuilding, caching and
   * unpersisting a GraphX graph eventually fails with
   * "java.util.NoSuchElementException: key not found" in the block manager.
   *
   * The loop deliberately mirrors an iterative algorithm's cache churn:
   * build a new graph generation, cache it, force materialization with
   * count(), then unpersist the previous generation. Every iteration also
   * "checkpoints by hand" via saveAsObjectFile/objectFile to truncate lineage.
   * The exact statement order (cache new -> materialize -> unpersist old)
   * is intentional and must not be reordered.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      // A tiny directed triangle: 0 -> 1 -> 2 -> 0, with Long attributes.
      val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
      val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
      var g = Graph(v, e)
      val vertexIds = Seq(0L, 1L, 2L)
      // Previous graph generation; assigned before first use, so never
      // dereferenced while null.
      var prevG: Graph[VertexId, Long] = null
      val iterations = 2000 // enough cache churn to trigger the failure
      for (i <- 1 to iterations) {
        // Original used `vertexIds.toStream.foreach`; `foreach` forces the
        // whole stream immediately, so plain foreach is equivalent. The
        // element itself is unused — the Seq only controls repetition count.
        vertexIds.foreach { _ =>
          println("generate new graph")
          prevG = g
          g = Graph(g.vertices, g.edges)
          g.cache()
          g.edges.cache()
          // count() forces materialization so the new generation is actually
          // cached before the old one is dropped.
          println(g.vertices.count() + g.edges.count())
          println("uncache vertices")
          prevG.unpersistVertices(blocking = false)
          println("uncache edges")
          prevG.edges.unpersist(blocking = false)
        }
        // Manual checkpoint: spill to disk, drop the cached copy, reload and
        // re-cache to cut the lineage chain.
        g.vertices.saveAsObjectFile("./checkpoint/g_v_" + i)
        g.edges.saveAsObjectFile("./checkpoint/g_e_" + i)
        g.vertices.unpersist(blocking = false)
        g.edges.unpersist(blocking = false)
        g = Graph(sc.objectFile("./checkpoint/g_v_" + i), sc.objectFile("./checkpoint/g_e_" + i))
        g.cache()
        g.edges.cache()
        println(g.vertices.count() + g.edges.count())
        println(" iter " + i + " finished")
      }
      println(g.vertices.collect().mkString(" "))
      println(g.edges.collect().mkString(" "))
    } finally {
      // Was missing in the original: release the local SparkContext (threads,
      // block manager, temp dirs) even if an iteration throws.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.