Last active
August 29, 2015 14:10
Abstracting map/reduce joins.
/**
 * @avibryant and I have been interested in extracting as much of Scalding as possible out into Algebird,
 * so that it is portable across many execution systems. But how should we model joins?
 *
 * In the FP world, Applicative[M] is a typeclass that gives you Functor[M] (which provides map)
 * and, in addition, join:
 */
trait Functor[M[_]] {
  // law: map(map(a)(f))(g) == map(a)(f.andThen(g))
  def map[T, U](init: M[T])(fn: T => U): M[U]
}
trait Applicative[M[_]] extends Functor[M] {
  /*
   * I think the laws using join can be written as:
   * map(join(apply(a), m)) { case (_, t) => t } == m
   * map(join(join(ma, mb), mc)) { case ((a, b), c) => (a, b, c) } ==
   *   map(join(ma, join(mb, mc))) { case (a, (b, c)) => (a, b, c) }
   */
  def apply[A](a: A): M[A]
  def join[A, B](a: M[A], b: M[B]): M[(A, B)]
}
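/*
 * A quick sanity check of the two laws above, using Option as M. This is only a sketch:
 * the object and method names are hypothetical, `pure` plays the role of `apply`, and
 * checking a few values is not a proof that the laws hold.
 */
object OptionApplicativeSketch {
  def pure[A](a: A): Option[A] = Some(a)
  def map[T, U](init: Option[T])(fn: T => U): Option[U] = init.map(fn)
  def join[A, B](a: Option[A], b: Option[B]): Option[(A, B)] =
    for { x <- a; y <- b } yield (x, y)

  // map(join(apply(a), m)) { case (_, t) => t } == m
  def leftIdentity[A, B](a: A, m: Option[B]): Boolean =
    map(join(pure(a), m)) { case (_, t) => t } == m

  // Both nestings of join agree once the tuples are flattened.
  def associativity[A, B, C](ma: Option[A], mb: Option[B], mc: Option[C]): Boolean =
    map(join(join(ma, mb), mc)) { case ((a, b), c) => (a, b, c) } ==
      map(join(ma, join(mb, mc))) { case (a, (b, c)) => (a, b, c) }
}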
/**
 * Setting aside apply, let's look at just the join:
 */
trait Joinable[M[_]] {
  def join[A, B](a: M[A], b: M[B]): M[(A, B)]
}
// ({ type L[V] = Map[K, V] })#L is the Scala 2 type-lambda encoding of [V] => Map[K, V]
trait MapJoin[K] extends Joinable[({ type L[V] = Map[K, V] })#L] {
  def join[A, B](a: Map[K, A], b: Map[K, B]): Map[K, (A, B)] =
    (a.keySet & b.keySet).foldLeft(Map.empty[K, (A, B)]) { (m, k) =>
      m + (k -> ((a(k), b(k))))
    }
}
/*
 * For databases we normally think of a type where each key can have multiple values
 * unless some other constraint has been made. This is like a Map[K, Iterable[V]] type:
 */
trait TableJoin[K] extends Joinable[({ type L[V] = Map[K, Iterable[V]] })#L] {
  /*
   * Here there are perhaps two ways to join the Iterables:
   * 1) zip style
   * 2) cross product. This is usually what is meant by a DB join, and what we use here
   */
  def join[A, B](a: Map[K, Iterable[A]], b: Map[K, Iterable[B]]): Map[K, Iterable[(A, B)]] =
    (a.keySet & b.keySet).foldLeft(Map.empty[K, Iterable[(A, B)]]) { (m, k) =>
      m + (k -> (for { ai <- a(k); bi <- b(k) } yield (ai, bi)))
    }
}
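/*
 * A small usage sketch contrasting the two styles mentioned above, written as plain functions
 * so it runs on its own. The object name, `crossJoin`, `zipJoin`, and the sample tables are
 * hypothetical and only illustrate the behaviour: keys present on one side only are dropped,
 * a key with m left values and n right values yields m * n pairs under the cross product,
 * and min(m, n) pairs under zip.
 */
object TableJoinSketch {
  def crossJoin[K, A, B](a: Map[K, Iterable[A]], b: Map[K, Iterable[B]]): Map[K, Iterable[(A, B)]] =
    (a.keySet & b.keySet).foldLeft(Map.empty[K, Iterable[(A, B)]]) { (m, k) =>
      m + (k -> (for { ai <- a(k); bi <- b(k) } yield (ai, bi)))
    }

  def zipJoin[K, A, B](a: Map[K, Iterable[A]], b: Map[K, Iterable[B]]): Map[K, Iterable[(A, B)]] =
    (a.keySet & b.keySet).foldLeft(Map.empty[K, Iterable[(A, B)]]) { (m, k) =>
      m + (k -> a(k).zip(b(k)))
    }

  val users: Map[Int, Iterable[String]] = Map(1 -> List("ann", "al"), 2 -> List("bob"))
  val visits: Map[Int, Iterable[String]] = Map(1 -> List("mon", "tue"), 3 -> List("wed"))

  // Only key 1 is on both sides: 2 x 2 = 4 pairs for the cross product, 2 pairs for zip.
  val crossed: Map[Int, Iterable[(String, String)]] = crossJoin(users, visits)
  val zipped: Map[Int, Iterable[(String, String)]] = zipJoin(users, visits)
}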
/*
 * At this point you might notice that join on the Map is just doing join on something
 * isomorphic to a case class Identity[T](get: T) wrapper,
 * and [V] => Map[K, Iterable[V]] is doing the same for Iterable[T], so these two implementations
 * can be rewritten to be the same.
 *
 * This is fine, but it didn't get us much closer to the question of how to extract
 * joins from Scalding like we did with Aggregator.
 * 1) How do we apply the logic independent of the type (in the above, Map)?
 * 2) How do we do joins other than inner joins (like outer)?
 */
trait GenJoinable[M[_], C[_, _]] {
  def join[A, B](ma: M[A], mb: M[B]): M[C[A, B]]
}
sealed trait TriState[+A, +B]
case class TriLeft[A](left: A) extends TriState[A, Nothing]
case class TriRight[B](right: B) extends TriState[Nothing, B]
case class TriBoth[A, B](left: A, right: B) extends TriState[A, B]
trait InnerJoin[M[_]] extends GenJoinable[M, Tuple2]
trait OuterJoin[M[_]] extends GenJoinable[M, TriState]
// note: in Scala 2 a type alias must live inside an object or package object
type LeftOrBoth[A, B] = Either[A, (A, B)]
trait LeftJoin[M[_]] extends GenJoinable[M, LeftOrBoth]
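/*
 * To make the three-state result concrete, here is a minimal sketch of an outer join on plain
 * Maps. Everything inside `OuterJoinSketch` is hypothetical: the ADT is restated locally (as
 * `Tri`) only so the object compiles on its own, and `outerJoin` is one possible reading of
 * OuterJoin for M = Map[K, _], not a settled API.
 */
object OuterJoinSketch {
  sealed trait Tri[+A, +B]
  final case class OnlyLeft[A](left: A) extends Tri[A, Nothing]
  final case class OnlyRight[B](right: B) extends Tri[Nothing, B]
  final case class Both[A, B](left: A, right: B) extends Tri[A, B]

  // Every key from either side appears exactly once in the result.
  def outerJoin[K, A, B](a: Map[K, A], b: Map[K, B]): Map[K, Tri[A, B]] =
    (a.keySet | b.keySet).iterator.map { k =>
      val joined: Tri[A, B] = (a.get(k), b.get(k)) match {
        case (Some(x), Some(y)) => Both(x, y)
        case (Some(x), None)    => OnlyLeft(x)
        case (None, Some(y))    => OnlyRight(y)
        case (None, None)       => sys.error("unreachable: k came from one of the key sets")
      }
      (k, joined)
    }.toMap

  // The inner join falls out of the outer join by keeping only the Both cases.
  def innerJoin[K, A, B](a: Map[K, A], b: Map[K, B]): Map[K, (A, B)] =
    outerJoin(a, b).collect { case (k, Both(x, y)) => (k, (x, y)) }
}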
/**
 * Okay, but what does that get us? What are the laws? What are the constraints on the C[_, _] container type?
 * I don't know yet.
 *
 * Some thoughts:
 * - to be interesting on map/reduce we probably need to have some notion of the ability to go through
 *     an Iterator[(K, V)] => Iterator[(K1, V)]
 *        Iterator[(K, U)] => Iterator[(K1, U)]
 *   in a non-key-local way,
 *   and then again later in a key-local way: (K1, Iterator[V], Iterable[U]) => Iterator[(K, C[V, U])]
 *
 * Does this work:
 */
trait JoinAlgo[K, K1, C[_, _]] {
  def prepareLeft[V](left: Iterator[(K, V)]): Iterator[(K1, V)]
  def prepareRight[U](right: Iterator[(K, U)]): Iterator[(K1, U)]
  def joinGroup[V, U](key: K1, lefts: Iterator[V], rights: Iterable[U]): Iterator[(K, C[V, U])]
}
/**
 * It should be easy to see that this can implement left, right, inner and outer join
 * with trivial prepareLeft and prepareRight.
 *
 * Let's try block join, which is a simple way of dealing with key skew
 */
trait BlockJoin[K] extends JoinAlgo[K, (Int, Int, K), Tuple2] {
  // Each left value picks a row by hash and is replicated to both columns
  def prepareLeft[V](left: Iterator[(K, V)]) =
    left.flatMap { case (k, v) =>
      val row = if (v.hashCode % 2 == 0) 0 else 1
      Iterator(((row, 0, k), v), ((row, 1, k), v))
    }
  // Each right value picks a column by hash and is replicated to both rows
  def prepareRight[U](right: Iterator[(K, U)]) =
    right.flatMap { case (k, u) =>
      val col = if (u.hashCode % 2 == 0) 0 else 1
      Iterator(((0, col, k), u), ((1, col, k), u))
    }
  def joinGroup[V, U](subblock: (Int, Int, K), lefts: Iterator[V], rights: Iterable[U]) =
    // Just do the cross here, and emit the original key (the third field of the subblock)
    for {
      v <- lefts
      u <- rights.iterator
    } yield (subblock._3, (v, u))
}
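/*
 * One way to check that an algorithm of this shape behaves as expected is a small in-memory
 * stand-in for the shuffle. This is a hedged sketch, not part of any real system:
 * `LocalJoinSketch`, `Algo`, `run`, and `blockJoin` are hypothetical names, `Algo` restates
 * the JoinAlgo signatures so the object compiles on its own, and a real runner would stream
 * groups rather than materialise them in Lists.
 */
object LocalJoinSketch {
  trait Algo[K, K1, C[_, _]] {
    def prepareLeft[V](left: Iterator[(K, V)]): Iterator[(K1, V)]
    def prepareRight[U](right: Iterator[(K, U)]): Iterator[(K1, U)]
    def joinGroup[V, U](key: K1, lefts: Iterator[V], rights: Iterable[U]): Iterator[(K, C[V, U])]
  }

  // Non-key-local prepare step, then a group-by on K1 (the "shuffle"), then the key-local join.
  def run[K, K1, C[_, _], V, U](algo: Algo[K, K1, C], left: Seq[(K, V)], right: Seq[(K, U)]): List[(K, C[V, U])] = {
    def group[A](prepared: Iterator[(K1, A)]): Map[K1, List[A]] =
      prepared.toList.groupBy(_._1).map { case (k1, pairs) => (k1, pairs.map(_._2)) }
    val ls = group(algo.prepareLeft(left.iterator))
    val rs = group(algo.prepareRight(right.iterator))
    // Visit keys seen on either side so that outer-style algorithms also get their groups.
    (ls.keySet | rs.keySet).toList.flatMap { k1 =>
      algo.joinGroup(k1, ls.getOrElse(k1, Nil).iterator, rs.getOrElse(k1, Nil)).toList
    }
  }

  // The 2 x 2 block join from above, as a value.
  def blockJoin[K]: Algo[K, (Int, Int, K), Tuple2] = new Algo[K, (Int, Int, K), Tuple2] {
    private def bit(x: Any): Int = if (x.hashCode % 2 == 0) 0 else 1
    def prepareLeft[V](left: Iterator[(K, V)]): Iterator[((Int, Int, K), V)] =
      left.flatMap { case (k, v) =>
        val row = bit(v)
        Iterator(((row, 0, k), v), ((row, 1, k), v))
      }
    def prepareRight[U](right: Iterator[(K, U)]): Iterator[((Int, Int, K), U)] =
      right.flatMap { case (k, u) =>
        val col = bit(u)
        Iterator(((0, col, k), u), ((1, col, k), u))
      }
    def joinGroup[V, U](key: (Int, Int, K), lefts: Iterator[V], rights: Iterable[U]): Iterator[(K, (V, U))] =
      for { v <- lefts; u <- rights.iterator } yield (key._3, (v, u))
  }
}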
/**
 * Okay, but what about bloomJoin, which requires making an aggregated value from one side and sending it to the other?
 * I don't know yet.
 */
One error, which is that the last example should be:
>>> let join4 = liftA4 (,,,)
>>> join4 m1 (optional m2) (optional m3) (optional m4)
@Gabriel439 this is really interesting and I appreciate you expanding on it so much. Also, I like the trick you did (in your follow-up blog post) to make a Map that is applicative. I overlooked that `All | Some (Set k)` trick. Nice.
That said: the main goal we are interested in is not modeling joins conceptually, but implementing things like bloomJoin/sketchJoin/blockJoin etc... in a way that does not depend on scalding and could be reused from many map/reduce-like systems. If we are going to do that, we need an abstraction of what a Join is, so that it can be applied without knowledge of how it works.
I'll propose an alternative solution for maps and joins which has three nice properties:

- The `Map` type implements the full `Applicative` interface
- The `Map` type also implements the `Alternative` interface (which you can think of as a higher-kinded `Monoid` interface)
- […] `Applicative` and `Alternative` interfaces […] `Maybe`s

First, I'll begin with the implementation of `Map` and `lookup`:

This new `Map` type doesn't necessarily have to store a key-value map at all. It just stores some sort of `lookup` function, which could be implemented in terms of a conventional map or you could just pass a raw user-written function. Here are some examples of building and using this new type of map:

The category-theory terminology for this trick is a "final encoding" (i.e. you encode a type by blindly translating its specification, which in this case is just the `lookup` function).

This encoding lets us implement the full `Applicative` interface:

However, there's another interface that this `Map` type can also implement: the `Alternative` type class (which is also provided by `Control.Applicative`). The definition of the interface is:

It's basically identical to the `Monoid` type class, except:

- […] `f`.
- […] `Applicative` constraint for `f`

We can implement `Alternative` for this `Map` type, too:

Now we can implement all the `join` operations in terms of `Applicative` and `Alternative`:

Notice how I'm using the `optional` function, which I didn't even define. That function is provided by `Control.Applicative` already!

None of those operations are `Map`-specific. They would work on any type that implements `Alternative`, like `Maybe`, and they will "do the right thing":

As a bonus, it's easy to do multiple left-joins using this `Alternative` API. If you wanted to left join in 3 sources, you would just write:

This `join` + `optional` API cleanly avoids the issue the old API had where you would get nested `Maybe`s when you left join in multiple sources.

So I propose that `Alternative` is the clean solution to implementing derived join operations, and it already comes with obvious laws: the `Monoid` laws:

Finally, note that I'm not necessarily proposing that you need to implement the `Map` type the same way that I did (as a function). However, I do believe that whatever the final implementation is that it should implement both `Applicative` and `Alternative`. The `Alternative`/`optional` trick for implementing joins is the meat of my proposal and the encoding-a-`Map`-as-a-function trick is just a stepping stone for explaining the `Alternative` idea.

Also, some minor nitpicks. There's one more functor law:
... and one more applicative law:
I need more time to think about `TableJoin`. I'm still not sure what is the most elegant API for that just yet.
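
For readers following the gist's Scala, the `Applicative` + `Alternative` idea described in this comment can be sketched roughly as follows. This is not the Haskell code from the comment: `FnMap`, `pure`, `fromMap`, `orElse`, `optional`, `leftJoin`, and `leftJoin3` are hypothetical names chosen for illustration, and the encoding (a lookup function returning `Option`) is an assumption based on the prose above.

```scala
// Hypothetical function-encoded map: a lookup from key to an optional value.
final case class FnMap[K, V](lookup: K => Option[V]) {
  def map[U](f: V => U): FnMap[K, U] =
    FnMap[K, U](k => lookup(k).map(f))

  // Applicative-style pairing: a key is present only when both sides have it (inner join).
  def join[U](that: FnMap[K, U]): FnMap[K, (V, U)] =
    FnMap[K, (V, U)](k => for { v <- lookup(k); u <- that.lookup(k) } yield (v, u))

  // Alternative-style choice: use this side when it has the key, otherwise fall back.
  def orElse(that: FnMap[K, V]): FnMap[K, V] =
    FnMap[K, V](k => lookup(k).orElse(that.lookup(k)))

  // Derived from map, orElse and pure: every key is present, missing values become None.
  def optional: FnMap[K, Option[V]] =
    map[Option[V]](v => Some(v)).orElse(FnMap.pure[K, Option[V]](None))
}

object FnMap {
  // A map that returns the same value for every key.
  def pure[K, V](v: V): FnMap[K, V] = FnMap[K, V](_ => Some(v))
  def fromMap[K, V](m: Map[K, V]): FnMap[K, V] = FnMap[K, V](k => m.get(k))
}

object AlternativeJoinSketch {
  def leftJoin[K, A, B](a: FnMap[K, A], b: FnMap[K, B]): FnMap[K, (A, Option[B])] =
    a.join(b.optional)

  // Chaining left joins stays flat: no Option[Option[_]] appears in the result type.
  def leftJoin3[K, A, B, C](a: FnMap[K, A], b: FnMap[K, B], c: FnMap[K, C]): FnMap[K, ((A, Option[B]), Option[C])] =
    a.join(b.optional).join(c.optional)
}
```

In this sketch a left join is just an inner join against the `optional` version of the right side, which is why nothing specific to left joins has to be implemented per container type.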