Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Created April 3, 2015 15:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/bd00d33a130e4add2491 to your computer and use it in GitHub Desktop.
Save Horusiath/bd00d33a130e4add2491 to your computer and use it in GitHub Desktop.
At-least-once delivery snapshotting desync
import akka.actor.{Actor, ActorPath, ActorSystem, Props}
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.{AtLeastOnceDelivery, PersistentActor, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer}
/** Command asking the delivering actor to send `data` to its destination. */
case class Message(data: String)
/** Envelope sent to the destination; pairs the payload with the delivery id that must be acknowledged. */
case class Confirmable(deliveryId: Long, data: String)
/** Acknowledgement sent back by the destination for the delivery identified by `deliveryId`. */
case class Confirmation(deliveryId: Long)
/** Snapshot payload wrapping the at-least-once-delivery state so it can be saved and later offered on recovery. */
case class Snap(snapshot: AtLeastOnceDeliverySnapshot)
/**
 * Persistent actor that forwards every [[Message]] to `deliveryPath` wrapped in a
 * [[Confirmable]], and marks the delivery as confirmed when the matching
 * [[Confirmation]] arrives.
 *
 * The delivery state is stored as a [[Snap]] snapshot when the actor stops and is
 * restored from the offered snapshot during recovery.
 *
 * NOTE(review): `deliver`/`confirmDelivery` are invoked directly from the command
 * handler rather than from persisted-event handlers, so the snapshot is the only
 * durable record of the delivery state. `saveSnapshot` is presumably asynchronous,
 * so a restart may recover before the snapshot written in `postStop` is available --
 * confirm against the Akka Persistence documentation before relying on this pattern.
 *
 * @param deliveryPath path of the actor that receives the [[Confirmable]] messages
 */
class ExampleAtLeastOnceDeliveryActor(val deliveryPath: ActorPath) extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId: String = "guaranteed-1"

  /** Restores the delivery state from the most recent [[Snap]] snapshot, if one is offered. */
  override def receiveRecover: Receive = {
    case SnapshotOffer(_, Snap(restored)) =>
      setDeliverySnapshot(restored)
      println(s"restored undelivered: $restored")
  }

  override def receiveCommand: Receive = {
    case Message(data) =>
      deliver(deliveryPath, id => {
        println(s"sending: $data with deliveryId: $id")
        Confirmable(id, data)
      })

    case Confirmation(deliveryId) =>
      confirmDelivery(deliveryId)

    // Snapshot acknowledgements are reported explicitly so that a failed save is
    // visible instead of ending up as an unhandled message.
    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved: $metadata")

    case SaveSnapshotFailure(metadata, cause) =>
      println(s"snapshot save failed for $metadata: $cause")

    // Deliberate crash used by the demo to force a restart of this actor.
    case "boom" =>
      throw new Exception()
  }

  /**
   * Stores the current delivery state as a snapshot before the actor stops.
   *
   * The snapshot is only written once recovery has finished: before that point the
   * in-memory delivery state may not yet reflect the last saved snapshot, and
   * saving it would overwrite good state with an empty or partial one.
   */
  override def postStop(): Unit =
    try {
      if (recoveryFinished) {
        val unconfirmed = getDeliverySnapshot
        saveSnapshot(Snap(unconfirmed))
        println(s"stored undelivered ${unconfirmed.unconfirmedDeliveries}")
      }
    } finally {
      // Always run the inherited hook, even if snapshotting throws.
      super.postStop()
    }
}
/**
 * Destination actor for [[Confirmable]] messages.
 *
 * Acknowledgements are switched on with the string message "start" and off with
 * "stop". While switched off, incoming deliveries are only printed and never
 * confirmed.
 */
class DeliveryActor extends Actor {
  // Acknowledgement switch; off until a "start" message arrives.
  var confirming = false

  override def receive: Actor.Receive = {
    case "start" =>
      confirming = true

    case "stop" =>
      confirming = false

    // Confirming mode: reply to the sender with the delivery id.
    case Confirmable(id, payload) if confirming =>
      println(s"Confirming message id: $id and data: $payload")
      sender() ! Confirmation(id)

    // Ignoring mode: print the delivery and send no reply.
    case Confirmable(id, payload) =>
      println(s"Ignoring message id: $id and data: $payload")
  }
}
/**
 * Demo driver: sends a message while the destination is ignoring deliveries,
 * crashes the delivering actor with "boom", sends a second message, and finally
 * switches the destination to confirming mode. Each step is separated by a
 * one-second pause.
 */
object HelloApp extends App {
  implicit val system: ActorSystem = ActorSystem("system")

  val delivery = system.actorOf(Props[DeliveryActor](), "delivery")
  val deliverer = system.actorOf(Props(classOf[ExampleAtLeastOnceDeliveryActor], delivery.path))

  /** Blocks the main thread for one second between scenario steps. */
  private def pause(): Unit = Thread.sleep(1000)

  // Step 1: first message; the destination is not confirming yet.
  deliverer ! Message("foo")
  pause()

  // Step 2: make the delivering actor throw.
  deliverer ! "boom"
  pause()

  // Step 3: second message, sent after the crash.
  deliverer ! Message("bar")
  pause()

  // Step 4: let the destination start acknowledging deliveries.
  println("Enabling confirmations")
  delivery ! "start"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment