|
|
|
import java.util.concurrent.atomic.AtomicInteger |
|
import java.util.concurrent.{Executors, TimeUnit} |
|
|
|
import akka.actor.{ActorSystem, Props} |
|
import akka.routing.ConsistentHash |
|
import akka.stream.actor._ |
|
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source} |
|
import akka.stream.{ActorMaterializer, ClosedShape, ThrottleMode} |
|
import com.kifi.franz.{MessageId, SQSMessage} |
|
import play.api.libs.json._ |
|
import ConsistentHashGroupedStreams.ProcessedMessage |
|
|
|
import scala.concurrent.duration._ |
|
import scala.concurrent.{Await, Future} |
|
|
|
object SlowIO {

  // Dedicated 8-thread pool: the simulated blocking calls run here instead of
  // on the caller's (or the stream's) execution context.
  implicit val ec: scala.concurrent.ExecutionContextExecutorService =
    scala.concurrent.ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  /**
   * Simulates a slow, blocking I/O call: sleeps for `time` on the dedicated
   * pool and blocks the caller until it finishes (timeout = 10x the sleep).
   *
   * @param time how long the simulated call should take
   * @return the constant 42 once the sleep completes
   */
  def apply(time: FiniteDuration): Int = {
    val simulatedCall = Future {
      Thread.sleep(time.toMillis)
      42
    }
    // Deliberate Await: this object exists to model synchronous, blocking IO.
    Await.result(simulatedCall, time * 10)
  }

}
|
|
|
/**
 * Demo: consume SQS-like messages and process them in parallel sub-streams,
 * partitioned by a consistent-hash ring over the message id so that messages
 * for the same ring node are processed in order within their sub-stream.
 */
object ConsistentHashGroupedStreams extends App {

  implicit val actorSystem = ActorSystem("test")
  implicit val materializer = ActorMaterializer()

  type Message = SQSMessage[JsValue]
  // A message paired with a success flag produced by processMessage.
  type ProcessedMessage = (SQSMessage[JsValue], Boolean)

  // Source backed by the SqsReader actor-publisher; waits 1s before emitting,
  // then shapes throughput to at most 350 messages per second (burst 350).
  def sqsSource = Source.actorPublisher[Message](Props[SqsReader])
    .initialDelay(1 second)
    .throttle(350, 1 second, 350, ThrottleMode.shaping)

  // Simulates per-message work via SlowIO and tags each message with a
  // success/failure flag based on its numeric id.
  def processMessage = Flow[SQSMessage[JsValue]].map(m => {
    // NOTE(review): assumes every message id parses as an Int — holds for ids
    // generated by SqsReader's counter, but .toInt would throw otherwise.
    m.id.id.toInt match {
      case id if id % 42 == 0 =>
        SlowIO(800 millis)
        (m, false) // every 42nd process simulates a failure
      case id =>
        SlowIO(400 millis)
        (m, true)
    }
  })

  // Terminal sink: logs success, or logs failure plus the message's total
  // time spent in the stream (from the "created" attribute set by SqsReader).
  def checkResult = Sink.foreach[ProcessedMessage]({
    case (m, true) =>
      println(s"Message ${m.id.id} succeeded")

    case (m, false) =>
      // printing latency to see worst case of messages staying in the stream
      println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
      println(s"Message ${m.id.id} failed, should be retried")
      // could put retry logic here...
  })

  // Consistent-hash ring of 128 virtual nodes; nodeFor maps a message id to
  // one of them, giving a stable id -> sub-stream assignment.
  val ring = ConsistentHash(Range(0, 128), 1)

  // Fan out into up to 128 sub-streams keyed by ring node; .async makes each
  // sub-stream's processing run in its own asynchronous island so the slow,
  // blocking SlowIO calls proceed in parallel across sub-streams.
  def splitProcessing =
    Flow[SQSMessage[JsValue]].groupBy(128, (m) => ring.nodeFor(m.id.id))
      .via(processMessage.async)
      .to(checkResult)
  // or use .to(Sink.actorSubscriber(Props[SqsProcessor]))

  // Wire source -> partitioned processing into a closed, runnable graph.
  val graph = RunnableGraph.fromGraph(GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val inlet = builder.add(sqsSource)
    val outlet = builder.add(splitProcessing)

    inlet ~> outlet

    ClosedShape
  })

  graph.run()
  // or do sqsSource.runWith(splitProcessing)
}
|
|
|
/**
 * Actor-publisher that fabricates SQS-like messages on demand: each
 * downstream Request(n) is answered with n freshly generated messages.
 */
class SqsReader extends ActorPublisher[SQSMessage[_ <: JsValue]] {

  import akka.stream.actor.ActorPublisherMessage._

  // Monotonically increasing id shared across all generated batches.
  val count = new AtomicInteger(0)
  val start = System.nanoTime()

  /**
   * Builds a batch of `size` fake messages. The "created" attribute records
   * the generation timestamp (nanos) so downstream can measure latency.
   */
  def getSqsBatch(size: Long) = {
    // do SQS client stuff here
    def nextMessage = SQSMessage(
      MessageId(count.incrementAndGet().toString),
      Json.obj(),
      () => Unit,
      setVisibilityTimeout => Unit,
      Map("created" -> System.nanoTime().toString),
      Map()
    )
    (1L to size).map(_ => nextMessage)
  }

  override def receive = {
    case Request(n) =>
      // Emit exactly the requested demand, one element at a time.
      for (message <- getSqsBatch(n)) onNext(message)
      println(s"sent $n SQS messages")
  }
}
|
|
|
/**
 * Alternative sink implemented as an actor-subscriber: logs each processed
 * message's outcome, requesting elements one at a time.
 */
class SqsProcessor extends ActorSubscriber {

  import akka.stream.actor.ActorSubscriberMessage._

  // Request a single element per delivered element (no batching).
  override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

  override def receive = {
    // NOTE(review): ProcessedMessage is a tuple alias, so this type test is
    // erased to "any Tuple2" at runtime — safe here only because the stream
    // delivers nothing else.
    case OnNext(pm: ProcessedMessage) => pm match {
      case (m, true) =>
        println(s"Message ${m.id.id} succeeded")

      case (m, false) =>
        // printing latency to see worst case of messages staying in the stream
        println(s"latency -> ${FiniteDuration(System.nanoTime() - m.attributes("created").toLong, TimeUnit.NANOSECONDS).toSeconds}")
        println(s"Message ${m.id.id} failed, should be retried")
        // could put retry logic here...
    }

    // Fix: an ActorSubscriber must handle stream termination itself; without
    // these cases the actor lives on (leaks) after the stream completes or
    // fails, and failures are silently dropped.
    case OnComplete =>
      context.stop(self)

    case OnError(cause) =>
      println(s"stream failed: ${cause.getMessage}")
      context.stop(self)
  }
}