Skip to content

Instantly share code, notes, and snippets.

@dimastatz
Created November 12, 2020 19:40
Show Gist options
  • Save dimastatz/c3729b228865ed52bef279607ad28a14 to your computer and use it in GitHub Desktop.
Save dimastatz/c3729b228865ed52bef279607ad28a14 to your computer and use it in GitHub Desktop.
/** Helpers for attaching a [[SparkListenerClient]] to a `SparkContext`, detaching it again,
  * and publishing the cost metrics derived from the executors it recorded.
  */
object SparkListenerClient {

  /** Creates a listener and registers it on `sc` when `enabled` is true.
    *
    * @param sc      context the listener is registered on
    * @param enabled when false, nothing is created or registered
    * @return `Some(listener)` if one was registered, otherwise `None`
    */
  def start(sc: SparkContext, enabled: Boolean): Option[SparkListenerClient] = {
    if (enabled) {
      val listener = new SparkListenerClient()
      sc.addSparkListener(listener)
      Some(listener)
    } else {
      None
    }
  }

  /** Removes `sl` from the listeners registered on `sc`. */
  def stop(sc: SparkContext, sl: SparkListenerClient): Unit =
    sc.removeSparkListener(sl)

  /** Sends the accumulated cost metrics for `appName` and then detaches the listener.
    *
    * The reported value is the sum of `duration * cores` over every entry in `sl.executors`.
    * It is sent twice: as `cost_pl_<category>` and as `cost_app_<normalized name>`, where both
    * parts come from [[normalizeName]].
    *
    * The listener is removed from `sc` even when closing the executors or sending a metric
    * throws, so a failed flush does not leave it registered.
    *
    * @param sc      context the listener was registered on
    * @param sl      listener holding the executor records
    * @param client  sink the two metrics are sent to
    * @param appName raw application name, normalized before use
    * @return `Success(())` when everything completed, otherwise a `Failure` with the exception
    */
  def flush(sc: SparkContext, sl: SparkListenerClient, client: MetricsClient, appName: String): Try[Unit] = {
    Try {
      try {
        // NOTE(review): presumably finalizes records of executors that are still running,
        // so their duration is included in the sum below - confirm in SparkListenerClient.
        sl.closeExecutors()
        val (category, normalized) = normalizeName(appName)
        val time = sl.executors.values.map(v => v.duration * v.cores).sum
        client.sendMetric(s"cost_pl_$category", time)
        client.sendMetric(s"cost_app_$normalized", time)
      } finally {
        // Always detach, otherwise a failure above would leak the registered listener.
        stop(sc, sl)
      }
    }
  }

  // TODO: this classifier will be replaced by attribute
  /** Normalizes an application name and classifies it into a cost category.
    *
    * The name is lower-cased (locale-independently, so the result does not vary with the JVM
    * default locale) and stripped of all whitespace. The category is chosen by the first
    * matching substring, checked in this order: `vpa`, `test` or `compare` (both map to
    * `"test"`), `adproxy`; anything else is `"usage"`.
    *
    * @param name raw application name
    * @return `(category, normalizedName)`
    */
  def normalizeName(name: String): (String, String) = {
    val normalized = name.toLowerCase(java.util.Locale.ROOT).filterNot(_.isWhitespace)
    val category = normalized match {
      case x if x.contains("vpa")                          => "vpa"
      case x if x.contains("test") || x.contains("compare") => "test"
      case x if x.contains("adproxy")                      => "adproxy"
      case _                                               => "usage"
    }
    (category, normalized)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment