Skip to content

Instantly share code, notes, and snippets.

@purijatin
Created July 20, 2017 09:25
Show Gist options
  • Save purijatin/9ea0437eaf8ae3677e512c298a8ed4a9 to your computer and use it in GitHub Desktop.
Save purijatin/9ea0437eaf8ae3677e512c298a8ed4a9 to your computer and use it in GitHub Desktop.
WordCount.scala
import akka.Done
import akka.actor.Status.Success
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Flow, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.Future
object Generic {
def main(args: Array[String]) {
implicit val system = ActorSystem("system")
implicit val mat = ActorMaterializer()
val sink: Sink[Any, Future[Done]] = Sink.foreach(x => println("Ans =====> " + x))
val counts = Flow[String]
.mapConcat(x => x.split("\\s").toList)
.filter(!_.isEmpty)
.groupBy(Int.MaxValue, identity)
.map(x => x -> 1)
.scan(("---", 0)){
case ((d,c), (name, count)) => (name, c + count)
}
.filter(x => ! (x._1 == "---"))
// .reduce((l, r) => (l._1, l._2 + r._2))
.mergeSubstreams
var map = Map[String, Int]()
val global = Sink.foreach((x: (String, Int)) => {
map += x
println("Current Map State: "+map)
})
val words: RunnableGraph[ActorRef] = Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
.via(counts)
.alsoTo(sink)
.to(global)
val ref = words.run()
for {
ln <- scala.io.Source.stdin.getLines.takeWhile(_ != "-1")
} {
ref ! ln
}
ref ! Success("end")
Thread.sleep(5000)
system.terminate()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment