Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
spark checkpoint loading exception
15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
at org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
at example.CheckpointedExample.main(CheckpointedExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
at org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
at scala.collection.mutable.HashMap.init(HashMap.scala:39)
at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
... 52 more
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala b/external/kafka/src/main/scala/org/ap
index 537bcad..9f1abd6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
@@ -125,12 +125,12 @@ class DeterministicKafkaInputDStream[
private[streaming]
class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
def batchForTime = data.asInstanceOf[mutable.HashMap[
- Time, Array[(Int, String, Int, Long, Long, String, Int)]]]
+ Time, Array[KafkaRDDPartition]]]
override def update(time: Time) {
batchForTime.clear()
generatedRDDs.foreach { kv =>
- val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].batch.map(_.toTuple).toArray
+ val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].batch.toArray
batchForTime += kv._1 -> a
}
}
@@ -141,8 +141,8 @@ class DeterministicKafkaInputDStream[
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
- context.sparkContext, kafkaParams, b.map(KafkaRDDPartition(_)), messageHandler)
- }
+ context.sparkContext, kafkaParams, b, messageHandler)
+ }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.