import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// JOB_NAME, BATCH_DURATION_SECONDS, basePath, buildConsumerConfig(),
// OBFileOutputCommitter and ObMultipleTextOutputFormat are defined elsewhere in the enclosing object.
def main(args: Array[String]): Unit = {
  // Reuse the SparkContext of the SparkSession for the streaming context
  val sc = SparkSession
    .builder()
    .appName(JOB_NAME)
    .getOrCreate()
    .sparkContext
  val ssc = new StreamingContext(sc, Seconds(BATCH_DURATION_SECONDS))
  // Replace the default OutputCommitter with the custom one
  val conf: JobConf = new JobConf(sc.hadoopConfiguration)
  conf.setOutputCommitter(classOf[OBFileOutputCommitter])
  // Move the committer's temporary path so it lives on the same volume as the output
  OBFileOutputCommitter.pendingDirName = "_tempJob1"
  // Create a Kafka direct stream for the topic
  val singleDatumKafkaStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array("some_topic"), buildConsumerConfig()))
  // Write each micro-batch using the JobConf that carries the custom committer
  singleDatumKafkaStream.foreachRDD { (rdd, batchTime) =>
    /* transform the RDD as needed */
    // Persist each batch under a unique path (here keyed by the batch time);
    // otherwise the results of two jobs may overwrite each other
    rdd.saveAsHadoopFile(
      s"$basePath/${batchTime.milliseconds}",
      classOf[String], classOf[String],
      classOf[ObMultipleTextOutputFormat[_, _]],
      conf)
  }

  ssc.start()
  ssc.awaitTermination()
}
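
// The code above calls buildConsumerConfig(), which is not shown here. Below is a minimal
// sketch of what such a helper could return, assuming a local broker, String
// deserializers and manually managed offsets; the broker address and group id are
// placeholders, not values from the original.
import org.apache.kafka.common.serialization.StringDeserializer

def buildConsumerConfig(): Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder broker address (assumption)
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "some_topic_consumer_group",          // placeholder consumer group (assumption)
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)  // offsets are not auto-committed by Kafka
)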