package tv.spotx.scala.monitoring.listeners
import org.apache.spark.streaming.kafka010.OffsetRange
import org.apache.spark.streaming.scheduler._
import org.joda.time.DateTime
import tv.spotx.scala.dbutils.{ConnectionPool, InfluxDBWriter, MySQLConnection}
/**
* :: SpotXSparkStreamingListener ::
* A simple StreamingListener that logs summary statistics across Spark Streaming batches and persists Kafka offsets; extends the StreamingListener developer API.
*
* @param influxHost Hostname of the Influx service
* @param influxDB Database name in Influx to write to
* @param influxMeasurement Measurement name in Influx to write to
* @param mySQLHost Hostname of the MySQL service
* @param mySQLDB Database name in MySQL to write to
* @param mySQLTable Table name in MySQL to write to
* @param mySQLUser Username for authentication in MySQL
* @param mySQLPwd Password for authentication in MySQL
* @param mySQLConsumer Unique name for tracking offsets across streaming apps
*/
class SpotXSparkStreamingListener (influxHost: String,
influxDB: String,
influxMeasurement: String,
mySQLHost: String,
mySQLDB: String,
mySQLTable: String,
mySQLConsumer: String,
mySQLUser: String,
mySQLPwd: String) extends StreamingListener {
// Establish database connections lazily on the driver (@transient keeps them out of any serialized closure)
@transient lazy val influx = InfluxDBWriter.create(influxHost)
@transient lazy val mysql = MySQLConnection(host = mySQLHost, table = mySQLTable, username = mySQLUser, password = mySQLPwd, database = mySQLDB)
@transient lazy val mySQLConnectionPool = ConnectionPool(mysql.toString).getConnection
// ====================
// onBatch_ Methods
// ====================
/**
* This method executes when a Spark Streaming batch completes.
*
* @param batchCompleted Class having information on the completed batch
*/
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
// write performance metrics to influx
writeBatchSchedulingStatsToInflux(batchCompleted)
// write offsets (state) to mysql
writeBatchOffsetsAndCounts(batchCompleted)
}
/**
* This method executes when a Spark Streaming batch is submitted to the scheduler for execution.
*
* @param batchSubmitted Class having information on the submitted batch
*/
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
}
/**
* This method executes when a Spark Streaming batch starts.
*
* @param batchStarted Class having information on the started batch
*/
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
}
// ====================
// onReceiver_ Methods
// ====================
/**
* This method executes when a Spark Streaming receiver has started.
*
* @param receiverStarted Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
}
/**
* This method executes when a Spark Streaming receiver encounters an error.
*
* @param receiverError Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverError(receiverError: StreamingListenerReceiverError): Unit = {
}
/**
* This method executes when a Spark Streaming receiver stops working.
*
* @param receiverStopped Class having information on the receiver (e.g. errors, executor ids, etc)
*/
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped): Unit = {
}
// =======================================================================
// Convenience Methods (for use in onBatch_ methods)
// =======================================================================
/**
* Pulls and parses the key performance metrics of the Streaming app and logs them to Influx.
* Processing Time: How many seconds were needed to complete this batch (i.e. its duration).
* Scheduling Delay: How many seconds the start of this batch was delayed.
* Num Records: The total number of input records from a live stream consumed this batch.
*
* @param batch Class having information on the completed batch
*/
def writeBatchSchedulingStatsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
// Store the processing time for this batch in seconds
val processingTime = if (batch.batchInfo.processingDelay.isDefined) {
  batch.batchInfo.processingDelay.get / 1000
} else {
  0
}
// Store the scheduling delay for this batch in seconds
val schedulingDelay = if (batch.batchInfo.schedulingDelay.isDefined && batch.batchInfo.schedulingDelay.get > 0) {
  batch.batchInfo.schedulingDelay.get / 1000
} else {
  0
}
// Store the total record count for this batch
val numRecords = batch.batchInfo.numRecords
// Log all three (3) metrics to Influx
influx.write(influxDB, influxMeasurement, Seq(), Seq(("processingTime", processingTime)))
influx.write(influxDB, influxMeasurement, Seq(), Seq(("schedulingDelay", schedulingDelay)))
influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords", numRecords)))
}
/**
* A combination method that handles both the Influx counts and the MySQL offsets.
* Effectively a convenience wrapper around writeBatchOffsetsToMySQL and writeBatchCountsToInflux.
*
* @param batch Class having information on the completed batch
*/
def writeBatchOffsetsAndCounts(batch: StreamingListenerBatchCompleted): Unit = {
// for each stream topic consumed this batch...
batch.batchInfo.streamIdToInputInfo.foreach(topic => {
// write offsets for this topic to mysql
writeTopicOffsetsToMySQL(topic)
// write record count for this topic this batch
writeTopicCountToInflux(topic)
})
}
/**
* A convenience method for writing offsets to MySQL for each topic being consumed.
*
* @param batch Class having information on the completed batch
*/
def writeBatchOffsetsToMySQL(batch: StreamingListenerBatchCompleted): Unit = {
// for each stream topic consumed this batch...
batch.batchInfo.streamIdToInputInfo.foreach(topic => {
// write offsets to mysql
writeTopicOffsetsToMySQL(topic)
})
}
/**
* A convenience method for writing topic counts to Influx for each topic being consumed.
*
* @param batch Class having information on the completed batch
*/
def writeBatchCountsToInflux(batch: StreamingListenerBatchCompleted): Unit = {
// for each stream topic consumed this batch...
batch.batchInfo.streamIdToInputInfo.foreach(topic => {
// write record count for this topic to influx
writeTopicCountToInflux(topic)
})
}
// =======================================================================
// Topic Methods (designed for use inside of convenience methods)
// =======================================================================
/**
* Takes a topic entry and writes the highest offset consumed this batch, for each of its partitions, to MySQL.
*
* @param topic A topic object within a Batch's StreamIdToInputInfo
*/
def writeTopicOffsetsToMySQL(topic: Tuple2[Int, StreamInputInfo]): Unit = {
// pull the OffsetRange objects for this topic out of the batch metadata
val partitionOffsets = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]]
// for every partition's range of offsets
partitionOffsets.foreach(offsetRange => {
  // the highest offset consumed this batch (untilOffset is exclusive)
  val maxOffset = offsetRange.untilOffset - 1
  // create a now() timestamp
  val now = new DateTime().toString("YYYY-MM-dd HH:mm:ss")
  // form the upsert statement
  val sql =
    s"""INSERT INTO $mySQLDB.$mySQLTable (consumer, topic, partition_id, offset, offset_ts, batch_size)
       VALUES
       ('$mySQLConsumer', '${offsetRange.topic}', ${offsetRange.partition}, '$maxOffset', '$now', ${offsetRange.count})
       ON DUPLICATE KEY UPDATE offset_ts = VALUES(offset_ts), offset = VALUES(offset),
       batch_size = VALUES(batch_size)
    """
  // execute the sql to persist offsets to the state table
  val st = mySQLConnectionPool.createStatement
  st.execute(sql)
  st.close()
})
}
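/*
 * For reference, a minimal sketch of the MySQL state table the upsert above assumes.
 * This DDL is not part of the original gist: the column names come from the INSERT statement,
 * the table name is whatever is passed as mySQLTable, and the unique key on
 * (consumer, topic, partition_id) is what the ON DUPLICATE KEY UPDATE clause relies on.
 *
 *   CREATE TABLE my_offsets_table (
 *     consumer     VARCHAR(255) NOT NULL,
 *     topic        VARCHAR(255) NOT NULL,
 *     partition_id INT          NOT NULL,
 *     offset       BIGINT       NOT NULL,
 *     offset_ts    DATETIME     NOT NULL,
 *     batch_size   BIGINT       NOT NULL,
 *     UNIQUE KEY uk_consumer_topic_partition (consumer, topic, partition_id)
 *   );
 */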
/**
* Takes a topic object and writes the number of records for said topic this batch to Influx.
*
* @param topic A topic object within a Batch's StreamIdToInputInfo
*/
def writeTopicCountToInflux(topic: Tuple2[Int, StreamInputInfo]): Unit = {
// store the individual record count for this topic
val numRecords = topic._2.numRecords
// store topicName
val topicName = topic._2.metadata("offsets").asInstanceOf[List[OffsetRange]].head.topic
// write record count for this topic this batch
influx.write(influxDB, influxMeasurement, Seq(), Seq(("numRecords_" + topicName, numRecords)))
}
}
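/**
 * Example usage (a minimal sketch, not part of the original gist): attach the listener to a
 * StreamingContext before starting it, so batch metrics flow to Influx and offsets to MySQL.
 * All hostnames, database/table names, and credentials below are hypothetical placeholders.
 */
object SpotXSparkStreamingListenerExample {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-example")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Register the listener so it receives the onBatch_ / onReceiver_ callbacks
    ssc.addStreamingListener(new SpotXSparkStreamingListener(
      influxHost = "influx.example.internal",
      influxDB = "streaming_metrics",
      influxMeasurement = "batch_stats",
      mySQLHost = "mysql.example.internal",
      mySQLDB = "streaming_state",
      mySQLTable = "kafka_offsets",
      mySQLConsumer = "example_consumer",
      mySQLUser = "offsets_writer",
      mySQLPwd = "********"))

    // ... build the Kafka direct stream and transformations here ...

    ssc.start()
    ssc.awaitTermination()
  }
}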