Skip to content

Instantly share code, notes, and snippets.

@j5ik2o
Created December 10, 2011 06:53
Show Gist options
  • Save j5ik2o/1454740 to your computer and use it in GitHub Desktop.
Save j5ik2o/1454740 to your computer and use it in GitHub Desktop.
MapReduceサンプル
package mapreduce
import akka.actor.Actor
import akka.actor.ActorRef
import scala.collection.mutable.ListBuffer
import scala.concurrent.ops._
import scala.io.Source
import scala.annotation.tailrec
import scala.io.Codec
import scala.xml.XML
import dynamicJSON.DynamicJSON
import net.reduls.igo.Tagger
// mapの入力
case class MapInput(text: String)
// mapの出力
case class MapOutput(entries: List[(String, Int)])
// reduceの入力
case class ReduceInput(key: String, entries: List[(String, Int)])
// reduceの出力
case class ReduceOutput(key: String, value: Int)
// 開始メッセージ
case class Begin()
// map完了メッセージ
case class MapCompleted()
// reduce完了メッセージ
case class ReduceCompleted()
object WordCount {
val tagger = new Tagger("ipadic");
def main(args: Array[String]): Unit = {
// val data = List("Hello World",
// "Hello Scala World",
// "Hello Java World")
val url = "http://b.hatena.ne.jp/entry/json/" + args(0)
printf("url = %s\n", url)
val source = Source.fromURL(url, "UTF-8")
val json = source.getLines.mkString("\n")
val bookmarkComments = DynamicJSON(json).bookmarks.array.flatMap { _.comment.typed[String] }.filter(_.size > 0)
//val data = source.getLines().toList
val mapActors = prepareMapActors(3)
val reduceActors = prepareReduceActors(3)
val masterActor = Actor.actorOf(new MasterActor(mapActors, reduceActors, bookmarkComments)).start
println("WordCount: Begin")
masterActor ! Begin
while (masterActor.isRunning) {
println("WordCont: processing...")
Thread.sleep(1000L)
}
}
/**
* num個分のMapActorを生成して開始する
*
* @param num アクターの個数
* @return MapActorのリスト
*/
def prepareMapActors(num: Int) = {
val actors = for (i <- 0 until num) yield { Actor.actorOf[MapActor].start }
actors.toList
}
/**
* num個分のMapActorを生成して開始する
*
* @param num アクターの個数
* @return ReduceActorのリスト
*/
def prepareReduceActors(num: Int) = {
val actors = for (i <- 0 until num) yield { Actor.actorOf[ReduceActor].start }
actors.toList
}
/**
* マスター用アクター
*
* @param mapActors MapActorのリスト
* @param reduceActors ReduceActorのリスト
* @param 処理対象のデータのリスト
*/
class MasterActor(mapActors: List[ActorRef],
reduceActors: List[ActorRef],
data: List[String]) extends Actor {
val mapOutputs = new ListBuffer[MapOutput]
val reduceInputs = new ListBuffer[ReduceInput]
val reduceOutputs = new ListBuffer[ReduceOutput]
def receive = {
// Map処理を要求する
case Begin => requestMap(0, data)
// map処理の結果を受け取る
case mo: MapOutput =>
appendMapOutput(mo)
checkMapProgress
// map処理が終了したら reduce処理を要求する
case MapCompleted =>
requestReduce(0, split(sort(mapOutputs.toList)))
// reduce処理の結果を受け取る
case ro: ReduceOutput =>
appendReduceOutput(ro)
checkReduceProgress
// reduce処理が終了したら 処理結果を表示しマスター用Actorを終了する
case ReduceCompleted =>
printResult
sys.exit
case _ => throw new Error
}
/**
* map処理を要求する。
*
* @param index 要素のインデックス
* @param data 文字列のリスト
*/
def requestMap(index: Int, data: List[String]) {
data match {
case Nil =>
case x :: xs =>
mapActors(index % mapActors.size) ! MapInput(x)
requestMap(index + 1, xs)
}
}
/**
* reduce処理を要求する。
*
* @param index 要素のインデックス
* @param data ReduceInputのリスト
*/
def requestReduce(index: Int, data: List[ReduceInput]) {
data match {
case Nil =>
case x :: xs =>
reduceActors(index % reduceActors.size) ! x
requestReduce(index + 1, xs)
}
}
/**
* mapOutputのリストをソートする
*
* @param mapOutputs MapOutputのリスト
* @return (単語, n個)のリスト
*/
def sort(mapOutputs: List[MapOutput]) =
mapOutputs.flatMap(_.entries).sortWith(_._1 < _._1)
/**
* (単語, n個)をReduceInputのリストを変換する。
*
* @param data (単語, n個)
* @return ReduceInputのリスト
*/
def split(data: List[(String, Int)]): List[ReduceInput] = {
data match {
case Nil => reduceInputs.toList
case x :: xs => {
//tailの中の要素の文字列が最初の要素の文字列と同じかどうか判定し
val spannedXs = xs.span(_._1 == x._1)
reduceInputs += createReduceInput(x, spannedXs._1)
split(spannedXs._2)
}
}
}
/**
* ReduceInputの生成
*
* @param head
* @param remainder
* @return ReduceInput
*/
def createReduceInput(head: (String, Int), remainder: List[(String, Int)]) = {
val reduceInput = ReduceInput(head._1, head :: remainder)
println("MasterActor: create: " + reduceInput)
reduceInput
}
/**
* 結果を出力する。
*/
def printResult {
println("========= Result ==========")
for (o <- reduceOutputs.toList.sortWith(_.value > _.value).take(10)) {
printf("%-10s:%8d\n", o.key, o.value)
}
println("===========================")
}
/**
* mapOutputsに追加する。
*
* @param o MapOutput
*/
def appendMapOutput(o: MapOutput) = {
printf("MasterActor: received the response: map(%s)\n", o)
mapOutputs += o
}
/**
* reduceOutputsに追加する。
*
* @param o ReduceOutput
*/
def appendReduceOutput(o: ReduceOutput) = {
printf("MasterActor: received the response: reduce(%s)\n", o)
reduceOutputs += o
}
/**
* Map処理が終わったら MapCompleted を送信する
*/
def checkMapProgress =
if (mapOutputs.size == data.size)
self ! MapCompleted
/**
* Reduce処理が終わったら ReduceCompleted を送信する
*/
def checkReduceProgress =
if (reduceInputs.size == reduceOutputs.size)
self ! ReduceCompleted
}
/**
* reduce用アクター
*/
class ReduceActor extends Actor {
def receive = {
// ReduceInputを受け取って単語単位の出現数を集計して返す
case ri: ReduceInput =>
self.reply(reduce(ri))
case _ => throw new Error
}
/**
* reduce処理本体
*
* @param ri ReduceInput
* @return ReduceOutput
*/
def reduce(ri: ReduceInput) =
ReduceOutput(ri.key, ri.entries.foldLeft(0)(_ + _._2))
}
/**
* map用アクター
*/
class MapActor extends Actor {
def receive = {
// map用のメッセージがきたらmap処理を行い、MapOutputを返信する
case MapInput(text) =>
self.reply(map(text))
case _ => throw new Error
}
/**
* map処理本体
*
* 文字列を単語のリストに分解する
*
* @param text 文字列
* @return MapOutput
*/
def map(text: String) = {
import scala.collection.JavaConverters._
val morphones = tagger.parse(text).asScala.toList
val result = morphones.withFilter(e => e.feature.startsWith("名詞")).map(e => (e.surface, 1)).toList
MapOutput(result)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment