import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

def main(args: Array[String]): Unit = {
  val sc = SparkSession
    .builder()
    .appName(JOB_NAME)
    .getOrCreate()
    .sparkContext
  val ssc = new StreamingContext(sc, Seconds(BATCH_DURATION_SECONDS))

  // Change the default OutputCommitter
  val conf: JobConf = new JobConf(sc.hadoopConfiguration)
  conf.setOutputCommitter(classOf[OBFileOutputCommitter])
  // Give this job its own temporary directory for the committer, on the same
  // volume as the output, so concurrent jobs do not share "_temporary"
  OBFileOutputCommitter.pendingDirName = "_tempJob1"

  // Create a Kafka direct stream for the topic
  val singleDatumKafkaStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array("some_topic"), buildConsumerConfig()))

  singleDatumKafkaStream.foreachRDD { (rdd, batchTime) =>
    /* do your stuff */
    // saveAsHadoopFile needs a pair RDD, so map the consumer records to key/value pairs
    val pairs = rdd.map(record => (record.key, record.value))
    // Persist each batch under a unique path (here, the batch timestamp);
    // otherwise the results of two jobs may override each other.
    // Pass `conf` so the custom committer is actually applied to the write.
    pairs.saveAsHadoopFile(s"$basePath/${batchTime.milliseconds}",
      classOf[String], classOf[String], classOf[ObMultipleTextOutputFormat[_, _]], conf)
  }

  ssc.start()
  ssc.awaitTermination()
}
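The gist uses OBFileOutputCommitter without showing its source. Judging from the usage above, it is presumably a variant of Hadoop's mapred FileOutputCommitter in which the hard-coded "_temporary" pending-directory name (FileOutputCommitter.TEMP_DIR_NAME) is replaced by a settable field, so that jobs writing under the same base path each get their own staging directory. A minimal sketch of that shape, with the real path logic elided:

import org.apache.hadoop.mapred.FileOutputCommitter

// Sketch only; this hypothetical body is not the real class.
object OBFileOutputCommitter {
  // Settable replacement for FileOutputCommitter.TEMP_DIR_NAME ("_temporary").
  @volatile var pendingDirName: String = "_temporary"
}

class OBFileOutputCommitter extends FileOutputCommitter {
  // The real class would override the committer's path-building methods here,
  // substituting OBFileOutputCommitter.pendingDirName for the fixed constant
  // everywhere task output is staged and committed.
}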
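ObMultipleTextOutputFormat and buildConsumerConfig() are also external to the gist; plausible sketches of each follow, where the file layout, broker address, and group id are assumptions rather than the original code. The output format is presumably a MultipleTextOutputFormat variant that derives a distinct file name from each record, which is how the saved files stay unique:

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical sketch: route each record to a file derived from its key.
class ObMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    s"$key/$name" // `name` is the default part-NNNNN leaf file name
}

And buildConsumerConfig() presumably returns the usual kafka010 consumer parameters:

import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical sketch; broker address and group id are placeholders.
def buildConsumerConfig(): Map[String, Object] = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "spark-streaming-job",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)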