Skip to content

Instantly share code, notes, and snippets.

@bernhardschaefer
Last active December 2, 2019 08:21
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save bernhardschaefer/c5619d4404d7120dcbe9f7dc7032dcf3 to your computer and use it in GitHub Desktop.
Save bernhardschaefer/c5619d4404d7120dcbe9f7dc7032dcf3 to your computer and use it in GitHub Desktop.
// Checkpoint location shared by StreamingContext.getOrCreate (recovery on startup)
// and ssc.checkpoint (where a newly created context writes its checkpoints).
val checkpointDirectory = "hdfs:///path/to/checkpoint/dir"
/**
 * Starts the streaming job and keeps it running until the context terminates
 * or a shutdown is requested through the marker file checked by
 * `isShutdownRequested()`.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Get StreamingContext from checkpoint data or create a new one
  val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
  // Start the context
  context.start()

  // Check every 10s for the shutdown marker file
  val checkIntervalMillis = 10000L
  var isStopped = false
  while (!isStopped) {
    // Assign to the outer flag. Declaring a new `val isStopped` here would
    // shadow it, leaving the loop condition permanently true so the driver
    // would never exit after the context has stopped.
    isStopped = context.awaitTerminationOrTimeout(checkIntervalMillis)
    if (!isStopped && isShutdownRequested()) {
      // Stop the streaming context and the underlying SparkContext gracefully;
      // the next awaitTerminationOrTimeout call then reports termination and
      // ends the loop.
      context.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
/**
 * Reports whether the shutdown marker file currently exists.
 *
 * @return the result of the file system's `exists` check for the marker path
 */
def isShutdownRequested(): Boolean = {
  val hadoopFs = FileSystem.get(new Configuration())
  val shutdownMarker = new Path("hdfs:///path/to/shutdown/file")
  hadoopFs.exists(shutdownMarker)
}
/**
 * Factory passed to `StreamingContext.getOrCreate`: builds and configures a
 * new StreamingContext with a 5-second batch duration and the shared
 * checkpoint directory.
 *
 * @return the newly constructed streaming context (not yet started)
 */
def functionToCreateContext(): StreamingContext = {
  val streamingContext = new StreamingContext(new SparkConf(), Durations.seconds(5))
  // ... create DStream and define transformations and actions
  streamingContext.checkpoint(checkpointDirectory) // set checkpoint directory
  streamingContext
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment