@hkxIron
Forked from fedragon/HashMapParam.scala
Created September 21, 2017 11:40
Allows HashMap[String, Int] to be used as accumulators in Spark
import org.apache.spark.{AccumulableParam, SparkConf}
import org.apache.spark.serializer.JavaSerializer
import scala.collection.mutable.{ HashMap => MutableHashMap }
/*
 * Allows a mutable HashMap[String, Int] to be used as an accumulator in Spark.
 * Whenever we add (k, v2) to an accumulator that already contains (k, v1), the result
 * is a HashMap containing (k, v1 + v2).
 *
 * It would have been nicer to extend GrowableAccumulableParam instead of redefining everything,
 * but that class is private to the spark package.
 *
 * Note: AccumulableParam belongs to the Spark 1.x accumulator API. It was deprecated in
 * Spark 2.0 in favor of org.apache.spark.util.AccumulatorV2 and removed in Spark 3.0.
 */
class HashMapParam extends AccumulableParam[MutableHashMap[String, Int], (String, Int)] {

  /*
   * Adds a single (key, value) pair, summing it with any value already stored under that key.
   * A direct hash lookup keeps each addition O(1) on average.
   */
  def addAccumulator(acc: MutableHashMap[String, Int], elem: (String, Int)): MutableHashMap[String, Int] = {
    val (k, v) = elem
    acc(k) = acc.getOrElse(k, 0) + v
    acc
  }

  /*
   * Merges acc2 into acc1 key by key, summing values for keys present in both.
   * This method is allowed to modify and return the first value for efficiency.
   *
   * @see org.apache.spark.GrowableAccumulableParam.addInPlace(r1: R, r2: R): R
   */
  def addInPlace(acc1: MutableHashMap[String, Int], acc2: MutableHashMap[String, Int]): MutableHashMap[String, Int] = {
    acc2.foreach(elem => addAccumulator(acc1, elem))
    acc1
  }

  /*
   * Returns an empty copy of initialValue. The Java-serialization round trip produces a
   * deep copy, so clearing it leaves initialValue itself untouched.
   *
   * @see org.apache.spark.GrowableAccumulableParam.zero(initialValue: R): R
   */
  def zero(initialValue: MutableHashMap[String, Int]): MutableHashMap[String, Int] = {
    val ser = new JavaSerializer(new SparkConf(false)).newInstance()
    val copy = ser.deserialize[MutableHashMap[String, Int]](ser.serialize(initialValue))
    copy.clear()
    copy
  }
}
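
On Spark 2.0 and later, the same "sum values per key" behavior can be expressed with org.apache.spark.util.AccumulatorV2. What follows is a minimal sketch, assuming spark-core 2.x or 3.x is on the classpath. The class name StringIntMapAccumulator is hypothetical, and this is not an official Spark class. add() plays the role of addAccumulator, merge() plays the role of addInPlace, and copy()/reset() replace zero().

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.{HashMap => MutableHashMap}

// Hypothetical AccumulatorV2 port of HashMapParam (sketch, not a Spark-provided class).
class StringIntMapAccumulator
    extends AccumulatorV2[(String, Int), MutableHashMap[String, Int]] {

  private val map = MutableHashMap.empty[String, Int]

  // True while nothing has been accumulated.
  override def isZero: Boolean = map.isEmpty

  // Independent instance with the same contents; Spark uses copies so each task
  // works on its own accumulator.
  override def copy(): StringIntMapAccumulator = {
    val c = new StringIntMapAccumulator
    c.map ++= map
    c
  }

  // Back to the zero value (an empty map).
  override def reset(): Unit = map.clear()

  // Adding (k, v2) to a map holding (k, v1) leaves (k, v1 + v2).
  override def add(elem: (String, Int)): Unit = {
    val (k, v) = elem
    map(k) = map.getOrElse(k, 0) + v
  }

  // Folds another accumulator's pairs into this one, summing per key.
  override def merge(other: AccumulatorV2[(String, Int), MutableHashMap[String, Int]]): Unit =
    other.value.foreach(add)

  // Note: exposes the internal mutable map directly.
  override def value: MutableHashMap[String, Int] = map
}
```

In a job you would register it on the driver with sc.register(acc, "counts"), call acc.add((word, 1)) inside tasks, and read acc.value back on the driver.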