package tv.spotx.scala.monitoring.listeners

import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
import tv.spotx.scala.dbutils.{ConnectionPool, InfluxDBWriter, MySQLConnection}
/**
 * :: SpotXSparkStreamingListener ::
 * A simple StreamingListener that logs summary statistics across Spark Streaming batches;
 * it extends StreamingListener, which Spark marks as a DeveloperApi.
 *
 * @param influxHost        Hostname of the InfluxDB service
 * @param influxDB          Database name in InfluxDB to write to
 * @param influxMeasurement Measurement name in InfluxDB to write to
 * @param mySQLHost         Hostname of the MySQL service
 * @param mySQLDB           Database name in MySQL to write to
 * @param mySQLTable        Table name in MySQL to write to
 * @param mySQLConsumer     Unique consumer name used to track offsets across streaming apps
 * @param mySQLUser         Username for MySQL authentication
 * @param mySQLPwd          Password for MySQL authentication
 */
class SpotXSparkStreamingListener(influxHost: String,
                                  influxDB: String,
                                  influxMeasurement: String,
                                  mySQLHost: String,
                                  mySQLDB: String,
                                  mySQLTable: String,
                                  mySQLConsumer: String,
                                  mySQLUser: String,
                                  mySQLPwd: String) extends StreamingListener {
  // Establish database connections; @transient lazy vals are initialized on
  // first use and are excluded from serialization if the listener is ever
  // captured in a closure.
  @transient lazy val influx = InfluxDBWriter.create(influxHost)
  @transient lazy val mysql = MySQLConnection(host = mySQLHost, table = mySQLTable, username = mySQLUser, password = mySQLPwd, database = mySQLDB)
  @transient lazy val mySQLConnectionPool = ConnectionPool(mysql.toString).getConnection
  // ====================
  // onBatch_ Methods
  // ====================
  /**
   * This method executes when a Spark Streaming batch completes.
   *
   * @param batchCompleted Class having information on the completed batch
   */
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    // write performance metrics to influx
    writeBatchSchedulingStatsToInflux(batchCompleted)
    // write offsets (state) to mysql
    writeBatchOffsetsAndCounts(batchCompleted)
  }
  /**
   * This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
   *
   * @param batchSubmitted Class having information on the submitted batch
   */
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
  }

  /**
   * This method executes when a Spark Streaming batch starts.
   *
   * @param batchStarted Class having information on the started batch
   */
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
  }
  // ====================
  // onReceiver_ Methods
  // ====================

  /**
   * This method executes when a Spark Streaming receiver has started.
   *
   * @param receiverStarted Class having information on the receiver (e.g. errors, executor ids, etc.)
   */
  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
  }

  /**
   * This method executes when a Spark Streaming receiver encounters an error.
   *
   * @param receiverError Class having information on the receiver (e.g. errors, executor ids, etc.)
   */
  override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
  }

  /**
   * This method executes when a Spark Streaming receiver stops working.
   *
   * @param receiverStopped Class having information on the receiver (e.g. errors, executor ids, etc.)
   */
  override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
  }
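  // Note (an observation, not part of the original gist): with the Kafka 0.10
  // direct stream implied by the OffsetRange import above, input is consumed
  // without receivers, so these receiver callbacks are presumably intentional
  // no-ops; they would only fire for receiver-based input streams.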
  // =======================================================================
  // Convenience Methods (for use in onBatch_ methods)
  // =======================================================================

  /**
   * Pulls and parses key performance metrics of the streaming app and writes them to Influx.
   * Processing Time: how many seconds were needed to complete this batch (i.e. its duration).
   * Scheduling Delay: how many seconds the start of this batch was delayed.
   * Num Records: the total number of input records consumed from the live stream this batch.
   *
   * @param batch Class having information on the completed batch
   */
  def writeBatchSchedulingStatsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
    // Processing time for this batch in seconds (integer division truncates sub-second durations)
    val processingTime = batch.batchInfo.processingDelay.getOrElse(0L) / 1000
    // Scheduling delay for this batch in seconds; undefined or negative delays are clamped to 0
    val schedulingDelay = batch.batchInfo.schedulingDelay.filter(_ > 0).getOrElse(0L) / 1000
    // Total record count for this batch across all topics
    val numRecords = batch.batchInfo.numRecords
    // Log all three metrics to Influx
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("processingTime", processingTime)))
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("schedulingDelay", schedulingDelay)))
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords", numRecords)))
  }
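  // Sketch (an assumption, not part of the original gist): since write() already
  // takes a Seq of (field, value) tuples, the three calls above could likely be
  // collapsed into a single point so all metrics share one timestamp, e.g.:
  //
  //   influx.write(influxDB, influxMeasurement, Seq(),
  //     Seq(("processingTime", processingTime),
  //         ("schedulingDelay", schedulingDelay),
  //         ("numRecords", numRecords)))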
  /**
   * A combination method that handles both the Influx counts and the MySQL offsets.
   * It is effectively writeBatchOffsetsToMySQL + writeBatchCountsToInflux in a single
   * pass over the batch's topics.
   *
   * @param batch Class having information on the completed batch
   */
  def writeBatchOffsetsAndCounts(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch...
    batch.batchInfo.streamIdToInputInfo.foreach(topic => {
      // write offsets for this topic to mysql
      writeTopicOffsetsToMySQL(topic)
      // write the record count for this topic this batch to influx
      writeTopicCountToInflux(topic)
    })
  }
  /**
   * A convenience method for writing offsets to MySQL for each topic being consumed.
   *
   * @param batch Class having information on the completed batch
   */
  def writeBatchOffsetsToMySQL(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch...
    batch.batchInfo.streamIdToInputInfo.foreach(topic => {
      // write offsets for this topic to mysql
      writeTopicOffsetsToMySQL(topic)
    })
  }
  /**
   * A convenience method for writing record counts to Influx for each topic being consumed.
   *
   * @param batch Class having information on the completed batch
   */
  def writeBatchCountsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
    // for each stream topic consumed this batch...
    batch.batchInfo.streamIdToInputInfo.foreach(topic => {
      // write the record count for this topic to influx
      writeTopicCountToInflux(topic)
    })
  }
  // =======================================================================
  // Topic Methods (designed for use inside of convenience methods)
  // =======================================================================

  /**
   * Takes a topic entry and writes the max offset consumed this batch for each of its
   * partitions to MySQL.
   *
   * @param topic A (streamId, StreamInputInfo) entry from a batch's streamIdToInputInfo
   */
  def writeTopicOffsetsToMySQL(topic: (Int, StreamInputInfo)): Unit = {
    // the "offsets" metadata key is populated with the batch's OffsetRanges by the
    // Kafka 0.10 direct stream
    val partitionOffsets = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]]
    // for every partition's range of offsets...
    partitionOffsets.foreach(offsetRange => {
      // untilOffset is exclusive, so the last offset consumed this batch is untilOffset - 1
      val maxOffset = offsetRange.untilOffset - 1
      // create a now() timestamp
      val now = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
      // form the upsert; ON DUPLICATE KEY UPDATE keeps one row per
      // (consumer, topic, partition) combination
      val sql =
        s"""INSERT INTO $mySQLDB.$mySQLTable (consumer, topic, partition_id, offset, offset_ts, batch_size)
            VALUES
            ('$mySQLConsumer', '${offsetRange.topic}', ${offsetRange.partition}, '$maxOffset', '$now', ${offsetRange.count})
            ON DUPLICATE KEY UPDATE offset_ts = VALUES(offset_ts), offset = VALUES(offset),
            batch_size = VALUES(batch_size)
         """
      // execute the sql to persist the offsets to the state table
      val st = mySQLConnectionPool.createStatement
      st.execute(sql)
      st.close()
    })
  }
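  // Sketch (an assumption, not part of the original gist): the ON DUPLICATE KEY
  // UPDATE clause above only acts as an upsert if the target table carries a
  // unique key over (consumer, topic, partition_id). A compatible table could
  // look like:
  //
  //   CREATE TABLE offsets (
  //     consumer     VARCHAR(128) NOT NULL,
  //     topic        VARCHAR(255) NOT NULL,
  //     partition_id INT          NOT NULL,
  //     offset       BIGINT       NOT NULL,
  //     offset_ts    DATETIME     NOT NULL,
  //     batch_size   BIGINT       NOT NULL,
  //     PRIMARY KEY (consumer, topic, partition_id)
  //   );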
  /**
   * Takes a topic entry and writes the number of records consumed for that topic this
   * batch to Influx.
   *
   * @param topic A (streamId, StreamInputInfo) entry from a batch's streamIdToInputInfo
   */
  def writeTopicCountToInflux(topic: (Int, StreamInputInfo)): Unit = {
    // store the record count for this topic
    val numRecords = topic._2.numRecords
    // recover the topic name from the first OffsetRange in the metadata
    val topicName = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]].head.topic
    // write the record count for this topic this batch
    influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords_" + topicName, numRecords)))
  }
}
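// ---------------------------------------------------------------------------
// Usage sketch (not part of the original gist; the app name, batch interval,
// hostnames, and credentials below are placeholder assumptions). The listener
// is registered on the driver via StreamingContext.addStreamingListener before
// the context starts; Spark then invokes the onBatch_/onReceiver_ callbacks as
// batches progress.
//
//   import org.apache.spark.SparkConf
//   import org.apache.spark.streaming.{Seconds, StreamingContext}
//
//   val ssc = new StreamingContext(
//     new SparkConf().setAppName("offset-tracking-app"), Seconds(30))
//   ssc.addStreamingListener(new SpotXSparkStreamingListener(
//     influxHost        = "influx.example.com",
//     influxDB          = "metrics",
//     influxMeasurement = "streaming_batches",
//     mySQLHost         = "mysql.example.com",
//     mySQLDB           = "kafka_state",
//     mySQLTable        = "offsets",
//     mySQLConsumer     = "offset-tracking-app",
//     mySQLUser         = "user",
//     mySQLPwd          = "password"))
//   ssc.start()
//   ssc.awaitTermination()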