Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package hadoopsters.spark.scala.monitoring.listeners
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
/**
* :: ExampleStreamingListener ::
* A simple StreamingListener that accesses summary statistics across Spark Streaming batches; inherits from DeveloperAPI.
*
* @param exampleArg You can pass whatever you want to a listener!
*/
class ExampleStreamingListener (exampleArg: String) extends StreamingListener {
// ====================
// onBatch_ Methods
// ====================
/**
* This method executes when a Spark Streaming batch completes.
*
* @param batchCompleted Class having information on the completed batch
*/
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
println("I was passed to the listener: " + exampleArg)
// write performance metrics somewhere
writeStatsSomewhere(batchCompleted)
// write offsets (state) somewhere, and numRecords per topic
processTopicInfo(batchCompleted)
}
/**
* This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
*
* @param batchSubmitted Class having information on the completed batch
*/
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
}
/**
* This method executes when a Spark Streaming batch starts.
*
* @param batchStarted Class having information on the completed batch
*/
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
}
// ====================
// onReceiver_ Methods
// ====================
/**
* This method executes when a Spark Streaming receiver has started.
*
* @param receiverStarted Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
}
/**
* This method executes when a Spark Streaming receiver encounters an error.
*
* @param receiverError Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
}
/**
* This method executes when a Spark Streaming receiver stops working.
*
* @param receiverStopped Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
}
// =======================================================================
// Convenience Methods (for use in onBatch_ methods)
// =======================================================================
/**
* Pulls, parses, and logs the key performance metrics of the Streaming app and logs them somewhere.
* Processing Time: How many seconds needed to complete this batch (i.e. duration).
* Scheduling Delay: How many seconds the start time of this bach was delayed.
* Num Records: The total number of input records from a live stream consumed this batch.
*
* @param batch Class having information on the completed batch
*/
def writeStatsSomewhere(batch: StreamingListenerBatchCompleted): Unit = {
// Store the processing time for this batch in seconds
val processingTime = if (batch.batchInfo.processingDelay.isDefined) {
batch.batchInfo.processingDelay.get / 1000
}
else {
0
}
// Store the scheduling delay for this batch in seconds
val schedulingDelay = if (batch.batchInfo.schedulingDelay.isDefined && batch.batchInfo.schedulingDelay.get > 0) {
batch.batchInfo.schedulingDelay.get / 1000
}
else {
0
}
// Store the total record count for this batch
val numRecords = batch.batchInfo.numRecords
// do something with `processingTime`
// do something with `schedulingDelay`
// do something with `numRecords`
}
/**
* A combination method that will process a topic in a batch.
*
* @param batch Class having information on the completed batch
*/
def processTopicInfo(batch: StreamingListenerBatchCompleted): Unit = {
// for each stream topic consumed this batch...
batch.batchInfo.streamIdToInputInfo.foreach(topic => {
writeTopicOffsetsSomewhere(topic)
writeTopicCountSomewhere(topic)
})
}
// =======================================================================
// Topic Methods (designed for use inside of convenience methods)
// =======================================================================
/**
* Takes a topic object and writes the max offset for each partition it contains this batch somewhere.
*
* @param topic A topic object within a Batch's StreamIdToInputInfo
*/
def writeTopicOffsetsSomewhere(topic: Tuple2[Int, StreamInputInfo]): Unit = {
// map offset info to OffsetRange objects
val partitionOffsets = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]]
// for every partition's range of offsets
partitionOffsets.map(offsetRange => {
// write the new starting offset for each partition in the topic to the state db
var maxOffset = offsetRange.untilOffset - 1
// do something with `offsetRange.topic`
// do something with `offsetRange.partition`
// do something with `offsetRange.count`
// do something with `maxOffset`
})
}
/**
* Takes a topic object and writes the number of records for said topic this batch somewhere.
*
* @param topic A topic object within a Batch's StreamIdToInputInfo
*/
def writeTopicCountSomewhere(topic: Tuple2[Int, StreamInputInfo]): Unit = {
// store the individual record count for this topic
val numRecords = topic._2.numRecords
// store topicName
val topicName = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]].head.topic
// write record count for this topic this batch
// do something with `topicName` and `numRecords`
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.