Skip to content

Instantly share code, notes, and snippets.

@nsadeh
Last active February 5, 2022 23:33
Show Gist options
  • Save nsadeh/f13404396b2b4604601a8a2fc4962d90 to your computer and use it in GitHub Desktop.
Save nsadeh/f13404396b2b4604601a8a2fc4962d90 to your computer and use it in GitHub Desktop.
Subscribing to events from another aggregate
// defining the parent entity
/** Parent aggregate of the example.
  *
  * An event-sourced entity whose events are tagged with the entity's own id, so
  * that other aggregates can follow it through an events-by-tag query or by
  * reading its journal by persistence id.
  */
object Parent {
  sealed trait Command
  // Concrete events are elided in this sketch (other objects in this file refer
  // to a `Parent.EventOfInterest` that would be declared here).
  sealed trait Event
  final case class State(...)
  val empty: State = ???

  /** Builds the event-sourced behavior for one parent entity.
    *
    * @param parentId id of the parent; used verbatim as the persistence id and
    *                 as the single tag attached to every persisted event
    * @return the configured behavior (snapshot retention and restart-with-backoff
    *         on persist failure are applied)
    */
  def apply(parentId: String): EventSourcedBehavior[Command, Event, State] =
    // Explicit type parameters are required: without them the compiler cannot
    // infer the parameter types of the two handler lambdas.
    EventSourcedBehavior[Command, Event, State](
      // `ofUniqueId` keeps the raw id as the persistence id, which is what
      // subscribers pass to `eventsByPersistenceId`.
      persistenceId = PersistenceId.ofUniqueId(parentId),
      emptyState = empty,
      commandHandler = (state, command) => ???, // what these do is not important
      eventHandler = (state, event) => ???
    )
      .withTagger(_ => Set(parentId)) // I know there are better ways to distribute this
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
// defining the child entity
/** Child aggregate of the example.
  *
  * An event-sourced entity that can subscribe to the events of a [[Parent]].
  * Two alternative wirings are sketched:
  *
  *  - `withProjection`: start a projection when the subscription is persisted,
  *    so only events from that point on are tracked (the projection stores its
  *    own offset);
  *  - `withJournalReplay`: read the parent's journal by persistence id from the
  *    beginning, which requires the child to track the offset itself.
  */
object Child {
  sealed trait Command
  final case class SubscribeToParent(parentId: String) extends Command
  sealed trait Event
  final case class SubscribedToParent(parentId: String) extends Event
  final case class State(..., offset: Offset, parentId: Option[String])
  // Placeholder, mirroring `Parent.empty`; a `val` in an object needs an initialiser.
  val empty: State = ???

  /** Creates the child behavior.
    *
    * Returns the journal-replay variant; the projection-based variant is kept
    * next to it as `withProjection`. (Previously both were built in sequence
    * inside `setup`, so the first one was silently discarded.)
    *
    * @param childId     id of this child; used verbatim as its persistence id
    * @param readJournal query side of the journal that holds the parent events
    */
  def apply(childId: String, readJournal: ReadJournal) = Behaviors.setup[Command] { ctx =>
    withJournalReplay(childId, readJournal, ctx)
  }

  /** Variant 1: only track parent events emitted after the subscription starts. */
  private def withProjection(childId: String, ctx: ActorContext[Command]) =
    EventSourcedBehavior[Command, Event, State](
      // The child must persist under its own id, not the parent's.
      persistenceId = PersistenceId.ofUniqueId(childId),
      emptyState = empty,
      commandHandler = commandHandlerWithProjection(ctx),
      eventHandler = (state, event) => ???
    )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
      .receiveSignal {
        // After recovery, resume following the parent we were subscribed to (if any).
        case (state, RecoveryCompleted) =>
          state.parentId.foreach(parentId => startParentProjection(parentId, ctx))
      }

  /** Variant 2: read the parent's events since the beginning of its journal. */
  private def withJournalReplay(childId: String, readJournal: ReadJournal, ctx: ActorContext[Command]) =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId.ofUniqueId(childId),
      emptyState = empty,
      commandHandler = commandHandlerNoProjection(ctx, readJournal),
      eventHandler = (state, event) => ??? // needs to keep track of offset manually
    )
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
      .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
      .receiveSignal {
        // NOTE(review): this replays from sequence number 0 on every recovery; once
        // the event handler maintains `state.offset`, resume from it instead.
        case (state, RecoveryCompleted) =>
          state.parentId.foreach(parentId => forwardParentEvents(parentId, readJournal, ctx))
      }

  /** Command handler for the projection-based variant: persists the subscription,
    * then starts the projection that feeds parent events to this actor.
    */
  def commandHandlerWithProjection(ctx: ActorContext[Command])(state: State, command: Command): Effect[Event, State] =
    command match {
      case SubscribeToParent(parentId) =>
        Effect.persist(SubscribedToParent(parentId)).thenRun { _ =>
          startParentProjection(parentId, ctx)
        }
    }

  /** Command handler for the journal-replay variant: persists the subscription,
    * then starts streaming the parent's journal to this actor.
    */
  def commandHandlerNoProjection(ctx: ActorContext[Command], journal: ReadJournal)(state: State, command: Command): Effect[Event, State] =
    command match {
      case SubscribeToParent(parentId) =>
        Effect.persist(SubscribedToParent(parentId)).thenRun { _ =>
          forwardParentEvents(parentId, journal, ctx)
        }
    }

  /** Spawns the projection for `parentId` as a child actor named after its projection id. */
  private def startParentProjection(parentId: String, ctx: ActorContext[Command]): Unit = {
    // NOTE(review): `Projections.parentProjection` declares its second parameter as
    // an EntityRef[Child.Command] while an ActorContext is passed here - reconcile
    // the two (a ref to this entity is what the projection needs).
    val projection = Projections.parentProjection(parentId, ctx)
    ctx.spawn(ProjectionBehavior(projection), projection.projectionId.id)
  }

  /** Streams the parent's persisted events and sends the interesting ones to this
    * actor as commands.
    */
  private def forwardParentEvents(parentId: String, journal: ReadJournal, ctx: ActorContext[Command]): Unit = {
    // Provides the materializer for `runForeach`.
    implicit val system: akka.actor.typed.ActorSystem[Nothing] = ctx.system
    // Capture the ref up front: the ActorContext must not be touched from the
    // stream's threads, whereas an ActorRef is safe to share.
    val self = ctx.self
    // NOTE(review): `eventsByPersistenceId` is defined on EventsByPersistenceIdQuery,
    // so the journal passed in is assumed to implement it - confirm the declared type.
    journal
      .eventsByPersistenceId(parentId, 0L, Long.MaxValue)
      .map(_.event) // the query emits envelopes; the domain event is inside
      .collect {
        case evt: Parent.EventOfInterest => toChildCommand(evt)
      }
      // NOTE(review): the completion future is dropped, so a failed stream goes unnoticed.
      .runForeach(self ! _)
  }

  private def toChildCommand(e: Parent.Event): Child.Command = ???
}
// setting up projections
/** Projection wiring that forwards a parent's tagged events to a child entity. */
object Projections {

  /** Builds an at-least-once flow projection over the events tagged with `parentId`.
    *
    * @param parentId id of the parent to follow; also the tag queried and the
    *                 key of the projection id (`"parents"` / `parentId`)
    * @param ref      reference to the child entity that receives the derived commands
    * @return the projection, saving its offset every 100 envelopes or 500 ms
    */
  def parentProjection(parentId: String, ref: EntityRef[Child.Command]) = CassandraProjection
    .atLeastOnceFlow(
      projectionId = ProjectionId("parents", parentId),
      sourceProvider = sourceProvider(parentId),
      handler = pipeToChildActor(ref)
    )
    .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)

  /** Source of the parent's events, selected by tag (the parent tags events with its id). */
  private def sourceProvider(parentId: String): SourceProvider[Offset, EventEnvelope[Parent.Event]] =
    // NOTE(review): `system` is not defined in this object - it is assumed to be
    // supplied from elsewhere; confirm or thread it through as a parameter.
    EventSourcedProvider
      .eventsByTag[Parent.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = parentId)

  /** Handler flow: turns each event of interest into a child command and sends it to `ref`.
    *
    * Takes the entity ref (not an ActorContext): the flow runs on stream threads,
    * where only a ref may be used, and this matches what `parentProjection` passes in.
    */
  private def pipeToChildActor(ref: EntityRef[Child.Command]) = FlowWithContext[EventEnvelope[Parent.Event], ProjectionContext]
    .map(envelope => envelope.event)
    // Plain `map`: the work is synchronous, and `mapAsync` needs a parallelism and a
    // Future-returning function. Note that `!` is fire-and-forget, so emitting `Done`
    // here only says the command was sent, not that the child processed it; for a
    // delivery guarantee use `mapAsync` with an ask and emit `Done` on the reply.
    .map {
      case evt: Parent.EventOfInterest =>
        ref ! toChildCommand(evt)
        // NOTE(review): `logger` is not defined in this object - assumed to be
        // supplied from elsewhere. The id field was `cartId`; aligned with the
        // `parentId` used in the other branch - confirm the field name.
        logger.info("Parent {} emitted event of interest", evt.parentId)
        Done
      case otherEvent =>
        logger.debug2("Parent {} changed by {}", otherEvent.parentId, otherEvent)
        Done
    }

  private def toChildCommand(e: Parent.Event): Child.Command = ???
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment