@johnynek
Last active August 29, 2015 14:10
Abstracting map/reduce joins.
/**
* @avibryant and I have been interested in extracting as much of scalding out into Algebird,
* so that it is portable across many execution systems, but how to model joins?
*
* In the FP world, Applicative[M] is a typeclass that gives you both Functor[M] (which provides map)
* and in addition join:
*/
trait Functor[M[_]] {
// law: map(map(a)(f))(g) == map(a)(f.andThen(g))
def map[T, U](init: M[T])(fn: T => U): M[U]
}
trait Applicative[M[_]] extends Functor[M] {
/*
* I think the laws using join can be written as:
* map(join(apply(a), m)) { case (_, t) => t } == m
* join(join(ma, mb), mc).map { case ((a, b), c) => (a, b, c) } ==
* join(ma, join(mb, mc)).map { case (a, (b, c)) => (a, b, c) }
*/
def apply[A](a: A): M[A]
def join[A, B](a: M[A], b: M[B]): M[(A, B)]
}
/**
* Setting aside apply, let's look at just the join:
*/
trait Joinable[M[_]] {
def join[A, B](a: M[A], b: M[B]): M[(A, B)]
}
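// Scala 2 has no [V] => ... syntax, so the type lambda is spelled with a type projection
trait MapJoin[K] extends Joinable[({ type L[V] = Map[K, V] })#L] {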
def join[A, B](a: Map[K, A], b: Map[K, B]): Map[K, (A, B)] =
(a.keySet & b.keySet).foldLeft(Map.empty[K, (A, B)]) { (m, k) =>
m.updated(k, (a(k), b(k)))
}
}
/*
* For databases we normally think of a type where each Key can have multiple values
* unless some other constraint has been made. This is like a Map[K, Iterable[V]] type:
*/
trait TableJoin[K] extends Joinable[({ type L[V] = Map[K, Iterable[V]] })#L] {
/*
* here there are perhaps two ways to join the Iterable:
* 1) zip style
* 2) cross product. This is usually what is meant in DB join, and what we use here
*/
def join[A, B](a: Map[K, Iterable[A]], b: Map[K, Iterable[B]]): Map[K, Iterable[(A, B)]] =
(a.keySet & b.keySet).foldLeft(Map.empty[K, Iterable[(A, B)]]) { (m, k) =>
m.updated(k, for { ai <- a(k); bi <- b(k) } yield (ai, bi))
}
}
/*
* At this point you might notice that join on the Map is just doing join on something
* isomorphic to a case class Identity[T](get: T) wrapper,
* and [V] => Map[K, Iterable[V]] is doing the same for Iterable[T], so these two implementations
* can be rewritten to be the same.
*
* This is fine, but it didn't get us much closer to the question of how to extract
* joins from scalding like we did with Aggregator.
* 1) How do we apply the logic independent of the type (in the above Map)?
* 2) How do we do joins other than inner joins (like outer)?
*/
trait GenJoinable[M[_], C[_, _]] {
def join[A, B](ma: M[A], mb: M[B]): M[C[A, B]]
}
sealed trait TriState[+A, +B]
case class TriLeft[A](left: A) extends TriState[A, Nothing]
case class TriRight[B](right: B) extends TriState[Nothing, B]
case class TriBoth[A, B](left: A, right: B) extends TriState[A, B]
trait InnerJoin[M[_]] extends GenJoinable[M, Tuple2]
trait OuterJoin[M[_]] extends GenJoinable[M, TriState]
type LeftOrBoth[A, B] = Either[A, (A, B)]
trait LeftJoin[M[_]] extends GenJoinable[M, LeftOrBoth]
/**
* Okay, but what does that get us? What are the laws? What are the constraints on the C[_, _] container type?
* I don't know yet.
*
* Some thoughts:
* - to be interesting on map/reduce we probably need to have some notion of the ability to go through
* an Iterator[(K, V)] => Iterator[(K1, V)]
* Iterator[(K, U)] => Iterator[(K1, U)]
* in a non-key-local way,
* and then again later in a key-local way: (K1, Iterator[V], Iterator[U]) => Iterator[(K, C[V, U])]
*
* Does this work:
*/
trait JoinAlgo[K, K1, C[_, _]] {
def prepareLeft[V](left: Iterator[(K, V)]): Iterator[(K1, V)]
def prepareRight[U](right: Iterator[(K, U)]): Iterator[(K1, U)]
def joinGroup[V, U](key: K1, lefts: Iterator[V], rights: Iterable[U]): Iterator[(K, C[V, U])]
}
/**
* It should be easy to see that this can implement left, right, inner and outer join
* with trivial prepareLeft and prepareRight.
*
* Let's try block join, which is a simple way of dealing with key skew:
*/
trait BlockJoin[K] extends JoinAlgo[K, (Int, Int, K), Tuple2] {
def prepareLeft[V](left: Iterator[(K, V)]) =
left.flatMap { case (k, v) =>
val row = if (v.hashCode % 2 == 0) 0 else 1
// Replicate each left value to both columns of its row
Iterator(((row, 0, k), v), ((row, 1, k), v))
}
def prepareRight[U](right: Iterator[(K, U)]) =
right.flatMap { case (k, u) =>
val col = if (u.hashCode % 2 == 0) 0 else 1
// Replicate each right value to both rows of its column
Iterator(((0, col, k), u), ((1, col, k), u))
}
def joinGroup[V, U](subblock: (Int, Int, K), lefts: Iterator[V], rights: Iterable[U]) =
// Just do the cross here
for {
v <- lefts
u <- rights.iterator
} yield (subblock._3, (v, u)) // the original key K is the third component
}
/**
* Okay, but what about bloomjoin, that requires making an aggregated value from one side and sending it to the other?
* I don't know yet.
*/
@Gabriella439

I'll propose an alternative solution for maps and joins which has four nice properties:

  • The Map type implements the full Applicative interface
  • The Map type also implements the Alternative interface (which you can think of as a higher-kinded Monoid interface)
  • You can implement all the diverse join operations solely in terms of operations from the Applicative and Alternative interfaces
  • The API generalizes cleanly to joining in multiple sources without nested Maybes

First, I'll begin with the implementation of Map and lookup:

import qualified Data.Map.Strict as Map
import Prelude hiding (lookup)

newtype Map k v = Map { lookup :: k -> Maybe v }

fromMap :: Ord k => Map.Map k v -> Map k v
fromMap m = Map (\k -> Map.lookup k m)

fromFunction :: (k -> Maybe v) -> Map k v
fromFunction = Map

This new Map type doesn't necessarily have to store a key-value map at all. It just stores some sort of lookup function, which could be implemented in terms of a conventional map or you could just pass a raw user-written function. Here are some examples of building and using this new type of map:

>>> let map1 = fromMap (Map.fromList [(1, "ASDF"), (2, "QWER")]) :: Map Int String
>>> let map2 = fromFunction (Just . show) :: Map Int String
>>> lookup map1 1
Just "ASDF"
>>> lookup map1 3
Nothing
>>> lookup map2 3  -- map2 is defined for all keys
Just "3"

The category-theory terminology for this trick is a "final encoding" (i.e. you encode a type by blindly translating its specification, which in this case is just the lookup function).

This encoding lets us implement the full Applicative interface:

import Control.Applicative

instance Functor (Map k) where
    fmap k (Map f) = Map (fmap (fmap k) f)

instance Applicative (Map k) where
    pure v = Map (pure (pure v))

    Map f1 <*> Map f2 = Map (liftA2 (<*>) f1 f2)

-- I really wish Haskell had not used `join` for `Monad`s
join :: Applicative f => f a -> f b -> f (a, b)
join = liftA2 (,)

However, there's another interface that this Map type can also implement: the Alternative type class (which is also provided by Control.Applicative). The definition of the interface is:

-- Defined in `Control.Applicative`
class Applicative f => Alternative f where
    -- | The identity of '<|>'
    empty :: f a
    -- | An associative binary operation
    (<|>) :: f a -> f a -> f a

It's basically identical to the Monoid type class, except:

  • it's parametrized over a higher-kinded type, f.
  • it also has an Applicative constraint for f
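
To make the Monoid analogy concrete, here is a minimal sketch that uses only the Alternative instances shipped with base (Maybe and lists). The names `firstHit`, `noHit`, and `allItems` are illustrative and not part of the proposal.

```haskell
import Control.Applicative (Alternative (..))

-- For Maybe, (<|>) keeps the first Just and `empty` is Nothing.
firstHit :: Maybe Int
firstHit = Nothing <|> Just 2 <|> Just 3

noHit :: Maybe Int
noHit = empty <|> empty

-- For lists, (<|>) is concatenation and `empty` is [].
allItems :: [Int]
allItems = [1, 2] <|> empty <|> [3]
```

Here `firstHit` is `Just 2`, `noHit` is `Nothing`, and `allItems` is `[1,2,3]`: the same shape as `mappend` and `mempty`, but available at every element type.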

We can implement Alternative for this Map type, too:

instance Alternative (Map k) where
    empty = Map (pure empty)  -- Map (\_ -> Nothing)
    Map f <|> Map g = Map (liftA2 (<|>) f g)  -- Map (\k -> f k `orElse` g k)

Now we can implement all the join operations in terms of Applicative and Alternative:

leftJoin :: Alternative f => f a -> f b -> f (a, Maybe b)
leftJoin m1 m2 = join m1 (optional m2)

rightJoin :: Alternative f => f a -> f b -> f (Maybe a, b)
rightJoin m1 m2 = join (optional m1) m2

outerJoin :: Alternative f => f a -> f b -> f (Maybe a, Maybe b)
outerJoin m1 m2 = join (optional m1) (optional m2)

Notice how I'm using the optional function, which I didn't even define. That function is provided by Control.Applicative already!

-- Provided by `Control.Applicative`
optional :: Alternative f => f a -> f (Maybe a)
optional v = fmap Just v <|> pure Nothing

None of those operations are Map-specific. They would work on any type that implements Alternative, like Maybe, and they will "do the right thing":

>>> leftJoin Nothing (Just 1)
Nothing
>>> leftJoin (Just 1) Nothing
Just (1,Nothing)
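
The same operators can be tried on the function-backed Map. The sketch below repeats the definitions from this comment so that it compiles on its own; the sample tables `names` and `emails` are hypothetical data made up for illustration.

```haskell
import Control.Applicative (Alternative (..), liftA2, optional)
import qualified Data.Map.Strict as Map
import Prelude hiding (lookup)

-- The function-backed Map and its instances, repeated from the text above.
newtype Map k v = Map { lookup :: k -> Maybe v }

instance Functor (Map k) where
    fmap g (Map f) = Map (fmap (fmap g) f)

instance Applicative (Map k) where
    pure v = Map (pure (pure v))
    Map f1 <*> Map f2 = Map (liftA2 (<*>) f1 f2)

instance Alternative (Map k) where
    empty = Map (pure empty)
    Map f <|> Map g = Map (liftA2 (<|>) f g)

fromMap :: Ord k => Map.Map k v -> Map k v
fromMap m = Map (\k -> Map.lookup k m)

join :: Applicative f => f a -> f b -> f (a, b)
join = liftA2 (,)

leftJoin :: Alternative f => f a -> f b -> f (a, Maybe b)
leftJoin m1 m2 = join m1 (optional m2)

outerJoin :: Alternative f => f a -> f b -> f (Maybe a, Maybe b)
outerJoin m1 m2 = join (optional m1) (optional m2)

-- Hypothetical sample tables keyed by user id.
names :: Map Int String
names = fromMap (Map.fromList [(1, "ann"), (2, "bob")])

emails :: Map Int String
emails = fromMap (Map.fromList [(2, "bob@example.com"), (3, "cy@example.com")])

namesWithEmails :: Map Int (String, Maybe String)
namesWithEmails = leftJoin names emails
```

With this data, `lookup namesWithEmails 1` gives `Just ("ann",Nothing)`, key 2 gives `Just ("bob",Just "bob@example.com")`, and key 3 gives `Nothing`. One thing to keep in mind with this encoding is that `optional` always succeeds, so `outerJoin names emails` is defined at every key: a key missing from both sides yields `Just (Nothing,Nothing)` rather than `Nothing`.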

As a bonus, it's easy to do multiple left-joins using this Alternative API. If you wanted to left join in 3 sources, you would just write:

>>> join m1 (optional m2) (optional m3) (optional m4)

This join + optional API cleanly avoids the issue the old API had where you would get nested Maybes when you left join in multiple sources.

So I propose that Alternative is the clean solution for implementing derived join operations, and it already comes with obvious laws, the Monoid laws:

empty <|> m = m

m <|> empty = m

(m1 <|> m2) <|> m3 = m1 <|> (m2 <|> m3)
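
These laws can be spot-checked mechanically. The following is a small sketch, assuming an Alternative whose values support equality; the property names and the `samples` list are illustrative, and checking a few values is of course not a proof.

```haskell
import Control.Applicative (Alternative (..))

-- Each law as a Boolean property over any Alternative with equality.
leftIdentity :: (Alternative f, Eq (f a)) => f a -> Bool
leftIdentity m = (empty <|> m) == m

rightIdentity :: (Alternative f, Eq (f a)) => f a -> Bool
rightIdentity m = (m <|> empty) == m

associativity :: (Alternative f, Eq (f a)) => f a -> f a -> f a -> Bool
associativity m1 m2 m3 = ((m1 <|> m2) <|> m3) == (m1 <|> (m2 <|> m3))

-- Spot-check on a handful of Maybe Int values.
samples :: [Maybe Int]
samples = [Nothing, Just 1, Just 2]

lawsHoldOnSamples :: Bool
lawsHoldOnSamples =
    all leftIdentity samples
        && all rightIdentity samples
        && and [associativity a b c | a <- samples, b <- samples, c <- samples]
```

The function-backed Map has no Eq instance, so checking it the same way would mean comparing `lookup` results at chosen keys instead.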

Finally, note that I'm not necessarily proposing that you implement the Map type the same way that I did (as a function). However, I do believe that whatever the final implementation is, it should implement both Applicative and Alternative. The Alternative/optional trick for implementing joins is the meat of my proposal, and the encoding-a-Map-as-a-function trick is just a stepping stone for explaining the Alternative idea.

Also, some minor nitpicks. There's one more functor law:

map(a) { x => x } = a

... and one more applicative law:

map(join(m, apply(a))) { case (t, _) => t } == m
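
For readers following along in Haskell, here is a sketch of the same laws translated from the Scala notation (`map` becomes `fmap`, `apply` becomes `pure`), with `join` defined as `liftA2 (,)` as earlier in this comment. The property names are illustrative, and the check only samples a few Maybe and list values.

```haskell
import Control.Applicative (liftA2)

join :: Applicative f => f a -> f b -> f (a, b)
join = liftA2 (,)

-- Functor identity: mapping the identity function changes nothing.
functorIdentity :: (Functor f, Eq (f a)) => f a -> Bool
functorIdentity m = fmap id m == m

-- Left unit (the law stated in the gist): a pure value on the left can be dropped.
leftUnit :: (Applicative f, Eq (f a)) => f a -> Bool
leftUnit m = fmap snd (join (pure ()) m) == m

-- Right unit (the extra law): a pure value on the right can be dropped.
rightUnit :: (Applicative f, Eq (f a)) => f a -> Bool
rightUnit m = fmap fst (join m (pure ())) == m

allThree :: (Applicative f, Eq (f a)) => f a -> Bool
allThree m = functorIdentity m && leftUnit m && rightUnit m

lawsHoldOnSamples :: Bool
lawsHoldOnSamples =
    all allThree ([Nothing, Just 1, Just 2] :: [Maybe Int])
        && all allThree ([[], [1, 2, 3]] :: [[Int]])
```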

I need more time to think about TableJoin. I'm still not sure what is the most elegant API for that just yet.

@Gabriella439

One correction: the last example should be:

>>> let join4 a b c d = (,,,) <$> a <*> b <*> c <*> d  -- Control.Applicative stops at liftA3, so spell out the four-way lift
>>> join4 m1 (optional m2) (optional m3) (optional m4)
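
As a rough illustration of the difference in result shape, the sketch below compares chaining the binary `leftJoin` with a single four-way join, using Maybe as the Alternative. `join4` is written with `<$>` and `<*>` and is an illustrative helper, as are the sample values.

```haskell
import Control.Applicative (Alternative, liftA2, optional)

join :: Applicative f => f a -> f b -> f (a, b)
join = liftA2 (,)

leftJoin :: Alternative f => f a -> f b -> f (a, Maybe b)
leftJoin m1 m2 = join m1 (optional m2)

-- Four-way join in applicative style (illustrative helper).
join4 :: Applicative f => f a -> f b -> f c -> f d -> f (a, b, c, d)
join4 a b c d = (,,,) <$> a <*> b <*> c <*> d

-- Chaining the binary leftJoin nests the result tuples.
nested :: Maybe (((Int, Maybe Char), Maybe Bool), Maybe String)
nested = Just 1 `leftJoin` Nothing `leftJoin` Just True `leftJoin` Nothing

-- One four-way join over `optional` sources gives a flat tuple.
flat :: Maybe (Int, Maybe Char, Maybe Bool, Maybe String)
flat = join4 (Just 1) (optional Nothing) (optional (Just True)) (optional Nothing)
```

Here `nested` is `Just (((1,Nothing),Just True),Nothing)`, while `flat` is `Just (1,Nothing,Just True,Nothing)`.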

@johnynek
Author

johnynek commented Dec 8, 2014

@Gabriel439 this is really interesting and I appreciate you expanding on it so much. Also, I like the trick you did (in your follow-up blog post) to make a Map that is applicative. I overlooked that All | Some (Set k) trick. Nice.

That said, the main goal we are interested in is not modeling joins conceptually, but implementing things like bloomJoin, sketchJoin, blockJoin, etc. in a way that does not depend on scalding and could be reused from many map/reduce-like systems. If we are going to do that, we need an abstraction of what a Join is, so that it can be applied without knowledge of how it works.
