Skip to content

Instantly share code, notes, and snippets.

@girisandeep
Last active September 15, 2019 10:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save girisandeep/450ff3d29f20f2e31cdd09ad0f1c0df2 to your computer and use it in GitHub Desktop.
Save girisandeep/450ff3d29f20f2e31cdd09ad0f1c0df2 to your computer and use it in GitHub Desktop.
Create a custom accumulator in Scala for Spark 1.x
/**
 * Mutable pair of integer components, marked Serializable.
 *
 * @param x first component
 * @param y second component
 */
class MyComplex(var x: Int, var y: Int) extends Serializable {

  /** Sets both components back to zero. */
  def reset(): Unit = {
    x = 0
    y = 0
  }

  /**
   * Adds the components of `p` into this instance, mutating it in place.
   *
   * @param p the value whose components are added to this one
   * @return this same (mutated) instance, so calls can be chained
   */
  def add(p: MyComplex): MyComplex = {
    x += p.x
    y += p.y
    this
  }
}
import org.apache.spark.AccumulatorParam
/**
 * AccumulatorParam that tells Spark 1.x how to accumulate MyComplex values:
 * what the identity ("zero") value is and how two values are merged.
 */
class ComplexAccumulatorV1 extends AccumulatorParam[MyComplex] {

  /**
   * Returns the additive identity (0, 0) as a fresh instance.
   *
   * The zero must be an identity value, not the initial value itself:
   * handing back `initialVal` would make a non-zero initial value count again
   * wherever the zero is used as a starting point, and would make the zero the
   * very object that `addInPlace` mutates.
   *
   * @param initialVal the accumulator's initial value (not needed to build the
   *                   identity for this fixed-size type)
   * @return a new MyComplex with both components set to 0
   */
  def zero(initialVal: MyComplex): MyComplex = new MyComplex(0, 0)

  /**
   * Merges `v2` into `v1` by mutating `v1`.
   *
   * @param v1 the value that receives the sum (modified in place)
   * @param v2 the value to add
   * @return `v1` after the addition
   */
  def addInPlace(v1: MyComplex, v2: MyComplex): MyComplex = {
    v1.add(v2)
    v1
  }
}
// Accumulator starting at (0, 0); ComplexAccumulatorV1 supplies the merge logic.
// `sc` is not defined in this file -- presumably the SparkContext provided by spark-shell.
val vecAccum = sc.accumulator(new MyComplex(0, 0))(new ComplexAccumulatorV1)
// Small sample RDD of three integers used to exercise the accumulator.
// Declared as a val because it is never reassigned.
val myrdd = sc.parallelize(Array(1, 2, 3))
/**
 * Adds (x, x) to the `vecAccum` accumulator as a side effect, then triples the input.
 *
 * NOTE(review): this function is used inside a `map` transformation; the Spark
 * programming guide warns that accumulator updates made in transformations can
 * be applied more than once if a task is re-executed.
 *
 * @param x the element being processed
 * @return x multiplied by 3
 */
def myfunc(x: Int): Int = {
  vecAccum += new MyComplex(x, x)
  x * 3
}
// Apply myfunc to every element of myrdd. Declared as a val because it is never reassigned.
val myrdd1 = myrdd.map(myfunc)
// collect() is the action that actually runs myfunc over the data and brings
// the mapped elements back to the driver.
myrdd1.collect()
// Inspect the accumulated components; for the input 1, 2, 3 each should be 6
// when every element is processed exactly once.
vecAccum.value.x
vecAccum.value.y
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment