Skip to content

Instantly share code, notes, and snippets.

@parrot-studio
Created February 5, 2011 02:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save parrot-studio/812122 to your computer and use it in GitHub Desktop.
Save parrot-studio/812122 to your computer and use it in GitHub Desktop.
Sample: building a recommendation index with MapReduce in Scala
import java.io.PrintWriter
import scala.collection.mutable.ListBuffer
import scala.io.Source
import scala.actors._
import scala.actors.Actor._
object RecommendIndexBuilder {
// Messages exchanged between the driver, the master actor and the worker actors.
// The parameterless messages are sent and pattern-matched through their companion
// objects in this file (e.g. `m ! Ready`, `case Ready =>`), not through instances.

/** Asks the master to start all map and reduce workers. */
case class Ready()

/** Signals that no more log lines will be sent; the master forwards it to every map worker. */
case class MapEnd()

/** Acknowledgement a map worker replies with (paired with its index) after receiving MapEnd. */
case class MapEndOK()

/** Signals that no more reduce inputs will be sent; the master forwards it to every reduce worker. */
case class ReduceEnd()

/** Acknowledgement a reduce worker replies with (paired with its index) after receiving ReduceEnd. */
case class ReduceEndOK()

/** A single (user, page) pair produced by a map worker from one log line. */
case class MapOutput(user: String, page: String)

/** All pages collected for one user during the shuffle step; input of a reduce worker. */
case class ReduceInput(user: String, pages: List[String])

/** Result of a reduce worker: for one user, each distinct page paired with its occurrence count. */
case class ReduceOutput(user: String, pages: List[(String, Int)])
/**
 * Program entry point: runs one map/shuffle/reduce pass with 10 map workers
 * and 10 reduce workers.
 *
 * @param args optional overrides; `args(0)` is the input log path
 *             (default "source/test.log") and `args(1)` is the output path
 *             (default "source/result.out"). With no arguments the behaviour
 *             is the same as before.
 */
def main(args: Array[String]): Unit = {
  val sourcePath = if (args.length > 0) args(0) else "source/test.log"
  val outPath = if (args.length > 1) args(1) else "source/result.out"
  new Executor(sourcePath, outPath, 10, 10).execute
}
/**
 * Drives one complete run: feeds the log lines to the master actor (map),
 * groups the mapped pairs by user (shuffle), feeds the groups back to the
 * master (reduce) and writes one line per user to the output file.
 *
 * @param source     path of the log file to read
 * @param out        path of the result file to write
 * @param mapSize    number of map workers the master creates
 * @param reduceSize number of reduce workers the master creates
 */
class Executor(source: String, out: String, mapSize: Int, reduceSize: Int) {

  /**
   * Runs the whole pipeline and then terminates the JVM with exit code 0.
   * The input source and the output writer are closed even when reading or
   * writing throws.
   */
  def execute: Unit = {
    val m = new MasterActor(mapSize, reduceSize)
    m.start
    m ! Ready

    // Map: every line is sent together with its index; the master uses the
    // index to pick the map worker.
    println("map start")
    val input = Source.fromFile(source)
    try {
      input.getLines().zipWithIndex.foreach(m ! _)
    } finally {
      input.close()
    }
    m ! MapEnd
    waitMap(m)

    // Shuffle
    println("shuffle start")
    val targets = divide(m.mapResult.toList, List())

    // Reduce
    println("reduce start")
    targets.zipWithIndex.foreach(m ! _)
    m ! ReduceEnd
    waitReduce(m)

    // Result
    println("output start")
    val writer = new PrintWriter(out)
    try {
      m.results.foreach(ri => writer.println(createLine(ri)))
    } finally {
      writer.close()
    }
    System.exit(0)
  }

  /**
   * Blocks, polling every 500 ms, until the master reports the map phase done.
   * Written as a loop: self-recursion in a non-final method is not tail-call
   * optimized, so the previous recursive form grew the stack on every poll.
   */
  def waitMap(m: MasterActor): Unit = {
    while (!m.isMapEnd) {
      Thread.sleep(500L)
    }
  }

  /** Blocks, polling every 500 ms, until the master reports the reduce phase done. */
  def waitReduce(m: MasterActor): Unit = {
    while (!m.isReduceEnd) {
      Thread.sleep(500L)
    }
  }

  /**
   * Groups map outputs by user.
   *
   * Produces one ReduceInput per distinct user, in order of the user's first
   * appearance in `list`, with that user's pages in their original order. A
   * single pass over an insertion-ordered map replaces the earlier
   * partition-per-user recursion, which was quadratic and could exhaust the
   * stack for many distinct users.
   *
   * @param list   map outputs to group
   * @param result groups computed so far; the new groups are appended after them
   * @return `result` followed by the groups built from `list`
   */
  def divide(list: List[MapOutput], result: List[ReduceInput]): List[ReduceInput] = {
    val pagesByUser = new scala.collection.mutable.LinkedHashMap[String, ListBuffer[String]]
    list.foreach { mo =>
      pagesByUser.getOrElseUpdate(mo.user, new ListBuffer[String]) += mo.page
    }
    val grouped = pagesByUser.iterator.map {
      case (user, pages) => ReduceInput(user, pages.toList)
    }.toList
    result ::: grouped
  }

  /** Builds the ReduceInput for `base.user` from `base` followed by the pages of `body`. */
  def createReduceInput(base: MapOutput, body: List[MapOutput]) = {
    ReduceInput(base.user, base.page :: body.map(_.page))
  }

  /** Formats one output line: the user, then space-separated "page/count" entries. */
  def createLine(ri: ReduceOutput): String = {
    val entries = ri.pages.map(p => p._1 + "/" + p._2)
    ri.user + " " + entries.mkString(" ")
  }
}
/**
 * Coordinator actor: owns the map and reduce workers, forwards work to them
 * round-robin by index and collects their replies.
 *
 * The driver thread polls `isMapEnd` / `isReduceEnd` while this actor is
 * updating its state, so the completion lists are guarded by a lock.
 *
 * @param mapSize    number of map workers to create
 * @param reduceSize number of reduce workers to create
 */
class MasterActor(mapSize: Int, reduceSize: Int) extends Actor {
  val mapActors = (1 to mapSize).map(i => new MapActor(i - 1))
  val reduceActors = (1 to reduceSize).map(i => new ReduceActor(i - 1))
  // Every MapOutput received from the map workers.
  val mapResult = new ListBuffer[MapOutput]
  // Not written to anywhere in this file; reducer output is collected in `results`.
  val reduceResult = new ListBuffer[ReduceOutput]
  // Every ReduceOutput received from the reduce workers.
  val results = new ListBuffer[ReduceOutput]
  // Indices of the workers that have acknowledged MapEnd / ReduceEnd.
  val endMapList = new ListBuffer[Int]
  val endReduceList = new ListBuffer[Int]

  // Guards endMapList / endReduceList. Taking it on both the writing side
  // (actor) and the reading side (polling caller) also makes everything the
  // actor wrote before recording a completion visible to a caller that then
  // observes that completion.
  private val endLock = new AnyRef

  /** Message loop; anything that matches none of the known shapes is ignored. */
  def act(): Unit = {
    loop {
      react {
        case Ready => ready
        case (line: String, ind: Int) => sendMapper(line, ind)
        case (mo: MapOutput, index: Int) => appendMapResult(mo, index)
        case MapEnd => mapFinish
        case (MapEndOK, ind: Int) => appendMapEnd(ind)
        case (ri: ReduceInput, ind: Int) => sendReducer(ri, ind)
        case (ro: ReduceOutput, index: Int) => appendReduceResult(ro, index)
        case ReduceEnd => reduceFinish
        case (ReduceEndOK, ind: Int) => appendReduceEnd(ind)
        case _ =>
      }
    }
  }

  /** Starts every map and reduce worker. */
  def ready: Unit = {
    mapActors.foreach(_.start)
    reduceActors.foreach(_.start)
  }

  /** Sends `line` to the map worker selected by `ind % mapSize`. */
  def sendMapper(line: String, ind: Int): Unit = {
    mapActors(ind % mapSize) ! line
  }

  /** Records one map output; `index` (the replying worker) is not used. */
  def appendMapResult(mo: MapOutput, index: Int): Unit = {
    mapResult += mo
  }

  /** Forwards MapEnd to every map worker. */
  def mapFinish: Unit = {
    mapActors.foreach(_ ! MapEnd)
  }

  /** Records that map worker `index` acknowledged MapEnd. */
  def appendMapEnd(index: Int): Unit = endLock.synchronized {
    endMapList += index
  }

  /** True once at least `mapSize` MapEnd acknowledgements have been recorded. */
  def isMapEnd: Boolean = endLock.synchronized {
    endMapList.length >= mapSize
  }

  /** Sends `ri` to the reduce worker selected by `ind % reduceSize`. */
  def sendReducer(ri: ReduceInput, ind: Int): Unit = {
    reduceActors(ind % reduceSize) ! ri
  }

  /** Records one reduce output; `index` (the replying worker) is not used. */
  def appendReduceResult(ro: ReduceOutput, index: Int): Unit = {
    results += ro
  }

  /** Forwards ReduceEnd to every reduce worker. */
  def reduceFinish: Unit = {
    reduceActors.foreach(_ ! ReduceEnd)
  }

  /** Records that reduce worker `index` acknowledged ReduceEnd. */
  def appendReduceEnd(index: Int): Unit = endLock.synchronized {
    endReduceList += index
  }

  /** True once at least `reduceSize` ReduceEnd acknowledgements have been recorded. */
  def isReduceEnd: Boolean = endLock.synchronized {
    endReduceList.length >= reduceSize
  }
}
/**
 * Map worker: turns a log line into a MapOutput and replies with it, tagged
 * with this worker's index, to whoever sent the line.
 *
 * @param index position of this worker in the master's worker list
 */
class MapActor(val index: Int) extends Actor {
  // The literals read "product URL" (商品のURL) and "shop URL" (お店のURL).
  // NOTE(review): neither pattern contains a capture group, so the
  // two-variable extractor patterns in `analysis` cannot match as written and
  // no MapOutput is ever produced. Presumably these are placeholders for the
  // real log format -- confirm and add the (user) and (page) groups.
  val itemParser = """.* 商品のURL .*""".r
  val shopParser = """.* お店のURL .*""".r

  /** Message loop: analyses String lines, acknowledges MapEnd, ignores everything else. */
  def act(): Unit = {
    loop {
      react {
        case line: String => analysis(line)
        case MapEnd => reply((MapEndOK, index))
        case _ =>
      }
    }
  }

  /**
   * Tries the item pattern first, then the shop pattern; on a match replies
   * with `(MapOutput(user, page), index)`. Lines matching neither are dropped.
   */
  def analysis(line: String): Unit = {
    val extracted: Option[MapOutput] = line match {
      case itemParser(user, item) => Some(MapOutput(user, item))
      case shopParser(user, shop) => Some(MapOutput(user, shop))
      case _ => None
    }
    extracted.foreach(mo => reply((mo, index)))
  }
}
/**
 * Reduce worker: counts how often each page occurs for one user and replies
 * with the result, tagged with this worker's index, to whoever sent the input.
 *
 * @param index position of this worker in the master's worker list
 */
class ReduceActor(val index: Int) extends Actor {

  /** Message loop: reduces ReduceInput messages, acknowledges ReduceEnd, ignores everything else. */
  def act(): Unit = {
    loop {
      react {
        case ReduceEnd => reply((ReduceEndOK, index))
        case ri: ReduceInput => reply((reduce(ri), index))
        case _ =>
      }
    }
  }

  /**
   * Sorts the user's pages, groups equal neighbours and emits one
   * (page, count) pair per distinct page, in ascending page order.
   */
  def reduce(ri: ReduceInput): ReduceOutput = {
    val runs = divide(ri.pages.sorted, List())
    ReduceOutput(ri.user, runs.map(run => (run.head, run.length)))
  }

  /**
   * Splits `source` into runs of consecutive equal elements.
   *
   * The work is done by a local tail-recursive loop that prepends and
   * reverses once at the end. The previous version recursed through this
   * non-final method (so the calls were not tail-call optimized) and appended
   * to a List on every step, which was quadratic.
   *
   * @param source elements to split; equal elements must already be adjacent
   *               to end up in the same run
   * @param result runs computed so far; the new runs are appended after them
   * @return `result` followed by the runs found in `source`
   */
  def divide(source: List[String], result: List[List[String]]): List[List[String]] = {
    @scala.annotation.tailrec
    def loop(rest: List[String], acc: List[List[String]]): List[List[String]] = rest match {
      case Nil => acc.reverse
      case x :: xs =>
        val (same, others) = xs.span(_ == x)
        loop(others, (x :: same) :: acc)
    }
    result ::: loop(source, Nil)
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment