Skip to content

Instantly share code, notes, and snippets.

@janjaali
Created May 9, 2021 20:06
Show Gist options
  • Save janjaali/6307613262b3a5c0eb229256331ef044 to your computer and use it in GitHub Desktop.
Save janjaali/6307613262b3a5c0eb229256331ef044 to your computer and use it in GitHub Desktop.
Behavior that fails fast for persistence failures in EventSourcedBehaviors (i.e. JournalFailureException)
package org.janjaali.akka.persistence.typed.behaviors
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.pattern.StatusReply
import java.util.UUID
/** Wraps an event-sourced behavior in a parent actor that forwards every
  * command to it and watches it. When the wrapped actor terminates while
  * replies are outstanding, every recorded reply target is sent a
  * [[akka.pattern.StatusReply]] error instead of being left waiting.
  */
object FailFastEventSourcedBehavior {

  /** Callback that sends the "terminated" error to one recorded reply target.
    * The argument is the reference of the actor that terminated.
    */
  private type FailureNotifier = ActorRef[Nothing] => Unit

  /** Creates the wrapping behavior.
    *
    * The given behavior is spawned as a watched child under a unique name and
    * all incoming commands are forwarded to it.
    *
    * @param eventSourcedBehavior behavior to spawn as child and forward to
    * @return behavior of the forwarding parent
    */
  def apply(
      eventSourcedBehavior: Behavior[Command]
  ): Behavior[Command] =
    Behaviors.setup { ctx =>
      val eventSourcingActor = ctx.spawn(
        eventSourcedBehavior,
        s"EventSourcedBehavior$$${UUID.randomUUID()}"
      )
      ctx.watch(eventSourcingActor)

      forwarding(eventSourcingActor, Vector.empty)
    }

  /** Forwards commands to the child while remembering whom to notify should
    * the child terminate.
    *
    * Messages and the `Terminated` signal are handled by the same behavior, so
    * commands that arrive after a reply-expecting command are still forwarded
    * (previously the actor switched to a signal-only behavior and every later
    * command was lost).
    *
    * NOTE(review): the parent cannot observe replies the child sends directly
    * to `replyTo`, so targets are never removed from `pendingNotifiers`; a
    * target that already received its reply is still sent the error on
    * termination (as the first one was before), and the collection grows with
    * every reply-expecting command. Confirm this is acceptable for long-lived
    * instances.
    *
    * @param eventSourcingActor the watched child that receives all commands
    * @param pendingNotifiers   one notifier per reply-expecting command seen so far
    */
  private def forwarding(
      eventSourcingActor: ActorRef[Command],
      pendingNotifiers: Vector[FailureNotifier]
  ): Behavior[Command] =
    Behaviors
      .receiveMessage[Command] {
        case command: Commands.CommandExpectingReply[_] =>
          eventSourcingActor ! command
          forwarding(
            eventSourcingActor,
            pendingNotifiers :+ failureNotifier(command.replyTo)
          )

        case command =>
          eventSourcingActor ! command
          Behaviors.same
      }
      .receiveSignal {
        // Only handled once at least one reply target is known. With nothing
        // pending the signal is deliberately left unhandled, exactly as before
        // (in Akka Typed an unhandled Terminated of a watched actor fails the
        // watcher with a DeathPactException).
        case (_, Terminated(terminatedActor)) if pendingNotifiers.nonEmpty =>
          pendingNotifiers.foreach(notify => notify(terminatedActor))
          Behaviors.stopped
      }

  /** Builds the callback that reports the child's termination to `replyTo`.
    *
    * @param replyTo actor that expects a [[akka.pattern.StatusReply]]
    * @tparam R payload type of the successful reply (unused for the error)
    */
  private def failureNotifier[R](
      replyTo: ActorRef[StatusReply[R]]
  ): FailureNotifier =
    terminatedActor =>
      replyTo ! StatusReply.Error(
        new Exception(s"Behavior '$terminatedActor' terminated.")
      )

  /** Represents EventSourcedBehavior protocol definition */
  sealed trait Command

  object Commands {

    /** Command whose sender expects a [[akka.pattern.StatusReply]].
      *
      * @param replyTo actor to send the reply (or the fail-fast error) to
      * @tparam R payload type of a successful reply
      */
    final case class CommandExpectingReply[R](
        replyTo: ActorRef[StatusReply[R]]
    ) extends Command
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment