19/05/15 23:36:09 ERROR StreamingContext: Error starting the context, marking it as stopped
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2199)
    at org.apache.kafka.clients.consumer.KafkaConsumer.assignment(KafkaConsumer.java:847)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:171)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:260)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at Main$.delayedEndpoint$Main$1(Main.scala:79)
    at Main$delayedInit$body.apply(Main.scala:11)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at Main$.main(Main.scala:11)
    at Main.main(Main.scala)
19/05/15 23:36:09 INFO ReceiverTracker: ReceiverTracker stopped
19/05/15 23:36:09 INFO JobGenerator: Stopping JobGenerator immediately
19/05/15 23:36:09 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
Exception in thread "main" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2215)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2104)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2059)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.stop(DirectKafkaInputDStream.scala:270)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$stop$1.apply(DStreamGraph.scala:64)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$stop$1.apply(DStreamGraph.scala:64)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
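
Both traces come from the Main.scala below, apparently the minimal reproducer for SPARK-27720 (the app name in its config). The first exception is thrown while DStreamGraph.start() polls the new KafkaConsumer from a ForkJoin worker thread; the second is thrown on shutdown, when DStreamGraph.stop() tries to close the same consumer while another thread still holds its lock.

Main.scala: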
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {
  val kafkaHost = "localhost:9092" //FIXME
  val topic = "some-topic" //FIXME

  val conf = new SparkConf()
    .setIfMissing("spark.app.name", "Spark27720-mwe")
    .setIfMissing("spark.master", "local[2]")
    .setIfMissing("spark.executor.memory", "2g")
    .setIfMissing("spark.cores.max", "1")
    .set("spark.task.maxFailures", "1")
    .set("spark.streaming.kafka.consumer.cache.enabled", "false")

  val context = new SparkContext(conf)
  val streamingContext = new StreamingContext(context, Seconds(1))

  val kafkaParams = Map[String, Object](
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaHost,
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
    ConsumerConfig.GROUP_ID_CONFIG -> "Spark27720-mwe",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
  )
  val topics = Array(topic)

  class StreamWrapper(streamingContext: StreamingContext) {
    // The direct stream wraps a single driver-side KafkaConsumer,
    // which is not thread-safe.
    lazy val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    def init(): Unit = {
      stream.foreachRDD { rdd =>
        val size = rdd.count()
        println(s"Got $size messages")
      }
    }

    def start(): Unit = {
      stream.start()
    }

    def stop(): Unit = {
      stream.stop()
    }
  }

  val sw = new StreamWrapper(streamingContext)
  sw.init()

  // Reproduces the race: stream.start() runs on this extra thread at the
  // same time as streamingContext.start() on the main thread, which also
  // starts every registered input DStream (via DStreamGraph, over a
  // parallel collection). The one driver-side KafkaConsumer is then hit
  // from two threads at once and throws the ConcurrentModificationException
  // shown in the log above.
  val thread = new Thread(new Runnable {
    override def run(): Unit = {
      sw.start()
    }
  })
  thread.start()

  streamingContext.start()
  streamingContext.awaitTermination()
}
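
For contrast, a minimal sketch of the non-racing startup (not part of the original gist, and assuming the goal is normal consumption rather than reproducing the bug): drop the background thread and the manual sw.start()/sw.stop() calls, since StreamingContext.start() already starts and stops every registered input DStream on the driver. The tail of Main would then reduce to:

  val sw = new StreamWrapper(streamingContext)
  sw.init() // registers foreachRDD; does not touch the consumer yet

  // No extra thread and no sw.start(): the StreamingContext owns the
  // DStream lifecycle, so the KafkaConsumer is only ever accessed from
  // threads that Spark itself coordinates.
  streamingContext.start()
  streamingContext.awaitTermination()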