@arahuja
Last active August 29, 2015 14:08
case taskNumberRegionPairs1 :: taskNumberRegionPairs2 :: Nil => {
  // Accumulators counting the records seen from each RDD and the loci handled per task.
  val rdd1Records = sc.accumulator(0L, "rdd1.records")
  val rdd2Records = sc.accumulator(0L, "rdd2.records")
  val lociAccum = sc.accumulator(0L, "rdd.task.loci")
  // Cogroup-based implementation: group both RDDs by key, then shuffle so that
  // records within each partition are ordered by TaskPosition.
  val partitioned = taskNumberRegionPairs1.cogroup(taskNumberRegionPairs2, new PartitionByKey(numTasks.toInt))
  val sorted = new ShuffledRDD[TaskPosition, (Iterable[M], Iterable[M]), (Iterable[M], Iterable[M])](
    partitioned,
    new PartitionByKey(numTasks.toInt))
    .setKeyOrdering(implicitly[Ordering[TaskPosition]])
  sorted.mapPartitionsWithIndex((taskNum: Int, taskNumAndRegionPairs) => {
    if (taskNumAndRegionPairs.isEmpty) {
      Iterator.empty
    } else {
      val taskLoci = lociPartitionsBoxed.value.asInverseMap(taskNum.toLong)
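      // An Iterator can be traversed only once, so counting on it directly would exhaust it and
      // leave the second count at zero. Buffer the partition first; later code should read from
      // `bufferedPairs` (a name introduced here), not from the exhausted iterator.
      val bufferedPairs = taskNumAndRegionPairs.toVector
      rdd1Records += bufferedPairs.map(_._2._1.size).sum
      rdd2Records += bufferedPairs.map(_._2._2.size).sum
      lociAccum += taskLoci.count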