The example consists of three parts:
- Scala object for reading data
- Scala object for shutdown utility
- Start, stop and log4j scripts and configuration
For a deep dive into Apache Kafka and Apache Spark I recommend this blog post.
The application uses a batch interval of 10 seconds. For recovery I use a combination of
ConsumerConfig.GROUP_ID_CONFIG -> groupId
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
and an explicit commit after each batch RDD has been processed:
dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
For more information on the different recovery options, please refer to the Apache Spark Kafka integration guide.
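One of the options described there is checkpoint-based recovery. As a minimal sketch only (the object name and checkpoint directory are hypothetical, and this example does not use checkpointing), it could look like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  // Recover the StreamingContext from the checkpoint directory if one exists,
  // otherwise build a fresh context; the whole DStream pipeline must be defined
  // inside the factory function passed to getOrCreate.
  def createOrRecoverContext(conf: SparkConf, checkpointDirectory: String): StreamingContext =
    StreamingContext.getOrCreate(checkpointDirectory, () => {
      val ssc = new StreamingContext(conf, Seconds(10))
      // define the Kafka DStream and all transformations here
      ssc.checkpoint(checkpointDirectory)
      ssc
    })
}

The reader in this example relies on explicit offset commits instead: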
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/**
 * Simple Spark Streaming controller that reads from a Kafka topic and
 * commits the offsets back to Kafka after each processed batch.
 */
object KafkaStreamSimpleController {
val logger = LoggerFactory.getLogger(this.getClass.getName)
def createKafkaStreamingContext(conf: SparkConf, batchDuration: Long = 10L): StreamingContext = new StreamingContext(conf, Seconds(batchDuration))
/**
 * Simple factory function for a Kafka direct DStream.
 *
 * Uses the given consumer group id and disables auto commit so that
 * offsets can be committed explicitly after each processed batch.
 *
 * @param ssc         streaming context
 * @param topic       Kafka topic to subscribe to
 * @param offsetReset value for auto.offset.reset ("earliest" or "latest")
 * @param groupId     Kafka consumer group id
 * @return direct input DStream of consumer records
 */
def connectToKafka(ssc: StreamingContext, topic: String, offsetReset: String, groupId: String): InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]] = {
val kafkaParams = Map[String, String](ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false")
val consumer = ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams)
org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, consumer)
}
/**
 * Example of a custom transform pipeline: maps each consumer record to a (key, value) pair.
 *
 * @param rdd RDD of Kafka consumer records
 * @return RDD of (key, value) tuples
 */
def customTransform(rdd: RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]]): RDD[(String, String)] = {
rdd.map(t => (t.key(), t.value()))
}
/**
 * Entry point: wires up the Kafka DStream, the per-batch processing
 * and the file based shutdown watchdog.
 *
 * @param args not used
 */
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("myTest")
val topic = "kafkaTopic"
val groupId = "myGroupId"
val offsetReset = "latest"
val triggerDirectory: String = "/user/myUser/shutdown"
val triggerFilePrefix: String = "kafkaReader"
val ssc = createKafkaStreamingContext(conf)
// set connection properties for kafka and return dstream definition
val dstream: InputDStream[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]] = connectToKafka(ssc, topic, offsetReset, groupId)
// basic pattern
dstream.foreachRDD {
rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// show kafka partition meta info
offsetRanges.foreach(t => logger.info("Topic: %s, Partition: %d, fromOffset: %d, untilOffset: %d ".format(t.topic, t.partition, t.fromOffset, t.untilOffset)))
// place custom transformations and actions here
customTransform(rdd).foreachPartition(
iterator =>
iterator.foreach(x => logger.info(s"Key: ${x._1} => Value: ${x._2}"))
)
// commit
dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
// start
ssc.start()
// set file based watchdog
FileBasedShutdown.checkRepeatedlyShutdownAndStop(ssc, triggerDirectory, triggerFilePrefix)
}
}
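The customTransform function above only maps each record to a (key, value) pair; the line marked "place custom transformations and actions here" is where real processing belongs. As an illustration only (the function name is hypothetical), a transformation that could be added to KafkaStreamSimpleController to drop keyless records and count the remaining records per key within a batch might look like this:

// Illustrative sketch: ignore records without a key and count the remaining records per key.
def countByKeyTransform(rdd: RDD[org.apache.kafka.clients.consumer.ConsumerRecord[String, String]]): RDD[(String, Long)] =
  rdd
    .filter(_.key() != null)           // Kafka records may carry a null key
    .map(record => (record.key(), 1L)) // one count per record
    .reduceByKey(_ + _)                // aggregate per key within the batch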
For shutting down the application I used the solution described in this blog post. At the end of the application I call:
// start
ssc.start()
// set file based watchdog
FileBasedShutdown.checkRepeatedlyShutdownAndStop(ssc, triggerDirectory, triggerFilePrefix)
The latter function periodically checks whether a trigger file exists in HDFS. If it does, the streaming context is gracefully shut down via
streamingContext.stop(false, true)
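The two boolean arguments are stopSparkContext and stopGracefully; with named arguments the intent is clearer:

// keep the underlying SparkContext running, but finish processing of the data already received
streamingContext.stop(stopSparkContext = false, stopGracefully = true)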
Shutdown code:
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext
object FileBasedShutdown {
/**
 * Blocks until the streaming context terminates; intended to be the last statement in the driver.
 * Periodically checks for a trigger file in HDFS and stops the context gracefully when it appears.
 *
 * @param streamingContext  streaming context to watch and stop
 * @param triggerDirectory  HDFS directory that contains the trigger file
 * @param triggerFilePrefix prefix of the trigger file name
 * @param triggerSuffix     suffix of the trigger file name
 * @param successSuffix     suffix of the success marker file written after shutdown
 * @param waitTimeInMs      poll interval in milliseconds
 */
def checkRepeatedlyShutdownAndStop(streamingContext: StreamingContext,
triggerDirectory: String, triggerFilePrefix: String,
triggerSuffix: String = ".shutdown", successSuffix: String = ".shutdown.SUCCESS",
waitTimeInMs: Long = 10000L): Unit = {
var stopped: Boolean = false
while(!stopped){
try {
// check
checkAndStopContextIfFileExistsAndClear(streamingContext, triggerDirectory, triggerFilePrefix,
triggerSuffix, successSuffix)
stopped = streamingContext.awaitTerminationOrTimeout(waitTimeInMs)
}catch{
case ex : Throwable => {
stopped = true
throw new RuntimeException("Shutdown problem", ex )
}
}
}
}
/**
 * Checks whether the trigger file exists; if so, stops the streaming context gracefully,
 * deletes the trigger file and creates a success marker file.
 *
 * @param streamingContext  streaming context to stop
 * @param triggerDirectory  HDFS directory that contains the trigger file
 * @param triggerFilePrefix prefix of the trigger file name
 * @param triggerSuffix     suffix of the trigger file name
 * @param successSuffix     suffix of the success marker file
 */
def checkAndStopContextIfFileExistsAndClear(streamingContext: StreamingContext, triggerDirectory: String, triggerFilePrefix: String,
triggerSuffix: String, successSuffix: String): Unit = {
val triggerFilePath: Path = new Path(triggerDirectory + Path.SEPARATOR + triggerFilePrefix + triggerSuffix)
val successFilePath: Path = new Path(triggerDirectory + Path.SEPARATOR + triggerFilePrefix + successSuffix)
var fs: FileSystem = null
try {
fs = triggerFilePath.getFileSystem(new Configuration)
if (fs.exists(triggerFilePath)){
try {
streamingContext.stop(false, true)
}catch {
case _: Throwable => // ignore failures while stopping; the marker files are still handled below
}
fs.delete(triggerFilePath, false)
fs.createNewFile(successFilePath)
}
}
catch {
case ioe: IOException => throw new RuntimeException("IO Problem!", ioe)
case t: Throwable => throw new RuntimeException("Unexpected problem during shutdown check!", t)
}finally {
if(fs != null){
try{
fs.close()
}catch{
// do nothing
case _ : Throwable =>
}
}
}
}
}
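For tests it can be handy to create the trigger file directly from Scala instead of using the stop script shown below. A small hypothetical helper, which could be added to FileBasedShutdown and uses the same Hadoop FileSystem API as above, might look like this:

// Hypothetical helper: create the shutdown trigger file
// (the stop script achieves the same with `hadoop fs -touchz`).
def requestShutdown(triggerDirectory: String, triggerFilePrefix: String, triggerSuffix: String = ".shutdown"): Unit = {
  val triggerFilePath = new Path(triggerDirectory + Path.SEPARATOR + triggerFilePrefix + triggerSuffix)
  val fs: FileSystem = triggerFilePath.getFileSystem(new Configuration)
  try {
    fs.createNewFile(triggerFilePath) // returns false if the trigger file already exists
  } finally {
    fs.close()
  }
}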
Start and stop scripts can be implemented as follows:
Start script:
.../spark-submit --deploy-mode cluster --master yarn --queue $queueName --num-executors 2 --executor-cores 2 --executor-memory 1G --driver-memory 1G --conf spark.yarn.driver.memoryOverhead=768 --files hdfs:///user/myUser/log4j-yarn.properties --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties -XX:+UseG1GC"
Stop script: it creates the file kafkaReader.shutdown and waits until the application has written the success marker.
#!/bin/bash
APPID=kafkaReader
SHUTDOWN_DIR=/user/myUser/shutdown
hadoop fs -mkdir -p "$SHUTDOWN_DIR"
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
hadoop fs -touchz "$SHUTDOWN_DIR/${APPID}.shutdown"
hadoop fs -ls "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
while ! hadoop fs -test -e "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
do
sleep 5
done
set -e
hadoop fs -ls $SHUTDOWN_DIR
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown"
hadoop fs -rm -f "$SHUTDOWN_DIR/${APPID}.shutdown.SUCCESS"
echo "streaming job shutdown success"
# Spark Streaming Logging Configuration
# See also: http://spark.apache.org/docs/2.0.2/running-on-yarn.html#debugging-your-application
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.spark.storage.BlockManager=OFF
log4j.logger.org.apache.spark.storage.BlockManagerInfo=OFF
# Write all logs to standard Spark stderr file
log4j.appender.stderr=org.apache.log4j.RollingFileAppender
log4j.appender.stderr.file=${spark.yarn.app.container.log.dir}/stderr
log4j.appender.stderr.threshold=INFO
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=%d %p %c %m %n
log4j.appender.stderr.maxFileSize=50MB
log4j.appender.stderr.maxBackupIndex=10
log4j.appender.stderr.encoding=UTF-8
# Write application logs to stdout file
log4j.appender.stdout=org.apache.log4j.RollingFileAppender
log4j.appender.stdout.append=true
log4j.appender.stdout.file=${spark.yarn.app.container.log.dir}/stdout
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p %c %m %n
log4j.appender.stdout.maxFileSize=50MB
log4j.appender.stdout.maxBackupIndex=10
log4j.appender.stdout.encoding=UTF-8