Last active
April 9, 2019 06:22
-
-
Save eric-maynard/033997d847ce7ccaf62dac8b634a2c33 to your computer and use it in GitHub Desktop.
Custom Spark AccumulatorV2 example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.util._ | |
/**
 * Adds `b` into `a` in place, treating both arrays as consecutive big-endian unsigned
 * 128-bit lanes (16 bytes each) and working through them one 32-bit word at a time,
 * from the last byte towards the first.
 *
 * A carry travels from one word to the next more significant word of the same lane, but
 * is dropped at the most significant word of a lane, so each lane wraps around
 * independently instead of spilling into its neighbour.
 *
 * "Unsafe" because `a` is overwritten: the returned array is `a` itself, not a copy.
 *
 * @param a left operand; overwritten with the sum. Its length must be a multiple of 4.
 * @param b right operand; only read. Must be at least as long as `a`.
 * @return `a`, now holding the lane-wise sum
 * @throws IllegalArgumentException if the length requirements above are not met
 */
def vectorizedAddUnsafe128x3(a: Array[Byte], b: Array[Byte]): Array[Byte] = {
  // Fail before touching `a`; otherwise a bad length surfaces as an index error
  // after part of `a` has already been overwritten.
  require(a.length % 4 == 0, s"a.length must be a multiple of 4 but was ${a.length}")
  require(b.length >= a.length, s"b (${b.length} bytes) must be at least as long as a (${a.length} bytes)")

  // Reads the big-endian 32-bit word whose least significant byte sits at index `lsb`.
  // Every byte is masked with 0xFF first: a bare Byte is sign-extended when widened,
  // which would smear 1-bits over the more significant bytes of the word.
  def wordAt(bytes: Array[Byte], lsb: Int): Long =
    ((bytes(lsb - 3) & 0xFFL) << 24) |
      ((bytes(lsb - 2) & 0xFFL) << 16) |
      ((bytes(lsb - 1) & 0xFFL) << 8) |
      (bytes(lsb) & 0xFFL)

  var carry = 0L
  var i = a.length - 1
  while (i >= 3) {
    val sum = wordAt(a, i) + wordAt(b, i) + carry
    // A word that starts on a 16-byte boundary is the top word of its lane:
    // its carry-out is discarded rather than propagated into the next lane.
    carry = if (sum > 0xFFFFFFFFL && (i - 3) % 16 != 0) 1L else 0L
    // toByte keeps only the low 8 bits, so no explicit masking is needed here.
    a(i) = sum.toByte
    a(i - 1) = (sum >>> 8).toByte
    a(i - 2) = (sum >>> 16).toByte
    a(i - 3) = (sum >>> 24).toByte
    i -= 4
  }
  a
}
// Create the custom accumulator:
/**
 * Immutable snapshot of three counters, each stored as a pair of `Long`s.
 *
 * As consumed by `DoubleLongTripleAccumulator`, the first element of a pair is the
 * "big" (overflow) part and the second element is the "small" (running) part.
 *
 * @param firstCount  pair for the first counter, defaults to `(0, 0)`
 * @param secondCount pair for the second counter, defaults to `(0, 0)`
 * @param thirdCount  pair for the third counter, defaults to `(0, 0)`
 */
case class TripleCounter(
  firstCount: (Long, Long) = (0L, 0L),
  secondCount: (Long, Long) = (0L, 0L),
  thirdCount: (Long, Long) = (0L, 0L)
)
/**
 * Accumulator holding three independent counters, each kept as a (big, small) pair of
 * `Long`s so it can count past the range of a single `Long`.
 *
 * `small` is the running count. Whenever it overflows, `big` is bumped by one and
 * `Long.MaxValue` is subtracted from `small`, so one unit of `big` stands for
 * `Long.MaxValue` counts.
 *
 * @param counter starting values for the three (big, small) pairs
 */
case class DoubleLongTripleAccumulator(counter: TripleCounter = TripleCounter())
  extends AccumulatorV2[Array[Byte], TripleCounter] {

  // Mutable working state, seeded from the constructor argument.
  var firstCountBig: Long = counter.firstCount._1
  var firstCountSmall: Long = counter.firstCount._2
  var secondCountBig: Long = counter.secondCount._1
  var secondCountSmall: Long = counter.secondCount._2
  var thirdCountBig: Long = counter.thirdCount._1
  var thirdCountSmall: Long = counter.thirdCount._2

  /** Snapshot of the current state of all three counters. */
  override def value: TripleCounter =
    TripleCounter(
      firstCount = (firstCountBig, firstCountSmall),
      secondCount = (secondCountBig, secondCountSmall),
      thirdCount = (thirdCountBig, thirdCountSmall)
    )

  /** True only when every big and small part is zero. */
  override def isZero: Boolean = {
    // OR-ing the six parts together yields zero exactly when each of them is zero.
    val anyBitSet =
      firstCountBig | firstCountSmall |
        secondCountBig | secondCountSmall |
        thirdCountBig | thirdCountSmall
    anyBitSet == 0L
  }

  /** New accumulator seeded with this accumulator's current value. */
  override def copy(): AccumulatorV2[Array[Byte], TripleCounter] =
    new DoubleLongTripleAccumulator(counter = value)

  /** Sets every part of every counter back to zero. */
  override def reset(): Unit = {
    firstCountBig = 0L
    firstCountSmall = 0L
    secondCountBig = 0L
    secondCountSmall = 0L
    thirdCountBig = 0L
    thirdCountSmall = 0L
  }

  /**
   * Bumps a counter by one for each inspected byte of `a` that is non-zero.
   *
   * Bytes 15, 31 and 47 (the last byte of each 16-byte group) drive the first, second
   * and third counter respectively, and are inspected in that order.
   *
   * @param a input bytes; indices 15, 31 and 47 are read
   */
  override def add(a: Array[Byte]): Unit = {
    if (a(15) != 0) {
      val bumped = firstCountSmall + 1
      if (bumped > 0) {
        firstCountSmall = bumped
      } else {
        // Non-positive after the increment: record one more `big` unit and rebase `small`.
        firstCountBig += 1
        firstCountSmall = bumped - Long.MaxValue
      }
    }
    if (a(31) != 0) {
      val bumped = secondCountSmall + 1
      if (bumped > 0) {
        secondCountSmall = bumped
      } else {
        secondCountBig += 1
        secondCountSmall = bumped - Long.MaxValue
      }
    }
    if (a(47) != 0) {
      val bumped = thirdCountSmall + 1
      if (bumped > 0) {
        thirdCountSmall = bumped
      } else {
        thirdCountBig += 1
        thirdCountSmall = bumped - Long.MaxValue
      }
    }
  }

  /**
   * Folds the value of `other` into this accumulator, one counter at a time.
   *
   * @param other accumulator whose current value is added to this one
   */
  override def merge(other: AccumulatorV2[Array[Byte], TripleCounter]): Unit = {
    val incoming = other.value

    val (mergedFirstBig, mergedFirstSmall) =
      combine(firstCountBig, firstCountSmall, incoming.firstCount)
    firstCountBig = mergedFirstBig
    firstCountSmall = mergedFirstSmall

    val (mergedSecondBig, mergedSecondSmall) =
      combine(secondCountBig, secondCountSmall, incoming.secondCount)
    secondCountBig = mergedSecondBig
    secondCountSmall = mergedSecondSmall

    val (mergedThirdBig, mergedThirdSmall) =
      combine(thirdCountBig, thirdCountSmall, incoming.thirdCount)
    thirdCountBig = mergedThirdBig
    thirdCountSmall = mergedThirdSmall
  }

  // Adds one (big, small) pair onto another. If the sum of the small parts comes out
  // below either operand, one extra `big` unit is recorded and `small` is rebased by
  // Long.MaxValue.
  private def combine(big: Long, small: Long, delta: (Long, Long)): (Long, Long) = {
    val (deltaBig, deltaSmall) = delta
    val smallSum = small + deltaSmall
    val wrapped = smallSum < small || smallSum < deltaSmall
    if (wrapped) (big + deltaBig + 1, smallSum - Long.MaxValue)
    else (big + deltaBig, smallSum)
  }
}
// Create & register the accumulator.
// TODO remove this, we boost the starting value only to demonstrate overflow here:
val dltAccumulator = DoubleLongTripleAccumulator(
  TripleCounter(thirdCount = (0L, Long.MaxValue - 200))
)
spark.sparkContext.register(dltAccumulator, "dltAccumulator")

// Start filling the accumulator.
// `zero` is 48 zero bytes; `one` is the same except for a 1 in the very last byte,
// which is the byte the accumulator inspects for its third counter.
val zero: Array[Byte] = new Array[Byte](48)
val one: Array[Byte] = {
  val bytes = new Array[Byte](48)
  bytes(47) = 1
  bytes
}
spark.sparkContext.parallelize(1 to 500).foreach(_ => dltAccumulator.add(zero))
spark.sparkContext.parallelize(1 to 500).foreach(_ => dltAccumulator.add(one))

// Print the accumulator value.
println(dltAccumulator.value)
// Expected output: TripleCounter((0,0),(0,0),(1,300))
// Performance testing: start again from an all-zero accumulator.
dltAccumulator.reset()

val random = new java.util.Random()

// Returns a fresh copy of either `one` or `zero`, picked by a coin flip.
def getRandomBytes() = {
  val template = if (random.nextBoolean()) one else zero
  template.clone()
}

// Test the adding method. The data set is cached and materialised with count() first,
// so that only the reduce itself falls inside the timed section.
val methodData = spark.sparkContext.parallelize(1 to 10000000).map(_ => getRandomBytes())
methodData.cache()
methodData.count()
val startMethod = System.currentTimeMillis()
// Note: vectorizedAddUnsafe128x3 writes its result into its first argument.
methodData.reduce(vectorizedAddUnsafe128x3)
val endMethod = System.currentTimeMillis()
methodData.unpersist(blocking = true)

// Test the accumulator method, prepared and timed the same way.
val accumulatorData = spark.sparkContext.parallelize(1 to 10000000).map(_ => getRandomBytes())
accumulatorData.cache()
accumulatorData.count()
val startAccumulator = System.currentTimeMillis()
accumulatorData.foreach(bytes => dltAccumulator.add(bytes))
val endAccumulator = System.currentTimeMillis()
accumulatorData.unpersist(blocking = true)

// Print results (elapsed wall-clock milliseconds).
println(s"Method finished in ${endMethod - startMethod}")
println(s"Accumulator finished in ${endAccumulator - startAccumulator}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment