import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

def main(args: Array[String]): Unit = {
  val sc = SparkSession
    .builder()
    .appName(JOB_NAME)
    .getOrCreate()
    .sparkContext
  val ssc = new StreamingContext(sc, Seconds(BATCH_DURATION_SECONDS))

  // Change the default OutputCommitter to the custom one
  val conf: JobConf = new JobConf(sc.hadoopConfiguration)
  conf.setOutputCommitter(classOf[OBFileOutputCommitter])
  // Update the committer's temporary path so it sits on the same volume as the output
  OBFileOutputCommitter.pendingDirName = "_tempJob1"

  // Create a direct Kafka stream for the topic
  val singleDatumKafkaStream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array("some_topic"), buildConsumerConfig()))

  singleDatumKafkaStream.foreachRDD { (rdd, batchTime) =>
    /* do your stuff */
    // Persist each batch under a unique path (here derived from the batch time);
    // otherwise the results of two jobs may overwrite each other. Passing the
    // JobConf ensures the custom committer is actually used for the write.
    rdd
      .map(record => (record.key, record.value))
      .saveAsHadoopFile(
        s"$basePath/${batchTime.milliseconds}",
        classOf[String],
        classOf[String],
        classOf[ObMultipleTextOutputFormat[_, _]],
        conf)
  }

  ssc.start()
  ssc.awaitTermination()
}
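
The snippet assumes several pieces defined elsewhere in the job: the constants JOB_NAME, BATCH_DURATION_SECONDS, and basePath, the buildConsumerConfig() helper, and the custom OBFileOutputCommitter and ObMultipleTextOutputFormat classes. As a rough guide, here is a minimal sketch of what buildConsumerConfig() might return, following the standard spark-streaming-kafka-0-10 consumer parameters; the broker address and group id are placeholder assumptions:

import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: the real job defines its own configuration.
def buildConsumerConfig(): Map[String, Object] = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092", // assumption: replace with real brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "some_consumer_group", // assumption: pick a stable group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // let the stream manage offsets
)

Likewise, ObMultipleTextOutputFormat is not shown here. A minimal sketch, assuming it extends Hadoop's MultipleTextOutputFormat so that each record lands in a file derived from its key (the real class may route output differently):

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Hypothetical sketch: write each record into a per-key subdirectory, keeping the
// task's part-file name as a suffix so concurrent tasks do not collide.
class ObMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
  override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
    s"${key.toString}/$name"
}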