Last active September 5, 2022 01:10
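import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

def main(args: Array[String]): Unit = {
  val sc = SparkSession.builder().getOrCreate().sparkContext
  val ssc = new StreamingContext(sc, Seconds(BATCH_DURATION_SECONDS))
  // Change the default OutputCommitter (OBFileOutputCommitter must extend mapred.OutputCommitter)
  val conf: JobConf = new JobConf(sc.hadoopConfiguration)
  conf.setOutputCommitter(classOf[OBFileOutputCommitter])
  // Give this job its own temporary directory, kept inside the output path so it is on the same volume
  OBFileOutputCommitter.pendingDirName = "_tempJob1"
  // Create a Kafka direct stream per topic
  val singleDatumKafkaStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array("some_topic"), buildConsumerConfig()))
  singleDatumKafkaStream.foreachRDD { (rdd, batchTime) =>
    /* do your stuff */
    val pairs = rdd.map(record => (record.key, record.value))
    // Persist files with unique names; otherwise, the results of two jobs may overwrite each other
    pairs.saveAsHadoopFile(basePath, classOf[String], classOf[String],
      classOf[ObMultipleTextOutputFormat[_, _]], conf)
  }
  ssc.start()
  ssc.awaitTermination()
}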
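Why a per-job `pendingDirName` helps can be sketched without Spark. Hadoop's `FileOutputCommitter` stages in-flight output under `<output>/_temporary/<appAttemptId>` and, when a job commits, deletes the whole `_temporary` directory, so two jobs writing to the same `basePath` can delete each other's staged files. The gist does not include the source of `OBFileOutputCommitter`; this model assumes it simply substitutes `pendingDirName` for `_temporary`. `PendingPaths` and its methods are hypothetical names used only for illustration.

```scala
// Hypothetical model of where a FileOutputCommitter-style committer stages output.
// Assumption: OBFileOutputCommitter replaces "_temporary" with its pendingDirName.
object PendingPaths {
  // Hadoop's default pending directory name (FileOutputCommitter.PENDING_DIR_NAME).
  val DefaultPendingDirName = "_temporary"

  // Directory a job attempt writes into before it commits.
  def jobAttemptPath(outputDir: String, pendingDirName: String, appAttemptId: Int): String =
    s"${outputDir.stripSuffix("/")}/$pendingDirName/$appAttemptId"

  // Directory deleted, recursively, when a job commits and cleans up.
  def cleanupPath(outputDir: String, pendingDirName: String): String =
    s"${outputDir.stripSuffix("/")}/$pendingDirName"

  // True if job A's cleanup would delete the staging area job B is still writing to.
  def cleanupClobbers(outputDir: String, pendingA: String, pendingB: String, attemptB: Int): Boolean =
    jobAttemptPath(outputDir, pendingB, attemptB).startsWith(cleanupPath(outputDir, pendingA) + "/")
}
```

With both jobs on the default name, `cleanupClobbers("hdfs:///data/out", "_temporary", "_temporary", 0)` is true; with distinct names such as `_tempJob1` and `_tempJob2` it is false. Keeping the pending directory under the output path (rather than elsewhere) also keeps the final commit a same-volume rename.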
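The snippet calls `buildConsumerConfig()` without defining it. A minimal sketch, assuming a broker at `localhost:9092` and a placeholder group id (both hypothetical), returns the `Map[String, Object]` that `ConsumerStrategies.Subscribe` accepts. The deserializers are given as class-name strings, which the Kafka consumer accepts, so this compiles without the Kafka client on the classpath.

```scala
// Hypothetical buildConsumerConfig(); broker address and group id are placeholders.
object KafkaConsumerConfig {
  def buildConsumerConfig(
      bootstrapServers: String = "localhost:9092",
      groupId: String = "some_group"): Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> bootstrapServers,
    // Class names as strings; Kafka resolves them when the consumer is created.
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> groupId,
    "auto.offset.reset" -> "latest",
    // Spark's Kafka 0-10 guide disables auto-commit so offsets are not
    // committed before a batch's output has been written.
    "enable.auto.commit" -> java.lang.Boolean.FALSE
  )
}
```

Default arguments let the call site stay `buildConsumerConfig()`, as in the snippet, while a second job can pass its own `groupId`.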