Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example of Consistent Hashing for Akka groupBy Streams

ConsistentHashGroupedStreams

What is this?

This is an example of using Akka Streams (2.4.9) to create a split stream (stream of streams) using a similar concept to Akka's ConsistentHashingRouter. The use case I was attempting to simulate was polling messages off of an AWS SQS queue (using the franz API from kifi) and processing them on a set of streams where message destinations are determined by consistent hashing. To simulate processing, IO is faked.

Why split the streams?

In a current project, I wanted to leverage parallelism for performance, so having multiple streams made sense. However, I needed to ensure that messages that were about things (users, accounts, etc.) happened in the order of their presence in the stream. To do this, I needed to guarantee that messages were always directed to the same downstream for processing. This was done using the groupBy function in conjunction with Akka's ConsistentHash object class.

name := "ConsistentHashGroupedStreams"
version := "1.0"
scalaVersion := "2.11.8"
lazy val root = (project in file(".")).enablePlugins(PlayScala)
// add: resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
// and: addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.3")
// in project/plugins.sbt
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.4.9",
"com.typesafe.akka" %% "akka-stream" % "2.4.9",
"com.kifi" %% "franz" % "0.3.14"
)
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, TimeUnit}
import akka.actor.{ActorSystem, Props}
import akka.routing.ConsistentHash
import akka.stream.actor._
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode}
import com.kifi.franz.{MessageId, SQSMessage}
import play.api.libs.json._
import ConsistentHashGroupedStreams.ProcessedMessage
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
object SlowIO {
implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
def apply(time: FiniteDuration) = {
Await.result(
Future{
Thread.sleep(time.toMillis)
42
}, time * 10)
}
}
object ConsistentHashGroupedStreams extends App {
implicit val actorSystem = ActorSystem("test")
implicit val materializer = ActorMaterializer()
type Message = SQSMessage[JsValue]
type ProcessedMessage = (SQSMessage[JsValue], Boolean)
def sqsSource = Source.actorPublisher[Message](Props[SqsReader])
.initialDelay(1 second)
.throttle(350, 1 second, 350, ThrottleMode.shaping)
def processMessage = Flow[SQSMessage[JsValue]].map(m => {
m.id.id.toInt match {
case id if id % 42 == 0 =>
SlowIO(800 millis)
(m, false) // every 42nd process simulates a failure
case id =>
SlowIO(400 millis)
(m, true)
}
})
def checkResult = Sink.foreach[ProcessedMessage]({
case (m, true) =>
println(s"Message ${m.id.id} succeeded")
case (m, false) =>
// printing latency to see worst case of messages staying in the stream
println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
println(s"Message ${m.id.id} failed, should be retried")
// could put retry logic here...
})
val ring = ConsistentHash(Range(0, 128), 1)
def splitProcessing =
Flow[SQSMessage[JsValue]].groupBy(128, (m) => ring.nodeFor(m.id.id))
.via(processMessage.async)
.to(checkResult)
// or use .to(Sink.actorSubscriber(Props[SqsProcessor]))
val graph = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
import GraphDSL.Implicits._
val inlet = builder.add(sqsSource)
val outlet = builder.add(splitProcessing)
inlet ~> outlet
ClosedShape
})
graph.run()
// or do sqsSource.runWith(splitProcessing)
}
class SqsReader extends ActorPublisher[SQSMessage[_ <: JsValue]] {
import akka.stream.actor.ActorPublisherMessage._
val count = new AtomicInteger(0)
val start = System.nanoTime()
def getSqsBatch(size: Long) = {
// do SQS client stuff here
(1L to size).map(i => {
SQSMessage(
MessageId(count.incrementAndGet().toString), Json.obj(), () => Unit, setVisibilityTimeout => Unit, Map("created" -> System.nanoTime().toString), Map()
)
})
}
override def receive = {
case Request(n) =>
getSqsBatch(n).foreach(e => onNext(e))
println(s"sent $n SQS messages")
}
}
class SqsProcessor extends ActorSubscriber {
import akka.stream.actor.ActorSubscriberMessage._
override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
override def receive = {
case OnNext(pm: ProcessedMessage) => pm match {
case (m, true) =>
println(s"Message ${m.id.id} succeeded")
case (m, false) =>
// printing latency to see worst case of messages staying in the stream
println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
println(s"Message ${m.id.id} failed, should be retried")
// could put retry logic here...
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.