@andr83
Created November 8, 2018 11:18
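import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// kafka-0-10 direct stream (CanCommitOffsets only exists there); kafkaParams must set StringDeserializer
val stream = KafkaUtils.createDirectStream[String, String](
  job.ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
stream.foreachRDD { rdd =>
  import sparkSession.implicits._
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // read before any transformation
  val df = rdd.map(r => (r.key, r.value)).toDF("key", "value") // ConsumerRecord has no Encoder
  // "sequence" is not a built-in Spark SQL format; assumes a custom data source is on the classpath
  df.write.format("sequence").mode("append").save(s"$workDir/$targetFile")
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) // commit only after the write succeeds
}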
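Committing the offsets only after the write gives at-least-once delivery: if the write fails, the range is never committed and is read again on restart, so the sink may see duplicates but never gaps. The dependency-free sketch below models only that ordering. `OffsetRange` here is a hypothetical stand-in that mirrors the field names of Spark's class, and `OffsetStore` and `runBatch` are illustrative names, not Spark or Kafka APIs.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for Spark's OffsetRange (same field names, no Spark dependency)
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical in-memory model of committed offsets: (topic, partition) -> next offset to read
final class OffsetStore {
  private var committed = Map.empty[(String, Int), Long]

  def position(topic: String, partition: Int): Long =
    committed.getOrElse((topic, partition), 0L)

  // untilOffset is exclusive, so it is the next offset a consumer should read
  def commit(ranges: Seq[OffsetRange]): Unit =
    ranges.foreach(r => committed = committed.updated((r.topic, r.partition), r.untilOffset))
}

object AtLeastOnceDemo {
  // Process one micro-batch: write first, commit only if the write succeeded
  def runBatch(store: OffsetStore, ranges: Seq[OffsetRange], write: Seq[OffsetRange] => Unit): Boolean =
    try {
      write(ranges)
      store.commit(ranges)
      true
    } catch {
      case NonFatal(_) => false // offsets stay uncommitted, so the same range is replayed
    }

  def main(args: Array[String]): Unit = {
    val store = new OffsetStore
    val batch = Seq(OffsetRange("events", 0, 0L, 100L))
    val first = runBatch(store, batch, _ => throw new RuntimeException("write failed"))
    println(s"first=$first position=${store.position("events", 0)}")
    val second = runBatch(store, batch, _ => ())
    println(s"second=$second position=${store.position("events", 0)}")
  }
}
```

Running `main` prints `first=false position=0` and then `second=true position=100`: the failed write leaves the committed position untouched, and the retried batch advances it.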