Skip to content

Instantly share code, notes, and snippets.

@Diego81
Last active December 12, 2021 12:36
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save Diego81/9887105 to your computer and use it in GitHub Desktop.
Save Diego81/9887105 to your computer and use it in GitHub Desktop.
Simple Word Counter implemented using Akka
import akka.actor.{ Actor, ActorRef, Props, ActorSystem }
case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Integer)
class StringCounterActor extends Actor {
def receive = {
case ProcessStringMsg(string) => {
val wordsInLine = string.split(" ").length
sender ! StringProcessedMsg(wordsInLine)
}
case _ => println("Error: message not recognized")
}
}
case class StartProcessFileMsg()
class WordCounterActor(filename: String) extends Actor {
private var running = false
private var totalLines = 0
private var linesProcessed = 0
private var result = 0
private var fileSender: Option[ActorRef] = None
def receive = {
case StartProcessFileMsg() => {
if (running) {
// println just used for example purposes;
// Akka logger should be used instead
println("Warning: duplicate start message received")
} else {
running = true
fileSender = Some(sender) // save reference to process invoker
import scala.io.Source._
fromFile(filename).getLines.foreach { line =>
context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
totalLines += 1
}
}
}
case StringProcessedMsg(words) => {
result += words
linesProcessed += 1
if (linesProcessed == totalLines) {
fileSender.map(_ ! result) // provide result to process invoker
}
}
case _ => println("message not recognized!")
}
}
object Sample extends App {
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import akka.dispatch.ExecutionContexts._
implicit val ec = global
override def main(args: Array[String]) {
val system = ActorSystem("System")
val actor = system.actorOf(Props(new WordCounterActor(args(0))))
implicit val timeout = Timeout(25 seconds)
val future = actor ? StartProcessFileMsg()
future.map { result =>
println("Total number of words " + result)
system.shutdown
}
}
}
@smishmash
Copy link

Thanks for the tutorial! Though it didn't run as expected for me. I would recommend giving the precise invocation that would run this code, along with the scala version etc.

I am running with scala 2.11.8, and I get an NPE in the main method. I see the following in App's documentation:

@deprecatedOverriding( message = "main should not be overridden" , since = "2.11.0" )

I modified the SampleApp class to look like this:

import scala.concurrent.duration._

import akka.actor.{ActorSystem, Props}
import akka.dispatch.ExecutionContexts._
import akka.pattern.ask
import akka.util.Timeout

object Sample extends App {

  implicit val ec = global

  val system = ActorSystem("System")
  val actor = system.actorOf(Props(new WordCounterActor(args(0))))
  implicit val timeout = Timeout(25.seconds)
  val future = actor ? StartProcessFileMsg()
  future.map { result =>
    println("Total number of words " + result)
    system.shutdown
  }
}

It also isn't obvious that the program needs an argument, the file whose words will be counted.

Finally, there's a deprecation warning I haven't yet tracked down:

[warn] The - command is deprecated in favor of onFailure and will be removed in 0.14.0

Again, thanks for the tutorial! Still going over how the program works.

Sunil

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment