Skip to content

Instantly share code, notes, and snippets.



Last active Oct 8, 2019
What would you like to do?
Event Sourcing - aggregate code sample
class MyAggregate:
def create(cls, ...) -> "MyAggregate":
"""For making a new aggregate. Should issue a "...Created" event."""
def do_stuff(self) -> None:
"""Command handler - for executing actions on the aggregate."""
def apply_event(self, event: Event) -> None:
"""Event handler - for applying an event onto the aggregate."""
def pop_pending_events(self) -> List[Event]:
"""Protocol for persisting in the related repository.
return self._pending_events
def test_my_agg_do_stuff():
aggregate = MyAggregate()
assert aggregate._pending_events == ...
def test_my_agg_stuff_done():
aggregate = MyAggregate()
event = MyEvent(1)
assert agg....
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment