Skip to content

Instantly share code, notes, and snippets.

@douglaz
Created March 1, 2018 23:49
Show Gist options
  • Save douglaz/b54efdc59418bd09db718f407712cd5b to your computer and use it in GitHub Desktop.
Save douglaz/b54efdc59418bd09db718f407712cd5b to your computer and use it in GitHub Desktop.
/** Pair-oriented helpers (`groupByKey`, `reduceByKey`, `cogroup`, `join`,
  * `leftOuterJoin`) for plain in-memory collections of key/value tuples.
  *
  * Bring the extension methods into scope with `import CollectionUtils._`.
  */
object CollectionUtils {

  /** Extension methods for any `Iterable` of `(key, value)` pairs.
    *
    * Keys are compared with `==`/`hashCode` (grouping is done via `groupBy`).
    * The order of keys in every result is unspecified.
    *
    * @param iterable the key/value pairs being operated on
    * @tparam K key type
    * @tparam V value type
    */
  implicit class PairRDDLikeOps[K, V](iterable: Iterable[(K, V)]) {

    /** Groups the values of each distinct key.
      *
      * @return one `(key, values)` entry per distinct key
      */
    def groupByKey(): List[(K, Iterable[V])] = {
      iterable
        .groupBy { case (k, _) => k }
        .toList
        // Materialise each group as a List so that later map/flatMap calls on
        // it never inherit Set semantics (and drop duplicates) from the input.
        .map { case (k, pairs) => (k, pairs.iterator.map { case (_, v) => v }.toList) }
    }

    /** Combines all values that share a key using `fn`.
      *
      * @param fn binary function used to merge two values of the same key
      * @return one `(key, reducedValue)` entry per distinct key
      */
    def reduceByKey(fn: (V, V) => V): List[(K, V)] = {
      iterable
        .groupBy { case (k, _) => k }
        .toList
        // `reduce` is safe here: groupBy never produces an empty group.
        .map { case (k, pairs) => (k, pairs.iterator.map { case (_, v) => v }.reduce(fn)) }
    }

    /** For every key present in either collection, pairs up the values found
      * on this side with the values found in `other`. A side that lacks the
      * key contributes an empty collection.
      *
      * @param other the collection to cogroup with
      * @tparam O value type of `other`
      * @return one entry per distinct key of the union of both key sets
      */
    def cogroup[O](other: Iterable[(K, O)]): Iterable[(K, (Iterable[V], Iterable[O]))] = {
      val thisGrouped: Map[K, Iterable[V]] = groupByKey().toMap
      val otherGrouped: Map[K, Iterable[O]] = new PairRDDLikeOps(other).groupByKey().toMap
      // `.toList` matters: mapping over the key Set directly would return a
      // Set, and `join`/`leftOuterJoin` would then flatMap into a Set and
      // silently collapse duplicate result rows.
      (thisGrouped.keySet ++ otherGrouped.keySet).toList.map { k =>
        (k, (thisGrouped.getOrElse(k, Nil), otherGrouped.getOrElse(k, Nil)))
      }
    }

    /** Inner join: emits one row for every combination of a value on this
      * side and a value in `other` that share a key. Duplicate pairs in the
      * inputs produce duplicate rows.
      *
      * @param other the collection to join with
      * @tparam O value type of `other`
      * @return `(key, (thisValue, otherValue))` rows
      */
    def join[O](other: Iterable[(K, O)]): Iterable[(K, (V, O))] = {
      cogroup(other).flatMap { case (k, (groupedA, groupedB)) =>
        for {
          a <- groupedA
          b <- groupedB
        } yield (k, (a, b))
      }
    }

    /** Left outer join: like [[join]], but every value on this side whose key
      * has no match in `other` is still emitted once, paired with `None`.
      *
      * Matched values are wrapped with `Option.apply`, so a `null` value in
      * `other` is reported as `None`.
      *
      * @param other the collection to join with
      * @tparam O value type of `other`
      * @return `(key, (thisValue, optionalOtherValue))` rows
      */
    def leftOuterJoin[O](other: Iterable[(K, O)]): Iterable[(K, (V, Option[O]))] = {
      cogroup(other).flatMap { case (k, (groupedA, groupedB)) =>
        val matches: Iterable[Option[O]] =
          if (groupedB.isEmpty) List(None)
          else groupedB.map(Option.apply)
        for {
          a <- groupedA
          b <- matches
        } yield (k, (a, b))
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment