Created February 19, 2013 17:41
A toy implementation of MapReduce, for practicing how to think about problems in a Hadoop-compatible way.
#!/bin/sh
exec scala -savecompiled "$0" "$@"
!#
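// Toy MapReduce framework:
class MapReduce[T,K,V,R](flatMapFn: (T) => Iterable[(K,V)], reduceFn: ((K,Iterable[V])) => R) {
  def apply(input: Iterable[T]): Map[K,R] = {
    // Map phase: apply the flatMap function to every input record.
    val mapped: Iterable[(K,V)] = input.flatMap(flatMapFn)
    // Shuffle phase: group the pairs by key (in a distributed system, different
    // sets of keys are handled by different workers).
    val shuffled: Map[K,Iterable[(K,V)]] = mapped.groupBy { _._1 }
    // Drop the redundant key from each grouped pair and keep only the V values,
    // so reduceFn receives (key, values) instead of (key, pairs).
    val values: Map[K, (K,Iterable[V])] = shuffled.map { case (k,kvs) => (k, (k, kvs.map { _._2 })) }
    // Reduce phase: call reduceFn once per key. An explicit map is used rather than
    // mapValues, which in Scala 2.12 returns a lazy view that reruns reduceFn on
    // every lookup and in 2.13 returns a MapView instead of a Map.
    values.map { case (k, kv) => (k, reduceFn(kv)) }
  }
}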
/*
 * Now let's use our MapReduce framework to do a word count.
 */
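// Map phase: split a line into words and emit a (word, 1) pair for each one.
// Empty tokens (from blank lines or leading whitespace) are dropped so they
// are not counted as a "word".
def mapFunction(line: String): Iterable[(String,Int)] =
  line.split("""\s+""")
    .filter { _.nonEmpty }
    .map { word => (word, 1) }
// Reduce phase: the count for a word is the sum of its 1s.
def reduceFunction(groupedWords: (String, Iterable[Int])): Int =
  groupedWords._2.sum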
// Pass these functions to our job:
val myWordCountJob = new MapReduce(mapFunction _, reduceFunction _)
// Run the job:
// Read all of stdin; the shuffle groups everything in memory anyway.
val input: Iterable[String] = scala.io.Source.stdin.getLines().toList
val result = myWordCountJob.apply(input)
// Print the (word, count) pairs, most frequent first:
result
  .toList
  .sortBy { -_._2 }
  .foreach { println(_) }
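To see what the three phases inside `MapReduce.apply` actually produce, they can be traced by hand on a tiny input. This sketch inlines the same flatMap, `groupBy` shuffle, and per-key sum that the word-count job uses, so each intermediate collection can be printed; the two input lines are made up for illustration.

```scala
// Made-up input: two lines of text (illustration only).
val lines: List[String] = List("a b", "b c b")

// Map phase: every line becomes (word, 1) pairs, as in mapFunction.
val mapped: List[(String, Int)] =
  lines.flatMap { line => line.split("""\s+""").toList.map { word => (word, 1) } }

// Shuffle phase: gather the pairs that share a key and keep only their values.
val shuffled: Map[String, List[Int]] =
  mapped.groupBy { _._1 }.map { case (word, pairs) => (word, pairs.map { _._2 }) }

// Reduce phase: one reduce call per key, summing that key's values.
val reduced: Map[String, Int] =
  shuffled.map { case (word, ones) => (word, ones.sum) }

println(mapped)       // List((a,1), (b,1), (b,1), (c,1), (b,1))
println(reduced("b")) // 3
```

The shuffle is the only step that needs to see all pairs for a key at once; the map and reduce steps work on one record or one key at a time, which is what lets Hadoop spread them over many machines.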
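The shuffle comment notes that in a distributed system different sets of keys are handled by different workers. A minimal sketch of that routing, assuming string keys and a fixed number of workers: the hypothetical `partitionFor` helper uses the same formula as Hadoop's default `HashPartitioner`, `(hashCode & Integer.MAX_VALUE) % numReduceTasks`, so every pair with a given key lands on exactly one worker. The `partition` helper and the sample pairs are also made up for illustration.

```scala
// Hypothetical helper: choose a worker for a key with the same formula as
// Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks.
def partitionFor(key: String, numWorkers: Int): Int =
  (key.hashCode & Int.MaxValue) % numWorkers

// Route every (key, value) pair to a worker; all pairs for one key share a worker.
def partition(pairs: List[(String, Int)], numWorkers: Int): Map[Int, List[(String, Int)]] =
  pairs.groupBy { case (key, _) => partitionFor(key, numWorkers) }

// Made-up mapper output for illustration.
val pairs: List[(String, Int)] = List(("a", 1), ("b", 1), ("b", 1), ("c", 1), ("b", 1))
val buckets: Map[Int, List[(String, Int)]] = partition(pairs, numWorkers = 2)

// Each worker reduces only the keys in its own bucket.
val perWorker: Map[Int, Map[String, Int]] = buckets.map { case (worker, kvs) =>
  (worker, kvs.groupBy { _._1 }.map { case (key, group) => (key, group.map { _._2 }.sum) })
}

// Key sets are disjoint across workers, so merging the outputs loses nothing.
val merged: Map[String, Int] = perWorker.values.flatten.toMap
println(merged("b")) // 3
```

Masking with `Int.MaxValue` clears the sign bit, so a negative `hashCode` still maps to a valid worker index.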
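The output step sorts by count alone, so words with equal counts come out in whatever order the result `Map` happens to iterate. If a reproducible listing is wanted, a secondary sort key can break ties. This sketch, using made-up counts in place of the job's result, orders by count descending and then alphabetically.

```scala
// Made-up counts standing in for the job's result map.
val counts: Map[String, Int] = Map("b" -> 3, "c" -> 1, "a" -> 1)

// Sort by count descending, then by word ascending, so ties come out in a fixed order.
val ranked: List[(String, Int)] =
  counts.toList.sortBy { case (word, count) => (-count, word) }

ranked.foreach { println(_) } // prints (b,3), then (a,1), then (c,1)
```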