Skip to content

Instantly share code, notes, and snippets.

@jkpl
Last active September 10, 2015 10:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkpl/953d7b96826121223dbd to your computer and use it in GitHub Desktop.
Save jkpl/953d7b96826121223dbd to your computer and use it in GitHub Desktop.
At least once delivery with async persistence
package ainakin.kerran
import akka.actor._
import akka.persistence.{AtLeastOnceDelivery, PersistentActor}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object AinakinKerran {
def start(): Unit = {
val as = ActorSystem("AinakinKerran", ConfigFactory.load("common"))
as.actorOf(Props[AinakinKerran], "root")
}
sealed trait Command
case object StopCmd extends Command
case object StartCmd extends Command
case class MsgCmd(contents: String) extends Command
case class ConfirmableCmd(deliveryId: Long, contents: String) extends Command
case class ConfirmCmd(deliveryId: Long) extends Command
case class StatusChangedCmd(isStarted: Boolean) extends Command
case object TransitionDoneCmd extends Command
sealed trait Event
case object StopEvt extends Event
case object StartEvt extends Event
case class DeliveredEvt(contents: String) extends Event
case class ConfirmedEvt(deliveryId: Long) extends Event
}
class AinakinKerran extends Actor with ActorLogging {
import AinakinKerran._
val sink = context.actorOf(Props[Sink], "sink")
val processor = context.actorOf(Props(new Processor(sink)), "processor")
override def receive: Receive = {
case StatusChangedCmd(true) =>
// These will be delivered to the sink.
1 to 5 foreach { send }
// Stop processor.
processor ! StopCmd
// These wont be delivered to the sink because of the ongoing transition.
6 to 10 foreach { send }
case StatusChangedCmd(false) =>
// These wont be delivered to the sink because we waited for the processor to stop.
11 to 13 foreach { send }
}
private def send(a: Any): Unit = {
processor ! MsgCmd(a.toString)
}
// This wont be delivered to the sink
send("ohai")
// Start processor
processor ! StartCmd
}
class Processor(sink: ActorRef) extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
import AinakinKerran._
override def receiveRecover: Receive = {
case StopEvt => context.become(stopped)
case StartEvt => context.become(running)
case DeliveredEvt(contents) => deliverMsg(contents)
case ConfirmedEvt(deliveryId) => confirmDelivery(deliveryId)
}
override def receiveCommand: Receive = stopped
private def stopped: Receive = {
case StartCmd =>
log.info("Starting!")
transitionEvent(StartEvt) {
sender() ! StatusChangedCmd(isStarted = true)
running
}
case ConfirmCmd(id) => confirmMsg(id)
case MsgCmd(contents) =>
log.info("Received message! Not sending it to the sink! {}", contents)
case m =>
log.warning("Processor got unknown message in stopped state: {}", m)
}
private def running: Receive = {
case StopCmd =>
log.info("Stopping!")
transitionEvent(StopEvt) {
sender() ! StatusChangedCmd(isStarted = false)
stopped
}
case ConfirmCmd(id) => confirmMsg(id)
case MsgCmd(contents) =>
persistAsync(DeliveredEvt(contents)) { _ => }
deliverMsg(contents)
case m =>
log.warning("Processor got unknown message in running state: {}", m)
}
private def transitionEvent(evt: Event)(afterTransition: => Receive): Unit = {
persistAsync(evt) { _ => self forward TransitionDoneCmd } // transition is finished after persist succeeds
context.become(transition(afterTransition))
}
private def transition(afterTransition: => Receive): Receive = {
case TransitionDoneCmd => // transition done => continue with message processing
unstashAll()
context.become(afterTransition)
case _ => stash() // stash until transition is done
}
private def confirmMsg(id: Long): Unit = {
persistAsync(ConfirmedEvt(id)) { evt =>
confirmDelivery(evt.deliveryId)
log.info("Confirmed message: {}", evt.deliveryId)
}
}
private def deliverMsg(contents: String): Unit = {
deliver(sink.path, deliveryId => ConfirmableCmd(deliveryId, contents))
}
}
class Sink extends Actor with ActorLogging {
import AinakinKerran._
import context.dispatcher
override def receive: Receive = {
case ConfirmableCmd(id, contents) =>
log.info("Sink got message {}: {}", id, contents)
val s = sender()
context.system.scheduler.scheduleOnce(100.millis) {
// Slow message handling ;^)
s ! ConfirmCmd(id)
}
case m =>
log.warning("Sink got unknown message: {}", m)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment