Last active
February 5, 2022 23:33
-
-
Save nsadeh/f13404396b2b4604601a8a2fc4962d90 to your computer and use it in GitHub Desktop.
Subscribing to events from another aggregate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Defining the parent entity
/**
 * Parent aggregate: an event-sourced entity whose events are tagged with its own id.
 *
 * NOTE(review): other parts of this file match on `Parent.EventOfInterest`, which is not
 * declared here — presumably one of the elided `Event` subtypes; confirm.
 */
object Parent {
  sealed trait Command
  sealed trait Event

  // Fields are elided in this sketch; the original wrote `State(...)`, which is not valid Scala.
  final case class State(/* ... */)

  val empty: State = ???

  /**
   * Builds the event-sourced behavior for a single parent.
   *
   * @param parentId used verbatim as the persistence id and as the only tag on every event
   * @return the behavior, snapshotting every 100 events (keeping 3 snapshots) and restarting
   *         with backoff when persisting fails
   */
  def apply(parentId: String): EventSourcedBehavior[Command, Event, State] =
    // Explicit type parameters are required: the handler lambdas below carry no parameter
    // types of their own, so Command and Event cannot be inferred from them.
    EventSourcedBehavior[Command, Event, State](
      // ofUniqueId keeps the persistence id equal to the raw parentId string.
      persistenceId = PersistenceId.ofUniqueId(parentId),
      emptyState = empty,
      commandHandler = (state, command) => ???, // what these do is not important
      eventHandler = (state, event) => ???
    )
      .withTagger(_ => Set(parentId)) // I know there are better ways to distribute this
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
// Defining the child entity
/**
 * Child aggregate that subscribes to the events of one parent.
 *
 * Two subscription strategies are sketched:
 *  - a projection over the parent's tag, for events emitted after the subscription starts;
 *  - a journal query by the parent's persistence id, for events since the beginning, in
 *    which case the child has to keep track of the offset itself.
 */
object Child {
  sealed trait Command
  final case class SubscribeToParent(parentId: String) extends Command

  sealed trait Event
  final case class SubscribedToParent(parentId: String) extends Event

  // Leading fields are elided in this sketch (the original wrote `...`).
  final case class State(/* ..., */ offset: Offset, parentId: Option[String])

  // An object cannot hold an abstract member, so this needs a right-hand side.
  val empty: State = ???

  /**
   * Builds the child behavior.
   *
   * @param childId         persistence id of this child
   * @param readJournal     journal used to replay the parent's events
   * @param replayFromStart when true (the default, matching the behavior the original
   *                        expression evaluated to) the parent's events are read from the
   *                        beginning; when false only events after subscribing are tracked
   * @return a `Behavior[Command]` wrapping the selected event-sourced behavior
   */
  def apply(childId: String, readJournal: ReadJournal, replayFromStart: Boolean = true) =
    Behaviors.setup[Command] { ctx =>
      if (replayFromStart) replayingFromStart(childId, readJournal, ctx)
      else trackingNewEvents(childId, ctx)
    }

  // If I only want to track events after I start subscribing.
  private def trackingNewEvents(childId: String, ctx: ActorContext[Command]) =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(childId),
      emptyState = empty,
      commandHandler = commandHandlerWithProjection(ctx),
      eventHandler = (state, event) => ???
    )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
      .receiveSignal {
        // Resume the subscription that was active before the restart, if any.
        case (state, RecoveryCompleted) => state.parentId.foreach(startProjection(ctx, _))
      }

  // If I want to read events since the beginning of the parent - requires that I manually manage offset.
  private def replayingFromStart(childId: String, journal: ReadJournal, ctx: ActorContext[Command]) =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(childId),
      emptyState = empty,
      commandHandler = commandHandlerNoProjection(ctx, journal),
      eventHandler = (state, event) => ??? // needs to keep track of offset manually
    )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
      .receiveSignal {
        case (state, RecoveryCompleted) => state.parentId.foreach(streamParentEvents(ctx, journal, _))
      }

  /** Persists the subscription, then starts a projection over the parent's events. */
  def commandHandlerWithProjection(ctx: ActorContext[Command])(state: State, command: Command): Effect[Event, State] =
    command match {
      case SubscribeToParent(parentId) =>
        Effect.persist(SubscribedToParent(parentId)).thenRun(_ => startProjection(ctx, parentId))
    }

  /** Persists the subscription, then streams the parent's events straight from the journal. */
  def commandHandlerNoProjection(ctx: ActorContext[Command], journal: ReadJournal)(state: State, command: Command): Effect[Event, State] =
    command match {
      case SubscribeToParent(parentId) =>
        Effect.persist(SubscribedToParent(parentId)).thenRun(_ => streamParentEvents(ctx, journal, parentId))
    }

  // Spawns the projection for `parentId` as a child actor named after the projection id.
  private def startProjection(ctx: ActorContext[Command], parentId: String): Unit = {
    // NOTE(review): Projections.parentProjection declares its second parameter as an
    // EntityRef[Child.Command], while `ctx` is an ActorContext — reconcile the two.
    val projection = Projections.parentProjection(parentId, ctx)
    val name = projection.projectionId.id
    // Child actor names must be unique; a repeated SubscribeToParent would otherwise make spawn throw.
    if (ctx.child(name).isEmpty) {
      ctx.spawn(ProjectionBehavior(projection), name)
      ()
    }
  }

  // Streams the parent's events of interest into this actor as commands.
  private def streamParentEvents(ctx: ActorContext[Command], journal: ReadJournal, parentId: String): Unit = {
    // Running the stream needs an implicit materializer, derived here from the actor system.
    implicit val system = ctx.system
    // Capture the ref on the actor thread: the stream callback runs elsewhere and must not touch `ctx`.
    val self = ctx.self
    // NOTE(review): assumes `ReadJournal` exposes a one-argument eventsByPersistenceId that
    // yields the events themselves — confirm against the actual journal type.
    journal
      .eventsByPersistenceId(parentId)
      .collect { case evt: Parent.EventOfInterest => toChildCommand(evt) }
      .runForeach(self ! _)
    // NOTE(review): the completion future is dropped, so a failed stream goes unnoticed.
    ()
  }

  private def toChildCommand(e: Parent.Event): Child.Command = ???
}
// Setting up projections
/** Projections that forward a parent's tagged events to a subscribed child. */
object Projections {

  /**
   * At-least-once Cassandra projection over the events tagged with `parentId`.
   *
   * @param parentId tag to read, also used as the key of the projection id
   * @param ref      child that receives a command for every parent event of interest
   * @return the projection, saving its offset after 100 envelopes or 500 ms
   */
  def parentProjection(parentId: String, ref: EntityRef[Child.Command]) = CassandraProjection
    .atLeastOnceFlow(
      projectionId = ProjectionId("parents", parentId),
      sourceProvider = sourceProvider(parentId),
      handler = pipeToChildActor(ref)
    )
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)

  // NOTE(review): `system` is not defined anywhere in the shown code — presumably an
  // actor system available in the elided surrounding scope; confirm.
  private def sourceProvider(parentId: String): SourceProvider[Offset, EventEnvelope[Parent.Event]] =
    EventSourcedProvider
      .eventsByTag[Parent.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = parentId)

  // Handler flow: turns each event of interest into a child command, logs, and emits Done.
  private def pipeToChildActor(ref: EntityRef[Child.Command]) =
    FlowWithContext[EventEnvelope[Parent.Event], ProjectionContext]
      .map(envelope => envelope.event)
      // Plain `map`: the body is synchronous and yields Done, not a Future, so `mapAsync`
      // (which also needs a parallelism argument) does not apply. The send below is
      // fire-and-forget, so delivery is not guaranteed; that would need an acknowledged
      // request, e.g. an ask combined with mapAsync.
      .map {
        case evt: Parent.EventOfInterest =>
          ref ! toChildCommand(evt)
          // NOTE(review): logs `cartId` under a "Parent {}" label — confirm the intended field.
          logger.info("Parent {} emitted event of interest", evt.cartId)
          Done
        case otherEvent =>
          logger.debug2("Parent {} changed by {}", otherEvent.parentId, otherEvent)
          Done
      }

  private def toChildCommand(e: Parent.Event): Child.Command = ???
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment