Last active: May 15, 2019, 20:36
Save ov7a/fc783315ea252a03d51804ce326a13b1 to your computer and use it in GitHub Desktop.
SPARK-27720 MWE for https://issues.apache.org/jira/browse/SPARK-27720
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19/05/15 23:36:09 ERROR StreamingContext: Error starting the context, marking it as stopped | |
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access | |
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:847) | |
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:171) | |
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:260) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54) | |
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) | |
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) | |
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) | |
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) | |
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) | |
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) | |
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) | |
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) | |
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) | |
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) | |
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) | |
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) | |
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () | |
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578) | |
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) | |
at Main$.delayedEndpoint$Main$1(Main.scala:79) | |
at Main$delayedInit$body.apply(Main.scala:11) | |
at scala.Function0$class.apply$mcV$sp(Function0.scala:34) | |
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) | |
at scala.App$$anonfun$main$1.apply(App.scala:76) | |
at scala.App$$anonfun$main$1.apply(App.scala:76) | |
at scala.collection.immutable.List.foreach(List.scala:381) | |
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) | |
at scala.App$class.main(App.scala:76) | |
at Main$.main(Main.scala:11) | |
at Main.main(Main.scala) | |
19/05/15 23:36:09 INFO ReceiverTracker: ReceiverTracker stopped | |
19/05/15 23:36:09 INFO JobGenerator: Stopping JobGenerator immediately | |
19/05/15 23:36:09 INFO RecurringTimer: Stopped timer for JobGenerator after time -1 | |
Exception in thread "main" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access | |
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104) | |
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059) | |
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.stop(DirectKafkaInputDStream.scala:270) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$stop$1.apply(DStreamGraph.scala:64) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$stop$1.apply(DStreamGraph.scala:64) | |
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143) | |
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136) | |
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) | |
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48) | |
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51) | |
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969) | |
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152) | |
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443) | |
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) | |
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) | |
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) | |
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) | |
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.clients.consumer.ConsumerConfig | |
import org.apache.kafka.common.TopicPartition | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.spark.streaming.kafka010.KafkaUtils | |
import org.apache.spark.streaming.{Seconds, StreamingContext} | |
import org.apache.spark.{SparkConf, SparkContext, SparkEnv} | |
import org.apache.spark.streaming.kafka010._ | |
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent | |
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe | |
object Main extends App {
  // Minimal reproducible example for SPARK-27720
  // (https://issues.apache.org/jira/browse/SPARK-27720).
  //
  // What it demonstrates: if user code calls start() on a Kafka direct stream
  // from its own thread while StreamingContext.start() is starting the same
  // DStream graph (which Spark does on a parallel collection / ForkJoin pool,
  // per the attached stack trace), two threads end up inside the single
  // underlying KafkaConsumer and it throws
  // java.util.ConcurrentModificationException
  // ("KafkaConsumer is not safe for multi-threaded access").
  //
  // NOTE(review): the race below is INTENTIONAL — this is a repro, not a
  // correct streaming program. Do not "fix" the threading.

  val kafkaHost = "localhost:9092" //FIXME: point at a reachable Kafka broker
  val topic = "some-topic" //FIXME: must exist on that broker

  // Local 2-core master; maxFailures=1 so nothing masks the first error,
  // and the consumer cache is disabled to keep the repro deterministic.
  val conf = new SparkConf()
    .setIfMissing("spark.app.name", "Spark27720-mwe")
    .setIfMissing("spark.master", "local[2]")
    .setIfMissing("spark.executor.memory", "2g")
    .setIfMissing("spark.cores.max", "1")
    .set("spark.task.maxFailures", "1")
    .set("spark.streaming.kafka.consumer.cache.enabled", "false")
  val context = new SparkContext(conf)
  val streamingContext = new StreamingContext(context, Seconds(1)) // 1s batches

  // Plain string-keyed consumer config; earliest offset so a fresh group
  // reads from the start of the topic.
  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaHost,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> "Spark27720-mwe",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
  )
  val topics = Array(topic)

  // Thin wrapper whose start()/stop() delegate straight to the InputDStream.
  // `stream` is lazy, so the direct stream (and its KafkaConsumer) is created
  // on first access.
  class StreamWrapper(streamingContext: StreamingContext){
    lazy val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Register a trivial output operation so the DStream graph has work to do.
    def init(): Unit = {
      stream.foreachRDD { rdd =>
        val size = rdd.count()
        println(s"Got $size messages")
      }
    }
    // Calls the stream's own start() directly — this is the user-thread half
    // of the race (it touches the KafkaConsumer; see DirectKafkaInputDStream
    // frames in the log above).
    def start(): Unit = {
      stream.start()
    }
    def stop(): Unit = {
      stream.stop()
    }
  }

  val sw = new StreamWrapper(streamingContext)
  sw.init()

  // Deliberate race: kick off the stream on a background thread...
  val thread = new Thread(new Runnable {
    override def run(): Unit = {
      sw.start()
    }
  })
  thread.start()
  // ...while the context concurrently start()s the same DStream graph.
  // Both threads reach the one KafkaConsumer, reproducing the
  // ConcurrentModificationException shown in the log.
  streamingContext.start()
  streamingContext.awaitTermination()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.