Skip to content

Instantly share code, notes, and snippets.

@natewave
Created September 30, 2015 10:04
Show Gist options
  • Save natewave/1e8a8056c8ec2e5d6078 to your computer and use it in GitHub Desktop.
/**
 * Spark listener that tracks jobs containing Kafka RDDs and persists the
 * consumed offset ranges once a job finishes successfully.
 *
 * Usage: register each Kafka RDD's offset ranges via [[registerKafkaRDD]]
 * before the job runs; on job completion the listener looks up any RDDs the
 * job's stages touched and invokes the checkpoint callback.
 *
 * @param saveProgress callback that persists the given offset ranges;
 *                     returns true when the checkpoint was stored successfully.
 */
class KafkaListener(saveProgress: Array[OffsetRange] => Boolean) extends SparkListener with Logging {

  // Jobs that have started but not yet ended.
  private val runningJobs = collection.mutable.Set.empty[SparkListenerJobStart]

  // RDD id -> offset ranges, populated by registerKafkaRDD.
  // NOTE(review): registerKafkaRDD is presumably called from the driver thread
  // while onJobStart/onJobEnd run on the listener-bus thread; this unsynchronized
  // mutable map may race — confirm and synchronize if so.
  private val kafkaRDDs = collection.mutable.Map.empty[Int, Array[OffsetRange]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    runningJobs += jobStart
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // foreach, not map: this is purely a side effect — the original used map
    // and silently discarded the resulting Option[Unit].
    runningJobs.find(_.jobId == jobEnd.jobId).foreach { job =>
      runningJobs -= job
      logDebug(s"Job just finished with result $jobEnd")
      // Check whether any stage of this job used a registered Kafka RDD.
      job.stageInfos.foreach { stageInfo =>
        stageInfo.rddInfos.map(_.id).foreach { rddId =>
          kafkaRDDs.get(rddId).foreach { offsetRanges =>
            // The RDD's job is done either way, so drop it from the registry.
            kafkaRDDs -= rddId
            if (jobEnd.jobResult == JobSucceeded) {
              logDebug(s"[+] Job succeeded for kafka RDD: $rddId")
              logDebug(s"[+] Trying to save progress.")
              val checkpointed = saveProgress(offsetRanges)
              if (checkpointed) logDebug(s"[+] Progress saved to kafka storage.")
              else logDebug(s"[-] Could not save progress, will consume this RDD again on restart.")
            } else {
              logError(s"[-] Failed to process the rdd $rddId with offsets = ${offsetRanges.toSeq}")
              // TODO: store failure somewhere
            }
          }
        }
      }
    }
  }

  /**
   * Register the offset ranges backing a Kafka RDD so progress can be
   * checkpointed when the job that consumes it completes.
   */
  def registerKafkaRDD(id: Int, offsets: Array[OffsetRange]): Unit = {
    kafkaRDDs += (id -> offsets)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment