Skip to content

Instantly share code, notes, and snippets.

@ibuenros
Created June 29, 2014 17:12
Show Gist options
  • Save ibuenros/9b94736c2bad2f4b8e23 to your computer and use it in GitHub Desktop.
Save ibuenros/9b94736c2bad2f4b8e23 to your computer and use it in GitHub Desktop.
Spark productionizing utilities developed by Ooyala, shown in Spark Summit 2014
//==================================================================
// SPARK INSTRUMENTATION
//==================================================================
import com.codahale.metrics.{MetricRegistry, Meter, Gauge}
import org.apache.spark.{SparkEnv, Accumulator}
import org.apache.spark.metrics.source.Source
import org.joda.time.DateTime
import scala.collection.mutable
/** Instrumentation for Spark based on accumulators.
*
* Usage:
* val instrumentation = new SparkInstrumentation("example.metrics")
* val numReqs = sc.accumulator(0L)
* instrumentation.source.registerDailyAccumulator(numReqs, "numReqs")
* instrumentation.register()
*
* Will create and report the following metrics:
* - Gauge with total number of requests (daily)
* - Meter with rate of requests
*
* @param prefix prefix for all metrics that will be reported by this Instrumentation
*/
class SparkInstrumentation(prefix: String) extends Serializable {
val accumulators = mutable.Set[Accumulator[Long]]()
private class InstrumentationSource(prefix: String) extends Source {
val metricRegistry = new MetricRegistry
val sourceName = prefix
val oldgauges = mutable.Map[String,Long]()
val oldtimes = mutable.Map[String, DateTime]()
val meters = mutable.Map[String,Meter]()
/** Computes metrics based on accumulator. Gauge never resets.
*
* @param a Metrics will be derived from this accumulator
* @param name Name of the metrics
*/
def registerAccumulator(a: Accumulator[Long], name: String){
oldgauges += (name -> 0L)
meters += (name -> metricRegistry.meter(name + "-rate"))
metricRegistry.register(MetricRegistry.name(name),
new Gauge[Long] {
override def getValue: Long = {
meters(name).mark(a.value - oldgauges(name))
oldgauges(name) = a.value
return a.value
}
})
}
/** Computes metrics based on accumulator. Gauge resets at the end of the day.
*
* @param a Metrics will be derived from this accumulator
* @param name Name of the metrics
*/
def registerDailyAccumulator(a: Accumulator[Long], name: String){
oldgauges += (name -> 0L)
meters += (name -> metricRegistry.meter(name + "-rate"))
oldtimes += (name -> DateTime.now)
metricRegistry.register(MetricRegistry.name(name),
new Gauge[Long] {
override def getValue: Long = {
meters(name).mark(a.value - oldgauges(name))
val now = DateTime.now
if (now.getDayOfMonth != oldtimes(name).getDayOfMonth){
a.setValue(0L)
}
oldtimes(name) = now
oldgauges(name) = a.value
return a.value
}
})
}
}
val source = new InstrumentationSource(prefix)
/** Register the Instrumentation with Spark so the metrics are reported to any provided Sink. */
def register(){
SparkEnv.get.metricsSystem.registerSource(source)
}
}
//============================================
// STREAMING LAUNCHER / SERVER
//============================================
import scalax.io.Resource
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
import org.apache.spark.{Logging, SparkConf}
import org.apache.hadoop.yarn.api.records.{ApplicationReport, YarnApplicationState, ApplicationId}
import org.apache.hadoop.yarn.client.ClientRMProxy
import org.apache.hadoop.yarn.api.ApplicationClientProtocol
import org.apache.hadoop.yarn.api.protocolrecords.{GetApplicationsResponse, GetApplicationsRequest}
import org.eclipse.jetty.server.{Request, Handler, Server}
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, AbstractHandler}
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import com.lambdaworks.jacks.JacksMapper
import scala.annotation.tailrec
import org.eclipse.jetty.util.thread.QueuedThreadPool
import scala.util.{Failure, Success, Try}
/** Local launcher client for streaming applications in YARN.
*
* Extends usual Spark YARN client for streaming applications. This class should not be called by the user,
* instead, the StreamingClient object is the entry point for launching applications.
*
* @param args User supplied arguments
* @param sparkConf Spark Configuration
*/
class StreamingClient(args: ClientArguments, sparkConf: SparkConf) extends Client(args, sparkConf) {
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
var appIdOption: Option[ApplicationId] = None
val clientHttp = new ClientHttp(this)
val launcherArgs = args
/** Connects to or launches application in YARN cluster.
*
* 1. Search for existing application by the same name.
* 2. If found, monitor existing application.
* 3. If not found, launch new application with this name, and monitor.
*/
override def run() {
sparkConf.set("spark.yarn.report.interval", "10000")
clientHttp.bind()
println("Using yarn at " + yarnConf.getRaw("fs.defaultFS"))
val pidFile = System.getenv("SPARK_LAUNCHER_PID_DIR") match {
case "" => None
case x => Some(Resource.fromFile("%s/%s.appInfo".format(x,args.appName)))
}
val instances = getSparkApplications(setAsJavaSet(Set("SPARK")),java.util.EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED, YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED)).flatMap { report =>
if (report.getName == args.appName){
Some(report.getApplicationId)
} else {
None
}
}.toList
if (instances.size > 0){
println("Application already running. Monitoring old application.")
init(yarnConf)
start()
appIdOption = Some(instances.head)
monitorApplication(appIdOption.get)
System.exit(0)
} else{
//super.stop()
println("Application not found.")
appIdOption = Some(runApp())
if (!appIdOption.isDefined){
println("Application didn't start correctly")
System.exit(1)
}
if (pidFile.isDefined) {
pidFile.get.write("%s %s".format(appIdOption.get.toString, args.userArgs.mkString(" ")))
System.exit(0)
}
monitorApplication(appIdOption.get)
System.exit(0)
}
}
/** Gets list of Spark applications in YARN cluster */
def getSparkApplications(applicationTypes: java.util.Set[String], applicationStates: java.util.EnumSet[YarnApplicationState]): java.util.List[ApplicationReport] = {
setConfig(yarnConf)
val rmClient = ClientRMProxy.createRMProxy(getConfig, classOf[ApplicationClientProtocol])
val request: GetApplicationsRequest = GetApplicationsRequest.newInstance(applicationTypes, applicationStates)
val response: GetApplicationsResponse = rmClient.getApplications(request)
return response.getApplicationList
}
}
/** Local launcher client for streaming applications in YARN.
*
* Usage:
* java -cp /etc/hadoop/conf:AppJar.jar:spark-assembly.jar org.apache.spark.yarn.StreamingClient --jar AppJar.jar
* --addJars /jars/config.jar --class ooyala.app.MainClass --arg arg1 --arg arg2 --name MyApp
*
*/
object StreamingClient {
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note: anything env variable with SPARK_ prefix gets propagated to all (remote) processes -
// see Client#setupLaunchEnv().
System.setProperty("SPARK_YARN_MODE", "true")
val sparkConf = new SparkConf()
val args = new ClientArguments(argStrings, sparkConf)
new StreamingClient(args, sparkConf).run()
}
}
/** Starts an HTTP server for the launcher client.
*
* Allows to check health of application launcher by querying /healthz route.
*
* @param launcher Server will track status of this launcher client.
*/
class ClientHttp(val launcher: StreamingClient) extends Logging {
val port = 8081
var boundPort: Option[Int] = None
var server: Option[Server] = None
val handlers = Seq[(String, Handler)](
("/healthz", healthHandler)
)
/** /healthz route handler
*
* Reports health of the launcher and publishes information of the application
*/
def healthHandler: Handler = {
new AbstractHandler {
override def handle(target: String, baseRequest: Request, request: HttpServletRequest, response: HttpServletResponse): Unit = {
response.setContentType("application/json")
if (!launcher.appIdOption.isDefined) {
response.setStatus(HttpServletResponse.SC_NOT_FOUND)
val res = Map(
"LauncherStatus" -> "Application Not Found"
)
baseRequest.setHandled(true)
response.getWriter.println(JacksMapper.writeValueAsString(res))
return
}
val report = launcher.getApplicationReport(launcher.appIdOption.get)
response.setStatus(HttpServletResponse.SC_OK)
baseRequest.setHandled(true)
val res = Map(
"LauncherStatus" -> "Online",
"YarnCluster" -> launcher.yarnConf.getRaw("fs.defaultFS"),
"ApplicationId" -> launcher.appIdOption.get.toString,
"ApplicationStatus" -> report.getYarnApplicationState,
"StartedAt" -> report.getStartTime.toString,
"TrackingURL" -> report.getTrackingUrl.toString,
"ApplicationName" -> launcher.launcherArgs.appName
)
response.getWriter.println(JacksMapper.writeValueAsString(res))
}
}
}
/**
* Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers.
*
* If the desired port number is contented, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
val handlersToRegister = handlers.map { case (path, handler) =>
val contextHandler = new ContextHandler(path)
contextHandler.setAllowNullPathInfo(true)
contextHandler.setHandler(handler)
contextHandler
}
val handlerList = new HandlerList
handlerList.setHandlers(handlersToRegister.toArray)
@tailrec
def connect(currentPort: Int): (Server, Int) = {
val server = new Server(currentPort)
val pool = new QueuedThreadPool
pool.setDaemon(true)
server.setThreadPool(pool)
server.setHandler(handlerList)
Try {
server.start()
} match {
case s: Success[_] =>
(server, server.getConnectors.head.getLocalPort)
case f: Failure[_] =>
server.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
logInfo("Error was: " + f.toString)
connect((currentPort + 1) % 65536)
}
}
connect(port)
}
def bind() {
try {
val (srv, usedPort) = startJettyServer("0.0.0.0", port, handlers)
logInfo("Started Streaming Launcher UI at port %d".format(usedPort))
server = Some(srv)
boundPort = Some(usedPort)
} catch {
case e: Exception =>
logError("Failed to create Spark JettyUtils", e)
System.exit(1)
}
}
}
//===============================================
// DATADOG SINK
//===============================================
import com.codahale.metrics.MetricRegistry
// Requires com.clipperz.metrics-datadog artifact (not in Maven)
// Compile from https://github.com/clipperz/metrics-datadog
import com.codahale.metrics.reporting.{DatadogReporter, HttpTransport}
import java.util.concurrent.TimeUnit
import java.util.Properties
import org.apache.spark.metrics.sink.Sink
/** Sink to report metrics to Datadog */
class DatadogSink(val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink {
val DD_KEY_PERIOD = "period"
val DD_DEFAULT_PERIOD = 10L
val DD_KEY_UNIT = "unit"
val DD_DEFAULT_UNIT = TimeUnit.SECONDS
val DD_API_KEY = "apikey"
val DD_KEY_HOST = "host"
val DD_DEFAULT_HOST = ""
def propertyToOption(prop: String) = Option(property.getProperty(prop))
if (!propertyToOption(DD_API_KEY).isDefined) {
throw new Exception("Datadog sink requires 'apikey' property.")
}
val pollPeriod = propertyToOption(DD_KEY_PERIOD).map(_.toLong)
.getOrElse(DD_DEFAULT_PERIOD)
val pollUnit = propertyToOption(DD_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
.getOrElse(DD_DEFAULT_UNIT)
val host = propertyToOption(DD_KEY_HOST).getOrElse(DD_DEFAULT_HOST)
val apikey = propertyToOption(DD_API_KEY).get
val transport = new HttpTransport("app.datadoghq.com",apikey)
val reporter = DatadogReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(transport,host)
override def start {
reporter.start(pollPeriod, pollUnit)
}
override def stop {
reporter.stop()
}
}
@KenpachiRules
Copy link

KenpachiRules commented May 8, 2018

I tried mimicking the custom metrics class and registered it but sadly it does not get written to Graphite sink
. This is structured streaming spark hence I am not sure whether there is a limitation on that , but I believe it shouldnt be a limitation . Attaching code snippets.

package com.hari.spark.kafka.struct.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.Dataset
import org.apache.spark.Accumulator
import com.codahale.metrics._
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.types.StructType

object ReadWriteKafka {

def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.appName("TestGrafana").getOrCreate

import sparkSession.implicits._
val rowsPassed = sparkSession.sparkContext.accumulator[Long](0, "rowsPassed")
CustomRowsProcessedMetrics.regiserCustomMetrics("rowsPassed", rowsPassed)
val readFromKafka = sparkSession.readStream.format("kafka").option("kafka.bootstrap.servers", "10.65.137.104:9092").option("subscribe", "GrafanaSource").
  option("enable.auto.commit", "false").option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  .option("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer").option("failOnDataLoss", "false").option("auto.offset.reset", "latest").load
// perform same transformation
//implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
val keyValueTuples = readFromKafka.selectExpr("CAST(key as STRING)", "CAST(value as STRING)").as[(String, String)]
val rowProcessedStore = countRowsProcessed(rowsPassed,keyValueTuples.filter(tup => tup._2.contains("ichigo")).map(tup => (tup._1, tup._2.toUpperCase)))
rowProcessedStore.printSchema
val writeToKafka = rowProcessedStore.writeStream.format("kafka").option("kafka.bootstrap.servers", "10.65.137.104:9092").
  option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer").option("checkpointLocation", "/Hari/hdfs/staging1/").
  option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer").option("topic", "GrafanaTarget").start
writeToKafka.awaitTermination

}

def countRowsProcessed(acc: Accumulator[Long], ds: Dataset[(String, String)]): Dataset[(String, String)] = {
implicit val employeeEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
ds.map {
x =>
acc += 1
x
}.toDF("key", "value").as[(String, String)]
}

}


import com.codahale.metrics.{ MetricRegistry, Meter, Gauge }
import org.apache.spark.Accumulator
import scala.collection.mutable.Map
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.CustomMetrics

package com.hari.spark.kafka.struct.streaming {
object CustomRowsProcessedMetrics extends Serializable {

def regiserCustomMetrics(metricsName: String, acc: Accumulator[Long]) {
  import org.apache.spark.metrics.source.CustomMetrics
  val customMets = new CustomMetrics(metricsName, Map[String, Long](), Map[String, Meter]())
  customMets.regGaugeAndMeter(acc, metricsName)
  SparkEnv.get.metricsSystem.registerSource(customMets)
  val test = SparkEnv.get.metricsSystem.getSourcesByName(metricsName)
  println(test)
}

}
}

package org.apache.spark.metrics.source {

class CustomMetrics(metricsName: String, gauge1: Map[String, Long], metrics: Map[String, Meter]) extends Source {

def metricRegistry = new MetricRegistry()
def sourceName = metricsName
// update the metrics with time series

def regGaugeAndMeter(acc: Accumulator[Long], metricsName: String): Unit = {
  import com.codahale.metrics.{ MetricRegistry, Meter, Gauge }
  gauge1 += (metricsName -> 0L)
  metrics += (metricsName -> metricRegistry.meter(metricsName+"-rate"))
  metricRegistry.register(
    MetricRegistry.name(metricsName),
    new Gauge[Long] {
      override def getValue: Long = {
        metrics(metricsName).mark(acc.value - gauge1(metricsName))
        gauge1(metricsName) = acc.value
        println("The incremented values are  --->  " +acc.value )
        return acc.value
      }
    })
}

}

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment