@syedhassaanahmed · Last active September 27, 2020
Asynchronously monitor Spark structured streaming queries with Azure Application Insights
import scala.collection.JavaConverters._
import java.util.HashMap
import com.microsoft.applicationinsights._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Forwards each micro-batch's progress report to Azure Application Insights as a
// custom event. Listener callbacks run on Spark's listener bus, off the query's
// execution path, so the telemetry calls don't block stream processing.
class AppInsightsListener(instrumentationKey: String, eventName: String) extends StreamingQueryListener {
  val telemetry = new TelemetryClient
  telemetry.getContext.setInstrumentationKey(instrumentationKey)

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // String-valued fields land in customDimensions
    val properties = new HashMap[String, String]
    properties.put("id", event.progress.id.toString)
    properties.put("runId", event.progress.runId.toString)
    properties.put("timestamp", event.progress.timestamp)
    properties.put("batchId", event.progress.batchId.toString)
    properties.put("source", event.progress.sources(0).description) // assumes a single-source query
    properties.put("sink", event.progress.sink.description)

    // Numeric fields land in customMeasurements
    val metrics = new HashMap[String, java.lang.Double]
    metrics.put("inputRowsPerSecond", event.progress.inputRowsPerSecond)
    metrics.put("numInputRows", event.progress.numInputRows.toDouble)
    metrics.put("processedRowsPerSecond", event.progress.processedRowsPerSecond)
    // Per-phase durations in milliseconds (addBatch, getBatch, triggerExecution, ...)
    event.progress.durationMs.asScala.foreach { case (k, v) => metrics.put(k, v.doubleValue) }

    telemetry.trackEvent(eventName, properties, metrics)
    telemetry.flush()
  }

  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

spark.streams.addListener(new AppInsightsListener("<APPLICATION_INSIGHTS_INSTRUMENTATION_KEY>", "EventHubs_To_PBI"))
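Once the listener is registered, every streaming query started on this SparkSession reports progress to Application Insights. As a minimal sketch of such a query (a built-in rate source and a hypothetical writeBatch placeholder stand in for the gist's actual Event Hubs to Power BI pipeline; foreachBatch is what makes the sink appear as ForeachBatchSink in the telemetry):

import org.apache.spark.sql.DataFrame

// Hypothetical per-batch sink; the real pipeline would push rows to Power BI here.
def writeBatch(batch: DataFrame, batchId: Long): Unit = {
  batch.count()
}

val query = spark.readStream
  .format("rate")              // test source emitting (timestamp, value) rows
  .option("rowsPerSecond", 10)
  .load()
  .writeStream
  .foreachBatch(writeBatch _)  // surfaces as "ForeachBatchSink" in progress events
  .start()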
syedhassaanahmed commented Sep 26, 2020

Don't forget to add a dependency on com.microsoft.azure:applicationinsights-core:2.6.2 to your Spark jar/notebook.
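If you build with sbt, a one-line sketch of that dependency (assuming a standard build.sbt) is:

libraryDependencies += "com.microsoft.azure" % "applicationinsights-core" % "2.6.2"

On Databricks, attaching the same Maven coordinate to the cluster as a library achieves the same thing.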

The following example KQL query renders the Spark structured streaming metrics as a time chart:

customEvents
| where name == "EventHubs_To_PBI" and
    customDimensions.source startswith "org.apache.spark.sql.eventhubs.EventHubsSource@" and
    customDimensions.sink == "ForeachBatchSink"
| project
    timestamp = todatetime(customDimensions.timestamp),
    inputRowsPerSecond = todouble(customMeasurements.inputRowsPerSecond),
    processedRowsPerSecond = todouble(customMeasurements.processedRowsPerSecond)
| render timechart
