Skip to content

Instantly share code, notes, and snippets.

@AGhost-7
Created June 2, 2015 22:19
Show Gist options
  • Save AGhost-7/36f292a1c476ef0dd3d0 to your computer and use it in GitHub Desktop.
Save AGhost-7/36f292a1c476ef0dd3d0 to your computer and use it in GitHub Desktop.
Akka Workcount Problem
name := "akka-wordcount"
scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.11"
)
import java.io.File
import akka.actor._
import scala.concurrent.{ExecutionContext, Promise, Future}
import scala.util.{Success, Failure}
/** All IO methods */
object FileIO {
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.ByteBuffer
import java.nio.file.StandardOpenOption
def readFile(file: File): Future[String] = {
val p = Promise[String]
// Could allocate size based on metadata, but that's something else...
val buffer = ByteBuffer.allocate(500)
AsynchronousFileChannel
.open(file.toPath(), StandardOpenOption.READ)
.read(buffer, 0, buffer, new CompletionHandler[Integer, ByteBuffer] {
def completed(res: Integer, buf: ByteBuffer): Unit = {
buf.flip()
val bytes = new Array[Byte](buf.limit())
buf.get(bytes)
p.success(new String(bytes))
}
def failed(e : Throwable, buf: ByteBuffer): Unit = p.failure(e)
})
p.future
}
def getAllFiles(directory: File): Array[File] = {
val (dirs, files) = directory
.listFiles()
.partition { _.isDirectory }
files ++ dirs.flatMap(getAllFiles)
}
}
/** Application messages */
object SysMessages {
case class CountDirectory(dir: File)
case class CountFile(file: File)
case class FileCount(count: Int)
case class WordCountTotal(count: Int)
}
import SysMessages._
/** Counter Actor */
object FileCounter {
def props() = Props(new FileCounter())
}
class FileCounter() extends Actor with ActorLogging {
import context.dispatcher
override def preStart = {
log.info("FileCounter Actor initialized")
}
def receive = {
case CountFile(file) =>
log.info("Counting file: " + file.getAbsolutePath)
FileIO.readFile(file).foreach { data =>
val words = data
.split("\n")
.map { _.split(" ").length }
.sum
context.parent ! FileCount(words)
}
}
}
/** Supervisor for directory */
object CounterSupervisor {
def props(actorPool: Int) = Props(new CounterSupervisor(actorPool))
}
class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {
var total = 0
var files: Array[File] = _
var pendingActors = 0
override def preStart = {
for(i <- 1 to actorPool)
context.actorOf(FileCounter.props(), name = s"counter$i")
}
def receive = {
case CountDirectory(base) =>
log.info("Now counting starting from directory : " + base.getAbsolutePath)
total = 0
files = FileIO.getAllFiles(base)
pendingActors = 0
for(i <- 1 to actorPool if(i < files.length)) {
pendingActors += 1
context.child(s"counter$i").get ! CountFile(files.head)
files = files.tail
}
case FileCount(count) =>
total += count
pendingActors -= 1
if(files.length > 0) {
sender() ! CountFile(files.head)
files = files.tail
pendingActors += 1
} else if(pendingActors == 0) {
context.parent ! WordCountTotal(total)
}
}
}
/** Supervisor of supervisors */
object WordCountForker {
def props(counterActors: Int) = Props(new WordCountForker(counterActors))
}
class WordCountForker(counterActors: Int) extends Actor with ActorLogging {
var busyActors: List[(ActorRef, ActorRef)] = Nil
var idleActors: List[ActorRef] = _
override def preStart = {
val first = context.actorOf(CounterSupervisor.props(counterActors))
idleActors = List(first)
log.info(s"Initialized first supervisor with $counterActors file counters.")
}
def receive = {
case msg @ CountDirectory(dir) =>
log.info("Count directory received")
val counter = idleActors match {
case Nil =>
context.actorOf(CounterSupervisor.props(counterActors))
case head :: rest =>
idleActors = rest
head
}
counter ! msg
busyActors = (counter, sender()) :: busyActors
case msg @ WordCountTotal(n) =>
val path = sender().path.toString()
val index = busyActors.indexWhere { _._1.path.toString == path }
val (counter, replyTo) = busyActors(index)
replyTo ! msg
idleActors = counter :: idleActors
busyActors = busyActors.patch(index, Nil, 1)
}
}
object Main extends App {
import SysMessages._
val sys = ActorSystem()
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout = Timeout(5 seconds)
val fl = new File("data")
val supervisor = sys.actorOf(WordCountForker.props(4), name = "forker")
import akka.pattern.ask
val response = supervisor ? CountDirectory(fl)
implicit val context : ExecutionContext = sys.dispatcher
response.onSuccess {
case WordCountTotal(n) =>
println("Total computed: " + n)
}
response.onComplete {
case _ => sys.shutdown()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment