@eric-maynard
Last active April 9, 2019 06:22
Custom Spark AccumulatorV2 example
import org.apache.spark.util._

// Adds two 48-byte buffers in place, treating them as three independent
// 128-bit unsigned big-endian integers. Works 4 bytes at a time; the carry
// is dropped at each 16-byte lane boundary so the three counters stay isolated.
def vectorizedAddUnsafe128x3(a: Array[Byte], b: Array[Byte]): Array[Byte] = {
  var carry: Int = 0
  ((a.length - 1) to 0 by -4).foreach(i => {
    // Mask each byte with 0xFF before shifting to avoid sign extension.
    val x: Long =
      java.lang.Integer.toUnsignedLong(
        ((a(i - 3) & 0xFF) << 24) | ((a(i - 2) & 0xFF) << 16) | ((a(i - 1) & 0xFF) << 8) | (a(i) & 0xFF)) +
      java.lang.Integer.toUnsignedLong(
        ((b(i - 3) & 0xFF) << 24) | ((b(i - 2) & 0xFF) << 16) | ((b(i - 1) & 0xFF) << 8) | (b(i) & 0xFF)) +
      carry
    // Propagate the carry unless this 4-byte word ends a 128-bit lane.
    carry = if (x > 0xFFFFFFFFL && (i - 3) % 16 != 0) 1 else 0
    a(i) = (x & 0x000000FFL).toByte
    a(i - 1) = ((x & 0x0000FF00L) >> 8).toByte
    a(i - 2) = ((x & 0x00FF0000L) >> 16).toByte
    a(i - 3) = ((x & 0xFF000000L) >> 24).toByte
  })
  a
}
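// A quick driver-side sanity check (illustrative, not part of the original
// gist): one plus one in the lowest byte of the third lane should give two,
// with the other two lanes untouched.
{
  val p = new Array[Byte](48); p(47) = 1
  val q = new Array[Byte](48); q(47) = 1
  val sum = vectorizedAddUnsafe128x3(p, q)
  assert(sum(47) == 2 && sum.slice(0, 47).forall(_ == 0))
}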
// Create the custom accumulator:
case class TripleCounter(
    firstCount: (Long, Long) = (0L, 0L),
    secondCount: (Long, Long) = (0L, 0L),
    thirdCount: (Long, Long) = (0L, 0L))
// Each logical counter is a (big, small) pair of Longs so it can exceed
// Long.MaxValue: `small` is the running count and `big` records how many
// times `small` has wrapped past Long.MaxValue.
case class DoubleLongTripleAccumulator(counter: TripleCounter = TripleCounter())
    extends AccumulatorV2[Array[Byte], TripleCounter] {
  var firstCountBig = counter.firstCount._1
  var firstCountSmall = counter.firstCount._2
  var secondCountBig = counter.secondCount._1
  var secondCountSmall = counter.secondCount._2
  var thirdCountBig = counter.thirdCount._1
  var thirdCountSmall = counter.thirdCount._2

  override def value: TripleCounter =
    TripleCounter((firstCountBig, firstCountSmall), (secondCountBig, secondCountSmall), (thirdCountBig, thirdCountSmall))

  override def isZero: Boolean =
    firstCountBig == 0 && firstCountSmall == 0 &&
    secondCountBig == 0 && secondCountSmall == 0 &&
    thirdCountBig == 0 && thirdCountSmall == 0

  override def copy(): AccumulatorV2[Array[Byte], TripleCounter] = DoubleLongTripleAccumulator(value)

  override def reset(): Unit = {
    firstCountBig = 0
    firstCountSmall = 0
    secondCountBig = 0
    secondCountSmall = 0
    thirdCountBig = 0
    thirdCountSmall = 0
  }
  // Bump a counter when the last byte of its 16-byte lane is nonzero. When
  // `small` passes Long.MaxValue it wraps to Long.MinValue; the wrapping
  // subtraction of Long.MaxValue brings it back to 1, so `big` effectively
  // counts completed multiples of Long.MaxValue.
  override def add(a: Array[Byte]): Unit = {
    if (a(15) != 0) {
      firstCountSmall += 1
      if (firstCountSmall <= 0) {
        firstCountBig += 1
        firstCountSmall -= Long.MaxValue
      }
    }
    if (a(31) != 0) {
      secondCountSmall += 1
      if (secondCountSmall <= 0) {
        secondCountBig += 1
        secondCountSmall -= Long.MaxValue
      }
    }
    if (a(47) != 0) {
      thirdCountSmall += 1
      if (thirdCountSmall <= 0) {
        thirdCountBig += 1
        thirdCountSmall -= Long.MaxValue
      }
    }
  }
  // Merge another accumulator's totals, detecting overflow of the small
  // halves by checking whether the wrapped sum came out smaller than either
  // operand, then applying the same Long.MaxValue adjustment as add().
  override def merge(other: AccumulatorV2[Array[Byte], TripleCounter]): Unit = {
    firstCountBig += other.value.firstCount._1
    val tempFirstCountSmall = firstCountSmall + other.value.firstCount._2
    if (tempFirstCountSmall < firstCountSmall || tempFirstCountSmall < other.value.firstCount._2) {
      firstCountBig += 1
      firstCountSmall = tempFirstCountSmall - Long.MaxValue
    } else {
      firstCountSmall = tempFirstCountSmall
    }
    secondCountBig += other.value.secondCount._1
    val tempSecondCountSmall = secondCountSmall + other.value.secondCount._2
    if (tempSecondCountSmall < secondCountSmall || tempSecondCountSmall < other.value.secondCount._2) {
      secondCountBig += 1
      secondCountSmall = tempSecondCountSmall - Long.MaxValue
    } else {
      secondCountSmall = tempSecondCountSmall
    }
    thirdCountBig += other.value.thirdCount._1
    val tempThirdCountSmall = thirdCountSmall + other.value.thirdCount._2
    if (tempThirdCountSmall < thirdCountSmall || tempThirdCountSmall < other.value.thirdCount._2) {
      thirdCountBig += 1
      thirdCountSmall = tempThirdCountSmall - Long.MaxValue
    } else {
      thirdCountSmall = tempThirdCountSmall
    }
  }
}
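// Not in the original gist: a small helper to read a (big, small) pair back
// as a single exact count. `big` holds completed multiples of Long.MaxValue,
// since the overflow handling above subtracts Long.MaxValue on each wrap.
def pairToBigInt(pair: (Long, Long)): BigInt = BigInt(pair._1) * Long.MaxValue + pair._2
// e.g. pairToBigInt(dltAccumulator.value.thirdCount)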
// Create & register the accumulator. Registering it with a name makes it
// visible in the Spark web UI.
// TODO remove this; the starting value is boosted only to demonstrate overflow:
val dltAccumulator = DoubleLongTripleAccumulator(TripleCounter((0, 0), (0, 0), (0, Long.MaxValue - 200)))
spark.sparkContext.register(dltAccumulator, "dltAccumulator")
// Start filling the accumulator:
// 48-byte values: three big-endian 128-bit lanes; `one` sets the last byte
// of the third lane.
val zero = Array[Byte](
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
val one = Array[Byte](
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1)
spark.sparkContext.parallelize(1 to 500).foreach(x => dltAccumulator.add(zero))
spark.sparkContext.parallelize(1 to 500).foreach(x => dltAccumulator.add(one))
// Print the accumulator value:
println(dltAccumulator.value)
//TripleCounter((0,0),(0,0),(1,300))
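// The third counter started at Long.MaxValue - 200 and received 500 ones, so
// its small half wrapped once: 1 * Long.MaxValue + 300 == (Long.MaxValue - 200) + 500.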
// Performance testing:
dltAccumulator.reset()
val random = new java.util.Random()
def getRandomBytes() = if (random.nextBoolean()) one.clone() else zero.clone()

// Test the reduce-based method; cache() and count() materialize the RDD first
// so the timed section measures only the reduction itself:
val methodData = spark.sparkContext.parallelize(1 to 10000000).map(x => getRandomBytes())
methodData.cache()
methodData.count()
val startMethod = System.currentTimeMillis()
methodData.reduce(vectorizedAddUnsafe128x3)
val endMethod = System.currentTimeMillis()
methodData.unpersist(true)
// Test the accumulator method:
val accumulatorData = spark.sparkContext.parallelize(1 to 10000000).map(x => getRandomBytes())
accumulatorData.cache()
accumulatorData.count()
val startAccumulator = System.currentTimeMillis()
accumulatorData.foreach(x => dltAccumulator.add(x))
val endAccumulator = System.currentTimeMillis()
accumulatorData.unpersist(true)
// Print results (wall-clock milliseconds):
println("Method finished in " + (endMethod - startMethod) + " ms")
println("Accumulator finished in " + (endAccumulator - startAccumulator) + " ms")