Skip to content

Instantly share code, notes, and snippets.

@MartinSeeler
Created August 18, 2015 08:20
Show Gist options
  • Save MartinSeeler/594eedb7a44c5204e120 to your computer and use it in GitHub Desktop.
Save MartinSeeler/594eedb7a44c5204e120 to your computer and use it in GitHub Desktop.
Akka stream synchronization
import java.util.concurrent.atomic.{AtomicReference, AtomicInteger, AtomicBoolean}
import akka.actor.Actor.Receive
import akka.actor._
import akka.event.LoggingReceive
import akka.stream.actor.ActorSubscriberMessage.{OnNext, OnError, OnComplete}
import akka.stream.actor._
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.{Outlet, Inlet, Attributes, ActorMaterializer}
import akka.stream.Supervision.Directive
import akka.stream.scaladsl._
import akka.stream.stage._
import org.reactivestreams.{Processor, Publisher, Subscriber}
import scala.collection
import scala.collection.parallel.mutable
object AkkaStreamSync extends App {
implicit val sys = ActorSystem("AkkaStreamSync")
implicit val mat = ActorMaterializer()
val source: Source[Int, Unit] = Source(0 to 100)
val postRef = sys.actorOf(Props[SynchronizerActorPost], "post-ref")
val preRef = sys.actorOf(Props(classOf[SynchronizerActorPre], postRef), "pre-ref")
val preSubscriber: Subscriber[Int] = ActorSubscriber[Int](preRef)
val preProducer: Publisher[Int] = ActorPublisher[Int](preRef)
val postSubscriber: Subscriber[Int] = ActorSubscriber[Int](postRef)
val postProducer: Publisher[Int] = ActorPublisher[Int](postRef)
FlowGraph.closed() { implicit b =>
import akka.stream.scaladsl.FlowGraph.Implicits._
val preIn: Inlet[Int] = b.add(Sink(preSubscriber))
val preOut: Outlet[Int] = b.add(Source(preProducer))
val postIn: Inlet[Int] = b.add(Sink(postSubscriber))
val postOut: Outlet[Int] = b.add(Source(postProducer))
source ~> preIn
preOut ~> Flow[Int].filter(_ % 5 == 0).withAttributes(Attributes.inputBuffer(1, 1)) ~> postIn
postOut ~> Sink.foreach(println).withAttributes(Attributes.inputBuffer(1, 1))
}.run()
}
sealed trait SynchronizerEvent
case class AnnounceNext[A](next: A) extends SynchronizerEvent
case class AckNext[A](next: A) extends SynchronizerEvent
case class AnnounceNotMissing[A](elem: A) extends SynchronizerEvent
case class AnnounceMissing[A](next: A) extends SynchronizerEvent
case class AckMissing[A](next: A) extends SynchronizerEvent
case class RetryMissing[A](next: A) extends SynchronizerEvent
case class RejectMissing[A](next: A) extends SynchronizerEvent
class SynchronizerActorPre(post: ActorRef) extends ActorSubscriber with ActorPublisher[Int]
with ActorLogging with Stash {
protected def requestStrategy: RequestStrategy = ZeroRequestStrategy
def receive: Receive = waitForNext
def waitForNext: Receive = LoggingReceive.withLabel("waitForNext") {
case Request(n) => request(1)
case OnNext(x: Int) =>
context become announceNext(x)
post ! AnnounceNext(x)
}
def announceNext(elem: Int): Receive = LoggingReceive.withLabel("announceNext") {
case AckNext(x: Int) if x == elem =>
context become awaitReqOrAck(elem)
onNext(elem)
}
def awaitReqOrAck(elem: Int): Receive = LoggingReceive.withLabel("awaitReqOrAck") {
case Request(n) =>
context become awaitAckMissing(elem)
post ! AnnounceMissing(elem)
}
def awaitAckMissing(elem: Int): Receive = LoggingReceive.withLabel("awaitAckMissing") {
case OnComplete => onComplete()
case RetryMissing(x: Int) if x == elem =>
post ! AnnounceMissing(elem)
case AckMissing(x: Int) if x == elem =>
context become waitForNext
request(1)
case RejectMissing(x: Int) if x == elem =>
context become waitForNext
request(1)
}
}
class SynchronizerActorPost extends ActorSubscriber with ActorPublisher[(Int, Option[Int])] with ActorLogging {
protected def requestStrategy: RequestStrategy = ZeroRequestStrategy
val queue = collection.mutable.Queue[(Int, Option[Int])]()
def receive: Receive = waitForNext
request(1)
def sendQueue() = while(isActive && totalDemand > 0 && queue.nonEmpty) {
onNext(queue.dequeue())
}
def waitForNext: Receive = LoggingReceive.withLabel("waitForNext") {
case OnComplete =>
sendQueue()
onCompleteThenStop()
case OnNext(n: Int) =>
onErrorThenStop(new Exception(s"Received element $n after Ack"))
case AnnounceNext(x: Int) =>
sendQueue()
context become awaitNextOrMissing(x, sender)
sender ! AckNext(x)
}
def awaitNextOrMissing(elem: Int, ref: ActorRef, round: Int = 1): Receive = LoggingReceive.withLabel("awaitNextOrMissing") {
case OnComplete => onComplete()
case OnNext(n: Int) =>
context become awaitNotMissingAck(elem)
queue.enqueue((elem, Some(n)))
ref ! AnnounceNotMissing(elem)
case AnnounceMissing(x: Int) if x == elem =>
if (round == 5) {
queue.enqueue((elem, None))
if (isCompleted) {
sendQueue()
context.stop(self)
} else {
request(1)
context become waitForNext
}
sender ! AckMissing(elem)
} else {
context become awaitNextOrMissing(elem, ref, round + 1)
sender ! RetryMissing(elem)
}
}
def awaitNotMissingAck(elem: Int): Receive = LoggingReceive.withLabel("awaitNotMissingAck") {
case AnnounceMissing(x: Int) if x == elem =>
request(1)
context become waitForNext
sender ! RejectMissing(elem)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment