Created
December 10, 2011 06:53
-
-
Save j5ik2o/1454740 to your computer and use it in GitHub Desktop.
MapReduceサンプル
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mapreduce | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import scala.collection.mutable.ListBuffer | |
import scala.concurrent.ops._ | |
import scala.io.Source | |
import scala.annotation.tailrec | |
import scala.io.Codec | |
import scala.xml.XML | |
import dynamicJSON.DynamicJSON | |
import net.reduls.igo.Tagger | |
// mapの入力 | |
case class MapInput(text: String) | |
// mapの出力 | |
case class MapOutput(entries: List[(String, Int)]) | |
// reduceの入力 | |
case class ReduceInput(key: String, entries: List[(String, Int)]) | |
// reduceの出力 | |
case class ReduceOutput(key: String, value: Int) | |
// 開始メッセージ | |
case class Begin() | |
// map完了メッセージ | |
case class MapCompleted() | |
// reduce完了メッセージ | |
case class ReduceCompleted() | |
object WordCount { | |
val tagger = new Tagger("ipadic"); | |
def main(args: Array[String]): Unit = { | |
// val data = List("Hello World", | |
// "Hello Scala World", | |
// "Hello Java World") | |
val url = "http://b.hatena.ne.jp/entry/json/" + args(0) | |
printf("url = %s\n", url) | |
val source = Source.fromURL(url, "UTF-8") | |
val json = source.getLines.mkString("\n") | |
val bookmarkComments = DynamicJSON(json).bookmarks.array.flatMap { _.comment.typed[String] }.filter(_.size > 0) | |
//val data = source.getLines().toList | |
val mapActors = prepareMapActors(3) | |
val reduceActors = prepareReduceActors(3) | |
val masterActor = Actor.actorOf(new MasterActor(mapActors, reduceActors, bookmarkComments)).start | |
println("WordCount: Begin") | |
masterActor ! Begin | |
while (masterActor.isRunning) { | |
println("WordCont: processing...") | |
Thread.sleep(1000L) | |
} | |
} | |
/** | |
* num個分のMapActorを生成して開始する | |
* | |
* @param num アクターの個数 | |
* @return MapActorのリスト | |
*/ | |
def prepareMapActors(num: Int) = { | |
val actors = for (i <- 0 until num) yield { Actor.actorOf[MapActor].start } | |
actors.toList | |
} | |
/** | |
* num個分のMapActorを生成して開始する | |
* | |
* @param num アクターの個数 | |
* @return ReduceActorのリスト | |
*/ | |
def prepareReduceActors(num: Int) = { | |
val actors = for (i <- 0 until num) yield { Actor.actorOf[ReduceActor].start } | |
actors.toList | |
} | |
/** | |
* マスター用アクター | |
* | |
* @param mapActors MapActorのリスト | |
* @param reduceActors ReduceActorのリスト | |
* @param 処理対象のデータのリスト | |
*/ | |
class MasterActor(mapActors: List[ActorRef], | |
reduceActors: List[ActorRef], | |
data: List[String]) extends Actor { | |
val mapOutputs = new ListBuffer[MapOutput] | |
val reduceInputs = new ListBuffer[ReduceInput] | |
val reduceOutputs = new ListBuffer[ReduceOutput] | |
def receive = { | |
// Map処理を要求する | |
case Begin => requestMap(0, data) | |
// map処理の結果を受け取る | |
case mo: MapOutput => | |
appendMapOutput(mo) | |
checkMapProgress | |
// map処理が終了したら reduce処理を要求する | |
case MapCompleted => | |
requestReduce(0, split(sort(mapOutputs.toList))) | |
// reduce処理の結果を受け取る | |
case ro: ReduceOutput => | |
appendReduceOutput(ro) | |
checkReduceProgress | |
// reduce処理が終了したら 処理結果を表示しマスター用Actorを終了する | |
case ReduceCompleted => | |
printResult | |
sys.exit | |
case _ => throw new Error | |
} | |
/** | |
* map処理を要求する。 | |
* | |
* @param index 要素のインデックス | |
* @param data 文字列のリスト | |
*/ | |
def requestMap(index: Int, data: List[String]) { | |
data match { | |
case Nil => | |
case x :: xs => | |
mapActors(index % mapActors.size) ! MapInput(x) | |
requestMap(index + 1, xs) | |
} | |
} | |
/** | |
* reduce処理を要求する。 | |
* | |
* @param index 要素のインデックス | |
* @param data ReduceInputのリスト | |
*/ | |
def requestReduce(index: Int, data: List[ReduceInput]) { | |
data match { | |
case Nil => | |
case x :: xs => | |
reduceActors(index % reduceActors.size) ! x | |
requestReduce(index + 1, xs) | |
} | |
} | |
/** | |
* mapOutputのリストをソートする | |
* | |
* @param mapOutputs MapOutputのリスト | |
* @return (単語, n個)のリスト | |
*/ | |
def sort(mapOutputs: List[MapOutput]) = | |
mapOutputs.flatMap(_.entries).sortWith(_._1 < _._1) | |
/** | |
* (単語, n個)をReduceInputのリストを変換する。 | |
* | |
* @param data (単語, n個) | |
* @return ReduceInputのリスト | |
*/ | |
def split(data: List[(String, Int)]): List[ReduceInput] = { | |
data match { | |
case Nil => reduceInputs.toList | |
case x :: xs => { | |
//tailの中の要素の文字列が最初の要素の文字列と同じかどうか判定し | |
val spannedXs = xs.span(_._1 == x._1) | |
reduceInputs += createReduceInput(x, spannedXs._1) | |
split(spannedXs._2) | |
} | |
} | |
} | |
/** | |
* ReduceInputの生成 | |
* | |
* @param head | |
* @param remainder | |
* @return ReduceInput | |
*/ | |
def createReduceInput(head: (String, Int), remainder: List[(String, Int)]) = { | |
val reduceInput = ReduceInput(head._1, head :: remainder) | |
println("MasterActor: create: " + reduceInput) | |
reduceInput | |
} | |
/** | |
* 結果を出力する。 | |
*/ | |
def printResult { | |
println("========= Result ==========") | |
for (o <- reduceOutputs.toList.sortWith(_.value > _.value).take(10)) { | |
printf("%-10s:%8d\n", o.key, o.value) | |
} | |
println("===========================") | |
} | |
/** | |
* mapOutputsに追加する。 | |
* | |
* @param o MapOutput | |
*/ | |
def appendMapOutput(o: MapOutput) = { | |
printf("MasterActor: received the response: map(%s)\n", o) | |
mapOutputs += o | |
} | |
/** | |
* reduceOutputsに追加する。 | |
* | |
* @param o ReduceOutput | |
*/ | |
def appendReduceOutput(o: ReduceOutput) = { | |
printf("MasterActor: received the response: reduce(%s)\n", o) | |
reduceOutputs += o | |
} | |
/** | |
* Map処理が終わったら MapCompleted を送信する | |
*/ | |
def checkMapProgress = | |
if (mapOutputs.size == data.size) | |
self ! MapCompleted | |
/** | |
* Reduce処理が終わったら ReduceCompleted を送信する | |
*/ | |
def checkReduceProgress = | |
if (reduceInputs.size == reduceOutputs.size) | |
self ! ReduceCompleted | |
} | |
/** | |
* reduce用アクター | |
*/ | |
class ReduceActor extends Actor { | |
def receive = { | |
// ReduceInputを受け取って単語単位の出現数を集計して返す | |
case ri: ReduceInput => | |
self.reply(reduce(ri)) | |
case _ => throw new Error | |
} | |
/** | |
* reduce処理本体 | |
* | |
* @param ri ReduceInput | |
* @return ReduceOutput | |
*/ | |
def reduce(ri: ReduceInput) = | |
ReduceOutput(ri.key, ri.entries.foldLeft(0)(_ + _._2)) | |
} | |
/** | |
* map用アクター | |
*/ | |
class MapActor extends Actor { | |
def receive = { | |
// map用のメッセージがきたらmap処理を行い、MapOutputを返信する | |
case MapInput(text) => | |
self.reply(map(text)) | |
case _ => throw new Error | |
} | |
/** | |
* map処理本体 | |
* | |
* 文字列を単語のリストに分解する | |
* | |
* @param text 文字列 | |
* @return MapOutput | |
*/ | |
def map(text: String) = { | |
import scala.collection.JavaConverters._ | |
val morphones = tagger.parse(text).asScala.toList | |
val result = morphones.withFilter(e => e.feature.startsWith("名詞")).map(e => (e.surface, 1)).toList | |
MapOutput(result) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment