spark checkpoint loading exception
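The log below is from a Spark Streaming driver attempting to restore its context from a checkpoint. Deserializing the saved DStream graph fails with a java.lang.ClassNotFoundException for org.apache.spark.rdd.kafka.KafkaRDDPartition (wrapped in a java.io.IOException): the checkpoint contains serialized KafkaRDDPartition objects, but that class is not resolvable by the classloader used in ObjectInputStreamWithLoader when the checkpoint is read back.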
15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
    at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
    at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
    at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
    at example.CheckpointedExample.main(CheckpointedExample.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
    at org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
    at scala.collection.mutable.HashMap.init(HashMap.scala:39)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
    ... 52 more
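The failure occurs on the normal checkpoint-recovery path: StreamingContext.getOrCreate calls CheckpointReader.read, which deserializes the DStream graph. The original CheckpointedExample is not included in this gist, so the following is only a minimal sketch of what such a driver looks like; the app name, batch interval, and stream setup are assumptions, while the checkpoint directory is taken from the log above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedExample {
  val checkpointDir = "/var/tmp/cp" // matches file:/var/tmp/cp in the log

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedExample")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... create the Kafka input stream and wire up transformations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a fresh start this invokes createContext; on restart it instead
    // deserializes the checkpointed DStream graph, which is where the
    // ClassNotFoundException for KafkaRDDPartition surfaces.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}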
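The diff below shows the change that appears to trigger the exception. On the `-` side, the checkpoint data holds plain tuples of (Int, String, Int, Long, Long, String, Int), which deserialize using only Scala standard-library classes; on the `+` side it holds KafkaRDDPartition objects directly, so restoring the checkpoint now requires the reader's classloader to resolve that class, which is exactly the step that fails above.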
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
index 537bcad..9f1abd6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala
@@ -125,12 +125,12 @@ class DeterministicKafkaInputDStream[
   private[streaming]
   class DeterministicKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
     def batchForTime = data.asInstanceOf[mutable.HashMap[
-      Time, Array[(Int, String, Int, Long, Long, String, Int)]]]
+      Time, Array[KafkaRDDPartition]]]
     override def update(time: Time) {
       batchForTime.clear()
       generatedRDDs.foreach { kv =>
-        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].batch.map(_.toTuple).toArray
+        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].batch.toArray
         batchForTime += kv._1 -> a
       }
     }
@@ -141,8 +141,8 @@ class DeterministicKafkaInputDStream[
     batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
       logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
       generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-        context.sparkContext, kafkaParams, b.map(KafkaRDDPartition(_)), messageHandler)
-      }
+        context.sparkContext, kafkaParams, b, messageHandler)
+      }
     }
   }
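For illustration, here is a minimal sketch of the tuple round-trip that the removed `-` lines relied on. PartitionInfo is a hypothetical stand-in for KafkaRDDPartition; its field names are guessed from the shape of the tuple type in the diff.

// Hypothetical simplified stand-in for KafkaRDDPartition. Field names are
// inferred from the tuple type (Int, String, Int, Long, Long, String, Int).
case class PartitionInfo(
    index: Int,
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long,
    host: String,
    port: Int) {
  // Flatten to a plain tuple before checkpointing; deserializing the tuple
  // later needs only scala-library classes, never this custom class.
  def toTuple: (Int, String, Int, Long, Long, String, Int) =
    (index, topic, partition, fromOffset, untilOffset, host, port)
}

object PartitionInfo {
  // Rebuild the custom object after the checkpoint has been read back.
  def apply(t: (Int, String, Int, Long, Long, String, Int)): PartitionInfo =
    PartitionInfo(t._1, t._2, t._3, t._4, t._5, t._6, t._7)
}

Keeping the checkpointed form as tuples means the serialized graph references only standard-library classes, so checkpoint recovery cannot hit a ClassNotFoundException for the Kafka integration classes even if they are loaded differently at restart.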