Created
June 2, 2015 22:19
-
-
Save AGhost-7/36f292a1c476ef0dd3d0 to your computer and use it in GitHub Desktop.
Akka Workcount Problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "akka-wordcount" | |
scalaVersion := "2.11.6" | |
libraryDependencies ++= Seq( | |
"com.typesafe.akka" %% "akka-actor" % "2.3.11" | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File | |
import akka.actor._ | |
import scala.concurrent.{ExecutionContext, Promise, Future} | |
import scala.util.{Success, Failure} | |
/** All IO methods */ | |
object FileIO { | |
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler} | |
import java.nio.ByteBuffer | |
import java.nio.file.StandardOpenOption | |
def readFile(file: File): Future[String] = { | |
val p = Promise[String] | |
// Could allocate size based on metadata, but that's something else... | |
val buffer = ByteBuffer.allocate(500) | |
AsynchronousFileChannel | |
.open(file.toPath(), StandardOpenOption.READ) | |
.read(buffer, 0, buffer, new CompletionHandler[Integer, ByteBuffer] { | |
def completed(res: Integer, buf: ByteBuffer): Unit = { | |
buf.flip() | |
val bytes = new Array[Byte](buf.limit()) | |
buf.get(bytes) | |
p.success(new String(bytes)) | |
} | |
def failed(e : Throwable, buf: ByteBuffer): Unit = p.failure(e) | |
}) | |
p.future | |
} | |
def getAllFiles(directory: File): Array[File] = { | |
val (dirs, files) = directory | |
.listFiles() | |
.partition { _.isDirectory } | |
files ++ dirs.flatMap(getAllFiles) | |
} | |
} | |
/** Application messages */ | |
object SysMessages { | |
case class CountDirectory(dir: File) | |
case class CountFile(file: File) | |
case class FileCount(count: Int) | |
case class WordCountTotal(count: Int) | |
} | |
import SysMessages._ | |
/** Counter Actor */ | |
object FileCounter { | |
def props() = Props(new FileCounter()) | |
} | |
class FileCounter() extends Actor with ActorLogging { | |
import context.dispatcher | |
override def preStart = { | |
log.info("FileCounter Actor initialized") | |
} | |
def receive = { | |
case CountFile(file) => | |
log.info("Counting file: " + file.getAbsolutePath) | |
FileIO.readFile(file).foreach { data => | |
val words = data | |
.split("\n") | |
.map { _.split(" ").length } | |
.sum | |
context.parent ! FileCount(words) | |
} | |
} | |
} | |
/** Supervisor for directory */ | |
object CounterSupervisor { | |
def props(actorPool: Int) = Props(new CounterSupervisor(actorPool)) | |
} | |
class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging { | |
var total = 0 | |
var files: Array[File] = _ | |
var pendingActors = 0 | |
override def preStart = { | |
for(i <- 1 to actorPool) | |
context.actorOf(FileCounter.props(), name = s"counter$i") | |
} | |
def receive = { | |
case CountDirectory(base) => | |
log.info("Now counting starting from directory : " + base.getAbsolutePath) | |
total = 0 | |
files = FileIO.getAllFiles(base) | |
pendingActors = 0 | |
for(i <- 1 to actorPool if(i < files.length)) { | |
pendingActors += 1 | |
context.child(s"counter$i").get ! CountFile(files.head) | |
files = files.tail | |
} | |
case FileCount(count) => | |
total += count | |
pendingActors -= 1 | |
if(files.length > 0) { | |
sender() ! CountFile(files.head) | |
files = files.tail | |
pendingActors += 1 | |
} else if(pendingActors == 0) { | |
context.parent ! WordCountTotal(total) | |
} | |
} | |
} | |
/** Supervisor of supervisors */ | |
object WordCountForker { | |
def props(counterActors: Int) = Props(new WordCountForker(counterActors)) | |
} | |
class WordCountForker(counterActors: Int) extends Actor with ActorLogging { | |
var busyActors: List[(ActorRef, ActorRef)] = Nil | |
var idleActors: List[ActorRef] = _ | |
override def preStart = { | |
val first = context.actorOf(CounterSupervisor.props(counterActors)) | |
idleActors = List(first) | |
log.info(s"Initialized first supervisor with $counterActors file counters.") | |
} | |
def receive = { | |
case msg @ CountDirectory(dir) => | |
log.info("Count directory received") | |
val counter = idleActors match { | |
case Nil => | |
context.actorOf(CounterSupervisor.props(counterActors)) | |
case head :: rest => | |
idleActors = rest | |
head | |
} | |
counter ! msg | |
busyActors = (counter, sender()) :: busyActors | |
case msg @ WordCountTotal(n) => | |
val path = sender().path.toString() | |
val index = busyActors.indexWhere { _._1.path.toString == path } | |
val (counter, replyTo) = busyActors(index) | |
replyTo ! msg | |
idleActors = counter :: idleActors | |
busyActors = busyActors.patch(index, Nil, 1) | |
} | |
} | |
object Main extends App { | |
import SysMessages._ | |
val sys = ActorSystem() | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
implicit val timeout = Timeout(5 seconds) | |
val fl = new File("data") | |
val supervisor = sys.actorOf(WordCountForker.props(4), name = "forker") | |
import akka.pattern.ask | |
val response = supervisor ? CountDirectory(fl) | |
implicit val context : ExecutionContext = sys.dispatcher | |
response.onSuccess { | |
case WordCountTotal(n) => | |
println("Total computed: " + n) | |
} | |
response.onComplete { | |
case _ => sys.shutdown() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment