Skip to content

Instantly share code, notes, and snippets.

@tzachz
Created November 4, 2015 21:06
Show Gist options
  • Save tzachz/0b0a0e6ea3bfddb36557 to your computer and use it in GitHub Desktop.
Save tzachz/0b0a0e6ea3bfddb36557 to your computer and use it in GitHub Desktop.
Creating a Metrics Counter backed by a Spark Accumulator
package com.kenshoo.kripke.core
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.{MetricName, Counter}
import org.apache.spark.Accumulator
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
object CounterBackedAccumulatorUtil {

  /**
   * Wraps `rdd` so that every record flowing through it increments a Spark accumulator, and
   * returns a callback that pushes the accumulated count into a Yammer Metrics [[Counter]]
   * named `metricName` (scoped by `clazz`).
   *
   * The counting `map` is lazy: the accumulator only advances when an action evaluates the
   * returned RDD. Invoke the returned callback on the driver AFTER such an action has completed;
   * calling it earlier reports whatever has been accumulated so far (possibly nothing).
   *
   * The callback reports only the increase since its previous invocation, so calling it more
   * than once does not double-count into the metric.
   *
   * Caveat: the accumulator is updated inside a transformation rather than an action. Spark may
   * apply a task's accumulator update more than once if the task or stage is re-executed, and
   * every action that recomputes the returned RDD (e.g. when it is not cached) counts its records
   * again. Treat the metric as approximate under those conditions.
   *
   * @param rdd        the RDD whose records should be counted
   * @param metricName name of the metrics counter to increment
   * @param clazz      class used to scope the metric name
   * @tparam V         element type of `rdd`
   * @return the counting RDD (use it in place of `rdd`) and the callback that reports the count
   */
  def countSilently[V: ClassTag](rdd: RDD[V], metricName: String, clazz: Class[_]): (RDD[V], Unit => Unit) = {
    val counter: Counter = Metrics.newCounter(new MetricName(clazz, metricName))
    val accumulator: Accumulator[Long] = rdd.sparkContext.accumulator(0L)

    // Only `accumulator` is referenced here, so only it is shipped to executors with the closure.
    val countedRdd = rdd.map { v => accumulator += 1L; v }

    // Driver-side bookkeeping of how much has already been pushed to `counter`, so each callback
    // invocation reports just the delta instead of re-adding the whole running total.
    var reportedSoFar = 0L
    val callback: Unit => Unit = _ => {
      val current = accumulator.value
      counter.inc(current - reportedSoFar)
      reportedSoFar = current
    }

    (countedRdd, callback)
  }
}
/**
 * Example usage of `CounterBackedAccumulatorUtil.countSilently`: counts the records entering and
 * leaving a processing pipeline without triggering any Spark action besides the final save.
 */
class MyClass {
  import CounterBackedAccumulatorUtil._

  /** Placeholder for the real processing logic. */
  def someTransformations(input: RDD[String]): RDD[Long] = ??? // some logic...

  /**
   * Transforms `rdd`, saves the result to "/output/file", then reports the input and output
   * record counts to metrics.
   *
   * Both counters are driven by the single `saveAsTextFile` action. This assumes that
   * `someTransformations` builds its result on top of its input RDD, so that evaluating the
   * output also evaluates the counted input. If the save throws, the callbacks are never invoked.
   *
   * @param rdd the input records
   */
  def doSomeProcessing(rdd: RDD[String]): Unit = {
    // count input:
    val (countedInput, reportInputCount) = countSilently(rdd, "inputRecords", classOf[MyClass])
    // do some work:
    val processed: RDD[Long] = someTransformations(countedInput)
    // count output:
    val (countedOutput, reportOutputCount) = countSilently(processed, "outputRecords", classOf[MyClass])
    // some action on result (in this case - save)
    countedOutput.saveAsTextFile("/output/file")
    // Callbacks must be called AFTER the RDD was acted on - that's when the accumulators get updated.
    // The callbacks take a Unit argument; pass it explicitly rather than relying on the
    // deprecated compiler insertion of `()` (as in `callback.apply()`).
    reportInputCount(())
    reportOutputCount(())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment