Skip to content

Instantly share code, notes, and snippets.

@ahoy-jon
Created January 2, 2017 00:21
Show Gist options
  • Save ahoy-jon/92fa447331837e52932721f49521d94c to your computer and use it in GitHub Desktop.
Save ahoy-jon/92fa447331837e52932721f49521d94c to your computer and use it in GitHub Desktop.
package io.univalence.sparktools.kpialgebra
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import shapeless.contrib.spire._
import spire.algebra._
import spire.implicits._
import scala.reflect.ClassTag
/**
 * Aggregate for one side of a comparison.
 *
 * `KpiAlgebra.compare` creates one instance per key that is present on only
 * one side of the join, with `count = 1` and `part` set to the projected value.
 *
 * @param count number of records represented by this aggregate
 * @param part  aggregated value for those records
 * @tparam T value type; an `AdditiveMonoid` instance must be available
 */
case class DeltaPart[T: AdditiveMonoid](
  count: Long,
  part: T
)
/**
 * Aggregate for keys that are present on both sides of a comparison.
 *
 * `KpiAlgebra.computeCommon` builds the single-pair value: `count = 1`,
 * `countZero` is 1 when the difference equals the additive identity,
 * `diff = left - right` and `error = diff * diff`.
 *
 * @param count     number of key pairs represented by this aggregate
 * @param countZero number of those pairs whose difference is zero
 * @param diff      left value minus right value
 * @param error     squared difference
 * @param left      value coming from the left side
 * @param right     value coming from the right side
 * @tparam T value type; an `AdditiveMonoid` instance must be available
 */
case class DeltaCommon[T: AdditiveMonoid](
  count: Long,
  countZero: Long,
  diff: T,
  error: T,
  left: T,
  right: T
)
/**
 * Complete result of a comparison: a left-only part, a right-only part and
 * the part for keys found on both sides.
 *
 * @param left   aggregate for keys present only on the left side
 * @param right  aggregate for keys present only on the right side
 * @param common aggregate for keys present on both sides
 * @tparam L value type of the left-only aggregate
 * @tparam R value type of the right-only aggregate
 * @tparam C value type of the common aggregate
 */
case class Delta[L: AdditiveMonoid, R: AdditiveMonoid, C: AdditiveMonoid](
  left: DeltaPart[L],
  right: DeltaPart[R],
  common: DeltaCommon[C]
)
/**
 * Compares two keyed RDDs and summarises the differences as a [[Delta]].
 */
object KpiAlgebra {

  /**
   * Builds the [[DeltaCommon]] for a single key that exists on both sides.
   *
   * @param left  value projected from the left record
   * @param right value projected from the right record
   * @return a `DeltaCommon` with `count = 1`, `countZero` set to 1 when
   *         `left - right` equals the additive identity (0 otherwise),
   *         `diff = left - right` and `error = diff * diff`
   */
  def computeCommon[LRC: Rng](left: LRC, right: LRC): DeltaCommon[LRC] = {
    val diff = left - right
    val zero = Monoid.additive[LRC].id
    val zeroCount: Long = if (diff == zero) 1L else 0L
    DeltaCommon(
      count = 1L,
      countZero = zeroCount,
      diff = diff,
      error = diff * diff,
      left = left,
      right = right
    )
  }

  /**
   * Additive monoid over `Delta[LM, RM, LRC]`, resolved from the implicit
   * `AdditiveMonoid[Delta[LM, RM, LRC]]` in scope.
   */
  def monoid[LM: AdditiveMonoid,
             RM: AdditiveMonoid,
             LRC: AdditiveMonoid]: Monoid[Delta[LM, RM, LRC]] =
    Monoid.additive[Delta[LM, RM, LRC]]

  /**
   * Full-outer-joins `left` and `right` on their keys and combines one
   * [[Delta]] per key with the additive monoid.
   *
   * @param left  left-hand keyed records
   * @param right right-hand keyed records
   * @param flm   projection applied to records found only on the left
   * @param frm   projection applied to records found only on the right
   * @param flc   projection of a left record when the key is on both sides
   * @param frc   projection of a right record when the key is on both sides
   * @return the combined `Delta`; the monoid identity when the join is empty
   */
  def compare[K: ClassTag, L: ClassTag, R: ClassTag,
              LM: AdditiveMonoid : ClassTag,
              RM: AdditiveMonoid : ClassTag,
              LRC: Rng : ClassTag](left: RDD[(K, L)],
                                   right: RDD[(K, R)])
                                  (flm: L => LM,
                                   frm: R => RM,
                                   flc: L => LRC,
                                   frc: R => LRC): Delta[LM, RM, LRC] = {
    // The monoid is looked up inside each closure, as in the original code,
    // rather than hoisted into a captured val.
    // NOTE(review): whether the derived instance is serializable is not
    // verified here, so hoisting it is deliberately avoided.
    val deltas: RDD[Delta[LM, RM, LRC]] = left.fullOuterJoin(right).map {
      case (_, (Some(l), None)) =>
        monoid[LM, RM, LRC].id.copy(left = DeltaPart(1, flm(l)))
      case (_, (None, Some(r))) =>
        monoid[LM, RM, LRC].id.copy(right = DeltaPart(1, frm(r)))
      case (_, (Some(l), Some(r))) =>
        monoid[LM, RM, LRC].id.copy(common = computeCommon(flc(l), frc(r)))
      case (_, (None, None)) =>
        // Not produced by a full outer join; present to keep the match total.
        monoid[LM, RM, LRC].id
    }

    // `fold` seeded with the identity instead of `reduce`, which throws on an
    // empty RDD. The identity is neutral, so non-empty results are unchanged.
    deltas.fold(monoid[LM, RM, LRC].id)((x, y) => monoid[LM, RM, LRC].op(x, y))
  }
}
/**
 * Manual smoke test: compares a small RDD with itself on a local Spark
 * context and prints the resulting `Delta`.
 */
object KpiAlgebraTest {

  /**
   * Entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("smoketest"))
    try {
      // Keys 1..4 paired with their indices 0..3.
      val pairs: RDD[(Int, Int)] = sc.parallelize((1 to 4).zipWithIndex)
      println(KpiAlgebra.compare(pairs, pairs)(identity, identity, identity, identity))
      // Delta(DeltaPart(0,0),DeltaPart(0,0),DeltaCommon(4,4,0,0,6,6))
    } finally {
      // Always release the context, even if the comparison fails.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment