Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Entry point: consumes a Kafka topic via Spark Streaming and persists each
 * micro-batch RDD to Hadoop-compatible storage using a custom output
 * committer (`OBFileOutputCommitter`) with a relocated temporary directory.
 *
 * NOTE(review): this chunk appears truncated — no `ssc.start()` /
 * `ssc.awaitTermination()` and no closing brace for `main` are visible here;
 * presumably they follow outside this view — confirm.
 *
 * @param args command-line arguments (unused in the visible code)
 */
def main(args: Array[String]): Unit = {
// Reuse (or create) the session and extract its SparkContext.
// JOB_NAME is defined elsewhere in the enclosing object.
val sc = SparkSession
  .builder()
  .appName(JOB_NAME)
  .getOrCreate()
  .sparkContext
// Micro-batch interval; BATCH_DURATION_SECONDS is defined elsewhere.
val ssc = new StreamingContext(sc, Seconds(BATCH_DURATION_SECONDS))
// Replace the default FileOutputCommitter with the custom one.
// NOTE(review): `conf` is never passed to `saveAsHadoopFile` below, so this
// JobConf may not actually take effect for the writes — verify; the
// committer class may need to be set on `sc.hadoopConfiguration` or the
// JobConf passed explicitly to the save call.
val conf: JobConf = new JobConf(sc.hadoopConfiguration)
conf.setOutputCommitter(classOf[OBFileOutputCommitter])
// Point the committer's pending/temporary directory at a job-specific name so
// temp files live on the same volume as the final output (cheap rename).
OBFileOutputCommitter.pendingDirName= "_tempJob1"
// Direct (receiver-less) Kafka stream for a single topic; offsets are managed
// by the direct API. buildConsumerConfig() is defined elsewhere.
val singleDatumKafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array("some_topic"),  buildConsumerConfig()))
// Process and persist each micro-batch. `batchTime` is unused in the visible
// body — presumably intended for building a unique output path; confirm.
singleDatumKafkaStream.foreachRDD { (rdd, batchTime) =>
      /* do your stuff */
// Persist each batch under a unique path; otherwise two jobs writing to the
// same basePath could overwrite each other's results.
// NOTE(review): `s"$basePath"` is a redundant interpolation of a single value;
// `basePath` and `ObMultipleTextOutputFormat` are defined elsewhere.
rdd.saveAsHadoopFile(s"$basePath", classOf[String], classOf[String], classOf[ObMultipleTextOutputFormat[_, _]])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.