Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
MapReduce in Scala
package object mapreduce {
import _root_.scala.actors.Futures._
import _root_.scala.collection.SortedMap
/**
 * Wraps a collection of key/value records and adds a parallel "map" phase.
 *
 * @tparam KEYIN   key type of the input records
 * @tparam VALUEIN value type of the input records
 * @param mappee   the input records to be mapped
 */
class Mappable[KEYIN, VALUEIN](mappee: Iterable[(KEYIN, VALUEIN)]) {
  /**
   * Applies `mapper` to every input record, one future per record, and
   * concatenates the emitted pairs in input order.
   *
   * @tparam KEYOUT   key type of the emitted pairs
   * @tparam VALUEOUT value type of the emitted pairs
   * @param mapper    function turning one input record into zero or more output pairs
   * @param ord       ordering of the output keys; not used by this method's body,
   *                  kept so existing call sites keep compiling
   * @return all emitted pairs, in the order of the input records
   */
  def mapper[KEYOUT, VALUEOUT](mapper: (KEYIN, VALUEIN) => Iterable[(KEYOUT, VALUEOUT)])(implicit ord: Ordering[KEYOUT]): Iterable[(KEYOUT, VALUEOUT)] = {
    // Materialise the input first: on a lazy collection such as Stream,
    // `map` would create each future only when the following `flatMap`
    // demands it, so every task would be awaited before the next one is
    // even started and nothing would run concurrently.
    val pending = mappee.toList.map { case (key, value) => future { mapper(key, value) } }
    // All tasks have been spawned; now block on each result in input order.
    pending.flatMap { task => task() }
  }
}
/** Implicit view that gives any iterable of key/value pairs the `mapper` method of `Mappable`. */
implicit def iterable2Mappable[A, B](m: Iterable[(A, B)]): Mappable[A, B] = new Mappable(m)
/**
 * Wraps a collection of key/value pairs and adds a parallel "reduce" phase.
 *
 * @tparam KEYIN   key type of the input pairs
 * @tparam VALUEIN value type of the input pairs
 * @param reducee  the pairs to be grouped by key and reduced
 * @param ord      ordering used to sort the keys of the intermediate groups
 */
class Reducable[KEYIN, VALUEIN](reducee: Iterable[(KEYIN, VALUEIN)])(implicit ord: Ordering[KEYIN]) {
  /**
   * Groups the input pairs by key, then applies `reducer` to every group,
   * one future per key.
   *
   * @tparam KEYOUT   key type of the reduced pairs
   * @tparam VALUEOUT value type of the reduced pairs
   * @param reducer   function folding one key and all of its values into a single output pair;
   *                  the values are passed in the order they occur in the input
   * @return one output pair per distinct input key, in ascending key order according to `ord`
   */
  def reducer[KEYOUT, VALUEOUT](reducer: (KEYIN, Iterable[VALUEIN]) => (KEYOUT, VALUEOUT)): Iterable[(KEYOUT, VALUEOUT)] = {
    // Shuffle step: collect the values of each key in a map sorted by key.
    // Prepending keeps each insertion cheap, but leaves every group reversed.
    val grouped = reducee.foldLeft(SortedMap.empty[KEYIN, List[VALUEIN]](ord)) {
      case (groups, (key, value)) =>
        groups + (key -> (value :: groups.getOrElse(key, Nil)))
    }
    // Spawn one task per key, restoring encounter order of the values so
    // that order-sensitive reducers see them as they appeared in the input.
    val pending = grouped.toList.map { case (key, values) => future { reducer(key, values.reverse) } }
    // Block on each result in key order.
    pending.map { task => task() }
  }
}
/** Implicit view that gives any iterable of pairs with an ordered key type the `reducer` method of `Reducable`. */
implicit def iterable2Reducable[A, B](r: Iterable[(A, B)])(implicit ord: Ordering[A]): Reducable[A, B] = new Reducable(r)(ord)
}
/**
 * Command-line word count built on the `mapreduce` package object.
 *
 * Reads the file named by the first argument, splits every line on runs of
 * non-word characters, and prints one `word: count` line per distinct word.
 */
object WordCount {
  /**
   * Program entry point.
   *
   * @param args command-line arguments; the first one is the path of the file to count.
   *             When it is missing, a usage message is printed to stderr and the
   *             process exits with status 1.
   */
  def main(args: Array[String]): Unit = {
    import mapreduce._
    import _root_.scala.io.Source

    /**
     * Lazily pairs every line with a running offset, producing the
     * (key, value) records fed to the map phase.
     *
     * @param lines  remaining lines of the input
     * @param offset offset assigned to the next line
     * @return a stream of (offset, line) records
     */
    def textInputFormat(lines: Iterator[String], offset: Long = 0): Stream[(Long, String)] = {
      if (lines.hasNext) {
        val line = lines.next()
        // NOTE(review): the offset advances by the line's character count only;
        // if `lines` has its terminators stripped this is not the true position
        // in the file. The mapper below ignores the offset - confirm before
        // relying on it elsewhere.
        // Stream.cons takes its tail by name, so the recursion only advances
        // as the stream is consumed.
        Stream.cons((offset, line), textInputFormat(lines, offset + line.length))
      } else {
        Stream.empty
      }
    }

    if (args.isEmpty) {
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
      Console.err.println("usage: WordCount <file>")
      System.exit(1)
    } else {
      val source = Source.fromFile(args(0))
      try {
        textInputFormat(source.getLines).mapper {
          (offset, str) => {
            // Emit (word, 1) for every non-empty token of the line.
            str.split("\\W+").collect { case word if word != "" => (word -> 1) }
          }
        }.reducer {
          (word, counts) => {
            // Total occurrences of this word.
            word -> (counts.sum)
          }
        }.foreach { case (key, value) => println("%s: %d".format(key, value)) }
      } finally {
        // Always release the file handle, even if the job fails.
        source.close()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment