Created
February 5, 2011 02:02
-
-
Save parrot-studio/812122 to your computer and use it in GitHub Desktop.
Sample: building a recommendation index with MapReduce in Scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.PrintWriter | |
import scala.collection.mutable.ListBuffer | |
import scala.io.Source | |
import scala.actors._ | |
import scala.actors.Actor._ | |
object RecommendIndexBuilder { | |
// Messages exchanged between the driver, the master actor and the worker actors.
//
// The parameterless signals are singletons: every sender and every pattern in this
// file refers to them by bare name (`m ! Ready`, `case Ready =>`), so they are
// declared as case objects rather than as empty-parameter case classes whose
// companion objects happened to be used as the message.

/** Asks the master to start its mapper and reducer actors. */
case object Ready

/** End of map input; the master forwards it to every mapper. */
case object MapEnd

/** A mapper's acknowledgement of [[MapEnd]]; sent back paired with the mapper's index. */
case object MapEndOK

/** End of reduce input; the master forwards it to every reducer. */
case object ReduceEnd

/** A reducer's acknowledgement of [[ReduceEnd]]; sent back paired with the reducer's index. */
case object ReduceEndOK

/** One (user, page) pair extracted from a single input line by a mapper. */
case class MapOutput(user: String, page: String)

/** All pages collected for one user during the shuffle step; input of a reducer. */
case class ReduceInput(user: String, pages: List[String])

/** Result for one user: each distinct page paired with the number of times it occurred. */
case class ReduceOutput(user: String, pages: List[(String, Int)])
/**
 * Program entry point: runs the map / shuffle / reduce pipeline once.
 *
 * @param args optional overrides; `args(0)` is the input log path
 *             (default "source/test.log") and `args(1)` is the output path
 *             (default "source/result.out"). With no arguments the behaviour
 *             is the same as the previously hard-coded paths.
 */
def main(args: Array[String]): Unit = {
  val source = if (args.length > 0) args(0) else "source/test.log"
  val out = if (args.length > 1) args(1) else "source/result.out"
  // 10 mapper actors and 10 reducer actors.
  new Executor(source, out, 10, 10).execute
}
/**
 * Drives one map / shuffle / reduce run through a [[MasterActor]].
 *
 * @param source     path of the input file; each line is sent to the master as map input
 * @param out        path of the result file, one line per user
 * @param mapSize    number of mapper actors the master creates
 * @param reduceSize number of reducer actors the master creates
 */
class Executor(source: String, out: String, mapSize: Int, reduceSize: Int) {

  /**
   * Runs the whole pipeline and then terminates the JVM with exit status 0.
   *
   * Each phase is ended with a marker message (`MapEnd` / `ReduceEnd`) and the
   * calling thread polls the master until it reports that phase as finished.
   */
  def execute: Unit = {
    val m = new MasterActor(mapSize, reduceSize)
    m.start
    m ! Ready

    // Map: every input line goes to the master paired with its line index.
    println("map start")
    val input = Source.fromFile(source)
    try {
      input.getLines.zipWithIndex.foreach(m ! _)
    } finally {
      // Release the file handle even if reading or sending fails.
      input.close()
    }
    m ! MapEnd
    waitMap(m)

    // Shuffle: collect the pages of each user into one ReduceInput.
    println("shuffle start")
    val targets = divide(m.mapResult.toList, List())

    // Reduce: every ReduceInput goes to the master paired with its position.
    println("reduce start")
    targets.zipWithIndex.foreach(m ! _)
    m ! ReduceEnd
    waitReduce(m)

    // Result
    println("output start")
    val writer = new PrintWriter(out)
    try {
      m.results.foreach(ri => writer.println(createLine(ri)))
    } finally {
      // Flush and close the output even if formatting or writing fails.
      writer.close()
    }
    System.exit(0)
  }

  /**
   * Blocks the calling thread, polling every 500 ms, until `m.isMapEnd` is true.
   *
   * Written as a loop: a self-recursive call in a public, non-final method is not
   * turned into a jump by the compiler, so a long wait would keep growing the stack.
   */
  def waitMap(m: MasterActor): Unit = {
    while (!m.isMapEnd) {
      Thread.sleep(500L)
    }
  }

  /** Blocks the calling thread, polling every 500 ms, until `m.isReduceEnd` is true. */
  def waitReduce(m: MasterActor): Unit = {
    while (!m.isReduceEnd) {
      Thread.sleep(500L)
    }
  }

  /**
   * Groups map outputs by user.
   *
   * @param list   map outputs in arrival order
   * @param result entries to keep in front of the newly built groups
   * @return `result` followed by one [[ReduceInput]] per distinct user, in order of
   *         each user's first appearance in `list`, with that user's pages in their
   *         original order
   */
  def divide(list: List[MapOutput], result: List[ReduceInput]): List[ReduceInput] = {
    // LinkedHashMap iterates in insertion order, which keeps users in
    // first-appearance order while grouping in a single pass.
    val pagesByUser = new scala.collection.mutable.LinkedHashMap[String, ListBuffer[String]]
    list.foreach { mo =>
      pagesByUser.getOrElseUpdate(mo.user, new ListBuffer[String]) += mo.page
    }
    result ++ pagesByUser.toList.map { case (user, pages) => ReduceInput(user, pages.toList) }
  }

  /**
   * Builds the [[ReduceInput]] for `base.user`: the page of `base` followed by the
   * pages of `body`, in order.
   */
  def createReduceInput(base: MapOutput, body: List[MapOutput]) = {
    ReduceInput(base.user, base.page :: body.map(_.page))
  }

  /**
   * Formats one result line: the user, a space, then space-separated
   * `page/count` entries.
   */
  def createLine(ri: ReduceOutput): String = {
    val entries = ri.pages.map { case (page, count) => page + "/" + count }
    ri.user + " " + entries.mkString(" ")
  }
}
/**
 * Coordinator actor: fans input out to the worker actors round-robin by index and
 * collects their replies.
 *
 * The buffers below are appended to only from this actor's message handlers. The
 * driver thread polls [[isMapEnd]] / [[isReduceEnd]] from outside the actor, so
 * those two checks are backed by `@volatile` counters: once the driver observes the
 * final count, the buffer contents written before it are visible to the driver too.
 *
 * @param mapSize    number of mapper actors to create
 * @param reduceSize number of reducer actors to create
 */
class MasterActor(mapSize: Int, reduceSize: Int) extends Actor {
  val mapActors = (1 to mapSize).map(i => new MapActor(i - 1))
  val reduceActors = (1 to reduceSize).map(i => new ReduceActor(i - 1))
  /** Map outputs in the order their replies were processed. */
  val mapResult = new ListBuffer[MapOutput]
  // Not written anywhere in this class; kept so existing references still compile.
  val reduceResult = new ListBuffer[ReduceOutput]
  /** Reduce outputs in the order their replies were processed. */
  val results = new ListBuffer[ReduceOutput]
  /** Indices of the mappers that have acknowledged MapEnd. */
  val endMapList = new ListBuffer[Int]
  /** Indices of the reducers that have acknowledged ReduceEnd. */
  val endReduceList = new ListBuffer[Int]

  // Sizes of endMapList / endReduceList, published for the polling driver thread.
  @volatile private var mapEndCount = 0
  @volatile private var reduceEndCount = 0

  /** Message loop; messages that match none of the cases are dropped. */
  def act(): Unit = {
    loop {
      react {
        case Ready => ready
        case (line: String, ind: Int) => sendMapper(line, ind)
        case (mo: MapOutput, index: Int) => appendMapResult(mo, index)
        case MapEnd => mapFinish
        case (MapEndOK, ind: Int) => appendMapEnd(ind)
        case (ri: ReduceInput, ind: Int) => sendReducer(ri, ind)
        case (ro: ReduceOutput, index: Int) => appendReduceResult(ro, index)
        case ReduceEnd => reduceFinish
        case (ReduceEndOK, ind: Int) => appendReduceEnd(ind)
        case _ =>
      }
    }
  }

  /** Starts every mapper and reducer actor. */
  def ready: Unit = {
    mapActors.foreach(_.start)
    reduceActors.foreach(_.start)
  }

  /** Forwards `line` to the mapper selected by `ind % mapSize`. */
  def sendMapper(line: String, ind: Int): Unit = {
    mapActors(ind % mapSize) ! line
  }

  /** Records one map output; `index` (the replying mapper) is not used. */
  def appendMapResult(mo: MapOutput, index: Int): Unit = {
    mapResult += mo
  }

  /** Forwards the end-of-map marker to every mapper. */
  def mapFinish: Unit = {
    mapActors.foreach(_ ! MapEnd)
  }

  /** Records that mapper `index` has acknowledged the end of the map phase. */
  def appendMapEnd(index: Int): Unit = {
    endMapList += index
    // Volatile write last, so everything appended before it is published with it.
    mapEndCount = endMapList.length
  }

  /** True once at least `mapSize` MapEnd acknowledgements have been recorded. */
  def isMapEnd: Boolean = {
    mapEndCount >= mapSize
  }

  /** Forwards `ri` to the reducer selected by `ind % reduceSize`. */
  def sendReducer(ri: ReduceInput, ind: Int): Unit = {
    reduceActors(ind % reduceSize) ! ri
  }

  /** Records one reduce output; `index` (the replying reducer) is not used. */
  def appendReduceResult(ro: ReduceOutput, index: Int): Unit = {
    results += ro
  }

  /** Forwards the end-of-reduce marker to every reducer. */
  def reduceFinish: Unit = {
    reduceActors.foreach(_ ! ReduceEnd)
  }

  /** Records that reducer `index` has acknowledged the end of the reduce phase. */
  def appendReduceEnd(index: Int): Unit = {
    endReduceList += index
    // Volatile write last, so everything appended before it is published with it.
    reduceEndCount = endReduceList.length
  }

  /** True once at least `reduceSize` ReduceEnd acknowledgements have been recorded. */
  def isReduceEnd: Boolean = {
    reduceEndCount >= reduceSize
  }
}
/**
 * Mapper actor: turns each received line into at most one [[MapOutput]] and replies
 * with it, paired with this actor's index.
 *
 * @param index position of this mapper; attached to every reply
 */
class MapActor(val index: Int) extends Actor {
  // NOTE(review): neither pattern declares a capture group, yet `analysis`
  // destructures two groups from each of them. As written the extractor patterns
  // cannot match, so no MapOutput is ever produced. These look like placeholders
  // for the real log format - TODO confirm and supply patterns with two groups
  // (user, page).
  val itemParser = """.* 商品のURL .*""".r
  val shopParser = """.* お店のURL .*""".r

  /** Message loop: strings are analysed, MapEnd is acknowledged, anything else is dropped. */
  def act(): Unit = {
    loop {
      react {
        case text: String => analysis(text)
        case MapEnd => reply((MapEndOK, index))
        case _ =>
      }
    }
  }

  /**
   * Matches `line` against the item pattern first, then the shop pattern, and
   * replies with the resulting [[MapOutput]]; a line matching neither produces
   * no reply.
   */
  def analysis(line: String): Unit = {
    val extracted: Option[MapOutput] = line match {
      case itemParser(who, what) => Some(MapOutput(who, what))
      case shopParser(who, what) => Some(MapOutput(who, what))
      case _ => None
    }
    extracted.foreach(mo => reply((mo, index)))
  }
}
/**
 * Reducer actor: counts how often each page occurs for one user and replies with
 * the result, paired with this actor's index.
 *
 * @param index position of this reducer; attached to every reply
 */
class ReduceActor(val index: Int) extends Actor {

  /** Message loop: ReduceInput is reduced, ReduceEnd is acknowledged, anything else is dropped. */
  def act(): Unit = {
    loop {
      react {
        case ReduceEnd => reply((ReduceEndOK, index))
        case ri: ReduceInput => reply((reduce(ri), index))
        case _ =>
      }
    }
  }

  /**
   * Sorts the user's pages and counts each run of equal pages.
   *
   * @return the user together with (page, occurrences) pairs in ascending page order
   */
  def reduce(ri: ReduceInput): ReduceOutput = {
    val list = divide(ri.pages.sortWith(_ < _), List())
    // Every group built by divide contains at least one element, so head is safe.
    ReduceOutput(ri.user, list.map(l => (l.head, l.length)))
  }

  /**
   * Splits `source` into runs of consecutive equal elements.
   *
   * Iterative on purpose: a self-recursive call in a public, non-final method is
   * not eliminated by the compiler, so the previous recursive form used one stack
   * frame per run, and appending with `:+` re-copied the accumulated list each time.
   *
   * @param source elements to split; equal elements are grouped only when adjacent
   * @param result groups to keep in front of the newly built ones
   * @return `result` followed by one list per run, in order
   */
  def divide(source: List[String], result: List[List[String]]): List[List[String]] = {
    val groups = new ListBuffer[List[String]]
    groups ++= result
    var rest = source
    while (rest.nonEmpty) {
      val first = rest.head
      val (run, remaining) = rest.span(_ == first)
      groups += run
      rest = remaining
    }
    groups.toList
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment