Skip to content

Instantly share code, notes, and snippets.

@zsxwing
Created August 19, 2015 09:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zsxwing/ec3be70fa8c4d6da1ecb to your computer and use it in GitHub Desktop.
Save zsxwing/ec3be70fa8c4d6da1ecb to your computer and use it in GitHub Desktop.
final JavaStreamingContext jssc = new JavaStreamingContext(...);
final Time exitTime = new Time(12345L); // Need to set the correct exit time
jssc.addStreamingListener(new StreamingListener(){
@Override
public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
if (batchCompleted.batchInfo().batchTime().greaterEq(exitTime)) {
new Thread() {
@Override
public void run() {
jssc.stop(true, true);
}
}.start();
}
}
});
JavaDStream<Integer> dstream = null;
dstream.foreachRDD(new Function2<JavaRDD<Integer>, Time, Void>() {
@Override
public Void call(JavaRDD<Integer> rdd, Time batchTime) throws Exception {
if (batchTime.less(exitTime)) {
rdd.foreach(new VoidFunction<Integer>() {
@Override
public void call(Integer integer) throws Exception {
// Do something in this rdd
}
});
} else {
// Skip rdd in this batch
}
return null;
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment