Created September 6, 2016 05:29
Example of Consistent Hashing for Akka groupBy Streams


What is this?

This is an example of using Akka Streams (2.4.9) to create a split stream (a stream of streams), using a concept similar to Akka's ConsistentHashingRouter. The use case I was simulating is polling messages off an AWS SQS queue (using Kifi's franz library) and processing them on a set of substreams. Consistent hashing determines which substream each message goes to. The processing itself is simulated with fake, artificially slow IO.

Why split the streams?

In a current project, I wanted to use parallelism for performance, so having multiple streams made sense. However, messages about the same entity (a user, an account, etc.) had to be processed in the order in which they appear in the stream. To guarantee this, all messages for a given entity must always be directed to the same substream for processing. I did this by combining the groupBy operator with Akka's ConsistentHash class.

// sbt build definition for the ConsistentHashGroupedStreams demo.
name := "ConsistentHashGroupedStreams"
version := "1.0"
scalaVersion := "2.11.8"
// PlayScala is enabled here; the Scala code in this demo imports only play.api.libs.json from Play.
lazy val root = (project in file(".")).enablePlugins(PlayScala)
// Requires the Play sbt plugin in project/plugins.sbt, e.g.:
//   resolvers += "Typesafe repository" at "https://repo.typesafe.com/typesafe/maven-releases/"
//   addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.3")
// NOTE(review): the repository URL and plugin organization were missing from this copy;
// the values above follow the Play 2.5 documentation. Confirm them before use.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.9",
  "com.typesafe.akka" %% "akka-stream" % "2.4.9",
  "com.kifi" %% "franz" % "0.3.14"
)
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}
import{ActorSystem, Props}
import akka.routing.ConsistentHash
import{Flow, GraphDSL, RunnableGraph, Sink, Source}
import{ActorMaterializer, ClosedShape, ThrottleMode}
import com.kifi.franz.{MessageId, SQSMessage}
import play.api.libs.json._
import ConsistentHashGroupedStreams.ProcessedMessage
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/** Simulates a slow IO call for the stream demo.
 *
 * NOTE(review): this block appears truncated in this copy. Only the opening of `apply`
 * and a trailing `}, time * 10)` survive. The shape suggests something like
 * `Await.result(Future { sleep(time) }, time * 10)`, i.e. a blocking wait whose timeout
 * is ten times the simulated duration. Confirm against the original.
 */
object SlowIO {
// Dedicated fixed pool of 8 threads, presumably picked up implicitly by the Future in `apply`.
// NOTE(review): nothing in this file shuts this pool down, and Executors.newFixedThreadPool
// creates non-daemon threads, so it may keep the JVM alive after the stream completes.
implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
// Presumably waits roughly `time` to simulate latency (see the truncation note above).
def apply(time: FiniteDuration) = {
// NOTE(review): lines are missing here in this copy.
}, time * 10)
/** Demo entry point. It reads simulated SQS messages, splits them into up to 128
 * substreams chosen via a consistent-hash ring, and "processes" each message with
 * simulated slow IO.
 *
 * NOTE(review): closing braces and several expressions appear to be missing throughout
 * this copy of the object. Compare with the original gist before building.
 */
object ConsistentHashGroupedStreams extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
// A message as delivered by the (simulated) SQS queue, with a JSON body.
type Message = SQSMessage[JsValue]
// A message paired with its processing outcome: true = succeeded, false = failed.
type ProcessedMessage = (SQSMessage[JsValue], Boolean)
// Source of simulated SQS messages, backed by an SqsReader actor publisher.
// It starts emitting after a 1 second delay and is throttled in shaping mode to
// 350 elements per second with a maximum burst of 350.
def sqsSource = Source.actorPublisher[Message](Props[SqsReader])
.initialDelay(1 second)
.throttle(350, 1 second, 350, ThrottleMode.shaping)
// Simulated processing step. When the matched id is a multiple of 42, it calls
// SlowIO(800 millis) and marks the message failed (false). Otherwise it calls
// SlowIO(400 millis) and marks the message succeeded (true).
// NOTE(review): the expression being matched is missing in this copy (`{ match {`).
// The `% 42` guard implies an integer, presumably derived from the message id. Confirm.
// NOTE(review): SlowIO is called inside `map`. If it blocks (as its name suggests), the
// stage's thread is held for the whole delay; for real IO, `mapAsync` avoids that.
def processMessage = Flow[SQSMessage[JsValue]].map(m => { match {
case id if id % 42 == 0 =>
SlowIO(800 millis)
(m, false) // every 42nd process simulates a failure
case id =>
SlowIO(400 millis)
(m, true)
// Terminal sink that logs each processing outcome. For failures it also logs how long
// the message has been in flight: whole seconds (truncated) since its "created"
// attribute, which SqsReader fills with System.nanoTime().
// NOTE(review): `${}` interpolates an empty block (Unit) rather than an id. The message
// id expression appears to have been lost from this copy.
def checkResult = Sink.foreach[ProcessedMessage]({
case (m, true) =>
println(s"Message ${} succeeded")
case (m, false) =>
// printing latency to see worst case of messages staying in the stream
println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
println(s"Message ${} failed, should be retried")
// could put retry logic here...
// Hash ring whose nodes are the substream keys 0..127, with a virtual-node factor of 1.
// It has exactly 128 nodes, matching the maxSubstreams of 128 passed to groupBy below.
// groupBy fails the stream if more distinct keys appear, so keep the two numbers in sync.
val ring = ConsistentHash(Range(0, 128), 1)
// Splits the message stream into substreams keyed by `ring.nodeFor(...)`, so messages
// that map to the same ring node always go to the same substream. That is the premise
// for per-entity ordering.
// NOTE(review): the key expression and the rest of the chain (presumably
// `.via(processMessage)` / `.to(checkResult)`) are missing in this copy. Confirm.
def splitProcessing =
Flow[SQSMessage[JsValue]].groupBy(128, (m) => ring.nodeFor(
// or use .to(Sink.actorSubscriber(Props[SqsProcessor]))
// Closed graph that wires the throttled SQS source into the consistent-hash split processing.
// NOTE(review): for `inlet ~> outlet` to close the graph, `splitProcessing` must presumably
// end in a Sink. Also, `inlet` actually holds the source (an outlet) and `outlet` holds
// the sink, so the names are reversed.
// NOTE(review): no `graph.run()` and no `ClosedShape` return is visible in this copy.
// Confirm the graph is actually materialized.
val graph = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val inlet = builder.add(sqsSource)
val outlet = builder.add(splitProcessing)
inlet ~> outlet
// or do sqsSource.runWith(splitProcessing)
/** ActorPublisher that fabricates SQS-style messages on demand for the demo.
 * Each fabricated message has a sequential MessageId, an empty JSON body, callback
 * arguments that do nothing useful, and a "created" attribute holding System.nanoTime()
 * at creation time.
 */
class SqsReader extends ActorPublisher[SQSMessage[_ <: JsValue]] {
// Source of sequential message ids.
val count = new AtomicInteger(0)
// NOTE(review): `start` is never read in the visible code.
val start = System.nanoTime()
// Builds `size` fake messages eagerly; a real implementation would query SQS here.
def getSqsBatch(size: Long) = {
// do SQS client stuff here
(1L to size).map(i => {
// NOTE(review): the SQSMessage constructor call wrapping the argument list below is
// missing in this copy. Also, `() => Unit` returns the Unit companion object rather
// than `()`; `() => ()` states the no-op intent directly.
MessageId(count.incrementAndGet().toString), Json.obj(), () => Unit, setVisibilityTimeout => Unit, Map("created" -> System.nanoTime().toString), Map()
// On each demand signal Request(n), immediately emits exactly n fabricated messages.
// NOTE(review): Reactive Streams permits demand up to Long.MaxValue. Building
// `(1L to n)` eagerly would then fail (more than Int.MaxValue elements) or exhaust memory.
// Emitting min(totalDemand, batchSize) while `isActive && totalDemand > 0` is safer.
override def receive = {
case Request(n) =>
getSqsBatch(n).foreach(e => onNext(e))
println(s"sent $n SQS messages")
/** Alternative terminal stage implemented as an ActorSubscriber. It requests one
 * element at a time and logs each ProcessedMessage the same way the `checkResult`
 * sink does.
 *
 * NOTE(review): `pm: ProcessedMessage` is a type test on a tuple type whose type
 * arguments are erased at runtime, so it only checks for a Tuple2 (expect an unchecked
 * warning). OnComplete/OnError are not handled in the visible code.
 */
class SqsProcessor extends ActorSubscriber {
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
override def receive = {
case OnNext(pm: ProcessedMessage) => pm match {
case (m, true) =>
// NOTE(review): `${}` interpolates an empty block (Unit); the id expression looks lost.
println(s"Message ${} succeeded")
case (m, false) =>
// printing latency to see worst case of messages staying in the stream
println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
println(s"Message ${} failed, should be retried")
// could put retry logic here...
