@Unit03 Unit03/aggregate.py
Last active Oct 8, 2019

Event Sourcing - aggregate code sample
from typing import List

# Event and MyEvent are assumed to be defined elsewhere in the project.


class MyAggregate:
    @classmethod
    def create(cls, ...) -> "MyAggregate":
        """For making a new aggregate. Should issue a "...Created" event."""
        ...

    def do_stuff(self) -> None:
        """Command handler - for executing actions on the aggregate."""
        ...

    def apply_event(self, event: Event) -> None:
        """Event handler - for applying an event onto the aggregate."""
        ...

    def pop_pending_events(self) -> List[Event]:
        """Protocol for persisting in the related repository.

        See repository.py.
        """
        # Hand over the pending events and reset the buffer, so the same
        # events are not persisted twice.
        events, self._pending_events = self._pending_events, []
        return events


def test_my_agg_do_stuff():
    # A command should leave the resulting events in the pending buffer.
    aggregate = MyAggregate()
    aggregate.do_stuff()
    assert aggregate._pending_events == ...


def test_my_agg_stuff_done():
    # Applying an event should change the aggregate's state.
    aggregate = MyAggregate()
    event = MyEvent(1)
    aggregate.apply_event(event)
    assert agg....
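
The skeleton above leaves every body elided, so here is a minimal runnable sketch of the same layout, filled in under loud assumptions. `Counter`, `CounterCreated`, `CounterIncremented`, the `_record` helper and this `Event` base class are all hypothetical names made up for illustration; none of them come from the original sample. It shows one possible split: the command handler validates and records an event, the event handler only mutates state, and `pop_pending_events` hands the buffer over to whatever persists it.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical event base class; the original does not define Event."""


@dataclass(frozen=True)
class CounterCreated(Event):
    counter_id: str


@dataclass(frozen=True)
class CounterIncremented(Event):
    amount: int


class Counter:
    """Hypothetical aggregate that follows the MyAggregate layout."""

    def __init__(self) -> None:
        self.counter_id: Optional[str] = None
        self.value = 0
        self._pending_events: List[Event] = []

    @classmethod
    def create(cls, counter_id: str) -> "Counter":
        """Factory: builds an empty aggregate and issues a "...Created" event."""
        counter = cls()
        counter._record(CounterCreated(counter_id))
        return counter

    def increment(self, amount: int) -> None:
        """Command handler: validates the request, then records an event."""
        if amount <= 0:
            raise ValueError("amount must be positive")
        self._record(CounterIncremented(amount))

    def apply_event(self, event: Event) -> None:
        """Event handler: only mutates state, never validates or emits."""
        if isinstance(event, CounterCreated):
            self.counter_id = event.counter_id
        elif isinstance(event, CounterIncremented):
            self.value += event.amount

    def pop_pending_events(self) -> List[Event]:
        """Returns the not-yet-persisted events and clears the buffer."""
        events, self._pending_events = self._pending_events, []
        return events

    def _record(self, event: Event) -> None:
        # Assumed helper: apply the new event, then queue it for persistence.
        self.apply_event(event)
        self._pending_events.append(event)
```

Because `apply_event` never appends to the pending buffer, the same method can replay stored events onto a fresh `Counter()` to rebuild its state without producing new events to persist.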