Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
AtLeastOnceDelivery in Akka Typed

Akka Persistence includes the AtLeastOnceDelivery trait, but a corresponding feature has not been implemented for Akka Typed yet. We have ideas and plans for a better solution than the one in classic, probably also including pull-based flow control from consumers and a non-persistent alternative.

You can follow progress of that in issue #20984, but it will not be ready for Akka 2.6.0.

In the meantime you can implement this yourself in your own EventSourcedBehavior. A sketch that you can adjust to meet your own needs is provided in this gist.

I might write a blog post about this later.

package docs.akka.persistence.typed
import scala.concurrent.duration.FiniteDuration
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria
object AtLeastOnceExample {
/**
 * Event sourced sender that delivers messages to a single `destination` and
 * keeps re-sending each message until the destination confirms it.
 *
 * Every [[Sender.DeliverMsg]] is persisted as [[Sender.MsgSent]] before it is sent, and every
 * first confirmation is persisted as [[Sender.MsgConfirmed]]. After recovery all messages that
 * are still pending are sent again, so the destination may see a message more than once.
 */
object Sender {
  sealed trait Command
  final case class DeliverMsg(payload: Destination.MsgPayload) extends Command with CborSerializable
  private final case class WrappedConfirm(confirm: Destination.Confirm) extends Command
  private case object DestinationTerminated extends Command
  private final case class Redeliver(deliveryId: Long, attempt: Int) extends Command

  sealed trait Event extends CborSerializable
  final case class MsgSent(payload: Destination.MsgPayload) extends Event
  final case class MsgConfirmed(deliveryId: Long) extends Event

  // `deliveryId` is the id of the most recently sent message.
  // The key in the `pending` Map is the deliveryId, stored as a String
  // because of a serialization problem with Map[Long, _].
  final case class State(deliveryId: Long, pending: Map[String, Destination.MsgPayload]) extends CborSerializable

  /**
   * @param persistenceId  identifier of the event sourced sender
   * @param destination    actor that receives the [[Destination.Msg]] messages; the sender
   *                       stops when this actor terminates
   * @param redeliverAfter time to wait for a confirmation before a message is sent again
   */
  def apply(
      persistenceId: PersistenceId,
      destination: ActorRef[Destination.Command],
      redeliverAfter: FiniteDuration): Behavior[Command] = {
    Behaviors.setup { context =>
      Behaviors.withTimers { timers =>
        context.watchWith(destination, DestinationTerminated)
        val confirmAdapter = context.messageAdapter[Destination.Confirm](WrappedConfirm.apply)

        EventSourcedBehavior[Command, Event, State](
          persistenceId,
          emptyState = State(0L, Map.empty),
          commandHandler = (state, command) =>
            command match {
              case DeliverMsg(payload) =>
                // Send only after the event has been stored; `newState` already holds the new id.
                Effect.persist(MsgSent(payload)).thenRun { newState =>
                  val deliveryId = newState.deliveryId
                  context.log.info("Deliver #{} to {}", deliveryId, destination)
                  destination ! Destination.Msg(deliveryId, payload, confirmAdapter)
                  timers.startSingleTimer(deliveryId, Redeliver(deliveryId, 2), redeliverAfter)
                }

              case WrappedConfirm(Destination.Confirm(deliveryId)) =>
                timers.cancel(deliveryId)
                if (state.pending.contains(deliveryId.toString)) {
                  context.log.info("Confirmed #{} from {}", deliveryId, destination)
                  Effect.persist(MsgConfirmed(deliveryId))
                } else {
                  // Redelivery means the destination can confirm the same id more than once;
                  // there is nothing left to record for such duplicates.
                  context.log.debug("Ignoring confirmation of #{}, it is not pending", deliveryId)
                  Effect.none
                }

              case Redeliver(deliveryId, attempt) =>
                // Defensive lookup: the message may have been confirmed in the meantime,
                // e.g. when this Redeliver was already queued while the confirmation was handled.
                state.pending.get(deliveryId.toString) match {
                  case Some(payload) =>
                    context.log.infoN("Redeliver #{}, attempt {}, to {}", deliveryId, attempt, destination)
                    destination ! Destination.Msg(deliveryId, payload, confirmAdapter)
                    timers.startSingleTimer(deliveryId, Redeliver(deliveryId, attempt + 1), redeliverAfter)
                  case None =>
                    context.log.debug("Skipping redelivery of #{}, it is no longer pending", deliveryId)
                }
                Effect.none

              case DestinationTerminated =>
                context.log.warn("Destination {} terminated", destination)
                Effect.stop()
            },
          eventHandler = (state, event) =>
            event match {
              case MsgSent(payload) =>
                val nextDeliveryId = state.deliveryId + 1
                state.copy(
                  deliveryId = nextDeliveryId,
                  pending = state.pending + (nextDeliveryId.toString -> payload))
              case MsgConfirmed(deliveryId) =>
                state.copy(pending = state.pending - deliveryId.toString)
            })
          .receiveSignal {
            case (state, RecoveryCompleted) =>
              // Keys are Strings (workaround for the CBOR serialization issue of Map[Long, _]),
              // so convert them back to Long BEFORE sorting; sorting the Strings would be
              // lexicographic and put e.g. "10" ahead of "2".
              state.pending.toList
                .map { case (id, payload) => (id.toLong, payload) }
                .sortBy { case (deliveryId, _) => deliveryId }
                .foreach {
                  case (deliveryId, payload) =>
                    context.log.info("Deliver #{} to {} after recovery", deliveryId, destination)
                    destination ! Destination.Msg(deliveryId, payload, confirmAdapter)
                    timers.startSingleTimer(deliveryId, Redeliver(deliveryId, 2), redeliverAfter)
                }
          }
          .withRetention(RetentionCriteria.snapshotEvery(100, 3).withDeleteEventsOnSnapshot)
      }
    }
  }
}
/**
 * Receiving end of the example: logs every [[Destination.Msg]] and answers the
 * `replyTo` reference with a [[Destination.Confirm]] carrying the same delivery id.
 */
object Destination {
  sealed trait Command

  /** A delivery from the sender; `replyTo` is where the confirmation is sent. */
  final case class Msg(deliveryId: Long, payload: MsgPayload, replyTo: ActorRef[Confirm])
      extends Command
      with CborSerializable

  final case class MsgPayload(s: String)

  /** Acknowledgement for the message with the given `deliveryId`. */
  final case class Confirm(deliveryId: Long) extends CborSerializable

  def apply(): Behavior[Command] =
    Behaviors.receive[Command] { (ctx, command) =>
      command match {
        case incoming: Msg =>
          ctx.log.info2("Received #{}: {}", incoming.deliveryId, incoming.payload.s)
          incoming.replyTo ! Confirm(incoming.deliveryId)
          Behaviors.same
      }
    }
}
/**
* Marker trait for serialization with Jackson CBOR.
*
* Mixed into the commands, events, state and messages of this example that need to be
* serialized. The trait has no members; the serializer is selected by binding this
* trait's class name in `akka.actor.serialization-bindings` (to `jackson-cbor`).
*/
trait CborSerializable
}
package docs.akka.persistence.typed
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import docs.akka.persistence.typed.AtLeastOnceExample._
import org.scalatest.WordSpecLike
/**
 * Configuration shared by the tests in the `AtLeastOnceSpec` class.
 */
object AtLeastOnceSpec {
// Settings applied by the config string below:
//  - the in-memory journal plugin, with its `test-serialization` setting switched on
//  - the local snapshot store, writing to a directory under `target/` whose name
//    contains a random UUID, so each evaluation of this config uses a fresh directory
//  - a serialization binding of the example's CborSerializable marker trait to `jackson-cbor`
val config = ConfigFactory.parseString(s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/AtLeastOnceSpec-${UUID.randomUUID().toString}"
akka.actor.serialization-bindings {
"${classOf[AtLeastOnceExample.CborSerializable].getName}" = jackson-cbor
}
""")
}
class AtLeastOnceSpec extends ScalaTestWithActorTestKit(AtLeastOnceSpec.config) with WordSpecLike {
/**
 * Wraps `destination` so that every [[Destination.Msg]] for which `f` returns `true`
 * is logged as dropped and never reaches the wrapped behavior. All other messages
 * are passed through unchanged.
 */
def dropMsg(f: Destination.Msg => Boolean)(
    destination: Behavior[Destination.Command]): Behavior[Destination.Command] = {
  val dropping = new BehaviorInterceptor[Destination.Command, Destination.Command] {
    override def aroundReceive(
        ctx: TypedActorContext[Destination.Command],
        msg: Destination.Command,
        target: BehaviorInterceptor.ReceiveTarget[Destination.Command]): Behavior[Destination.Command] =
      msg match {
        // Swallow the message: keep the current behavior without invoking the target.
        case m: Destination.Msg if f(m) =>
          ctx.asScala.log.info("Dropped #{}", m.deliveryId)
          Behaviors.same
        // Everything else goes on to the wrapped behavior.
        case other =>
          target(ctx, other)
      }
  }
  Behaviors.intercept(() => dropping)(destination)
}
"AtLeastOnce with Persistence Typed" must {
// Happy path: the destination confirms every message, so each one is seen exactly once.
"deliver and confirm when no message loss" in {
// The probe monitors everything sent to the real Destination behavior.
val destinationProbe = createTestProbe[Destination.Command]()
val destination = spawn(Behaviors.monitor(destinationProbe.ref, Destination()))
val sender = spawn(Sender(PersistenceId("pid1"), destination, 2.seconds))
sender ! Sender.DeliverMsg(Destination.MsgPayload("a"))
val msg1 = destinationProbe.expectMessageType[Destination.Msg]
msg1.deliveryId should ===(1L)
msg1.payload should ===(Destination.MsgPayload("a"))
sender ! Sender.DeliverMsg(Destination.MsgPayload("b"))
val msg2 = destinationProbe.expectMessageType[Destination.Msg]
msg2.deliveryId should ===(2L)
msg2.payload should ===(Destination.MsgPayload("b"))
// no redelivery after confirmation
// (3 seconds is longer than the 2 second redelivery interval, so a redelivery would show up)
destinationProbe.expectNoMessage(3.seconds)
testKit.stop(sender)
}
// Messages #3 and #4 are dropped before they reach the Destination, so they are never
// confirmed and must be redelivered, also after the sender is restarted.
"redeliver lost messages" in {
// The outer probe sits in front of the dropping interceptor and sees every message the
// sender sends; the inner probe sits behind it and sees only messages that were not dropped.
val outerDestinationProbe = createTestProbe[Destination.Command]()
val innerDestinationProbe = createTestProbe[Destination.Command]()
val destination =
spawn(Behaviors.monitor(outerDestinationProbe.ref, dropMsg(m => m.deliveryId == 3 || m.deliveryId == 4) {
Behaviors.monitor(innerDestinationProbe.ref, Destination())
}))
val sender = spawn(Sender(PersistenceId("pid2"), destination, 2.second))
sender ! Sender.DeliverMsg(Destination.MsgPayload("a"))
sender ! Sender.DeliverMsg(Destination.MsgPayload("b"))
sender ! Sender.DeliverMsg(Destination.MsgPayload("c"))
sender ! Sender.DeliverMsg(Destination.MsgPayload("d"))
sender ! Sender.DeliverMsg(Destination.MsgPayload("e"))
// first delivery of #1 to #5
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(1L)
innerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(1L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(2L)
innerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(2L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(3L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(4L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(5L)
// 3 and 4 dropped and not delivered to innerDestinationProbe
innerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(5L)
// redelivered
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(3L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(4L)
innerDestinationProbe.expectNoMessage() // but still dropped
// redelivered again
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(3L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(4L)
innerDestinationProbe.expectNoMessage() // but still dropped
testKit.stop(sender)
// and redelivery should continue after recovery, same pid
val sender2 = spawn(Sender(PersistenceId("pid2"), destination, 2.second))
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(3L)
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(4L)
innerDestinationProbe.expectNoMessage() // but still dropped
// a new message continues the delivery id sequence from the recovered state
sender2 ! Sender.DeliverMsg(Destination.MsgPayload("f"))
outerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(6L)
innerDestinationProbe.expectMessageType[Destination.Msg].deliveryId should ===(6L)
testKit.stop(sender2)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.