Skip to content

Instantly share code, notes, and snippets.

@Unit03
Last active October 8, 2019 20:55
Show Gist options
  • Save Unit03/68b3ee0a4741405249043a457fa99f4e to your computer and use it in GitHub Desktop.
Save Unit03/68b3ee0a4741405249043a457fa99f4e to your computer and use it in GitHub Desktop.
Event Sourcing - aggregate code sample
class MyAggregate:
@classmethod
def create(cls, ...) -> "MyAggregate":
"""For making a new aggregate. Should issue a "...Created" event."""
...
def do_stuff(self) -> None:
"""Command handler - for executing actions on the aggregate."""
...
def apply_event(self, event: Event) -> None:
"""Event handler - for applying an event onto the aggregate."""
...
def pop_pending_events(self) -> List[Event]:
"""Protocol for persisting in the related repository.
See repository.py.
"""
return self._pending_events
def test_my_agg_do_stuff():
aggregate = MyAggregate()
aggregate.do_stuff()
assert aggregate._pending_events == ...
def test_my_agg_stuff_done():
aggregate = MyAggregate()
event = MyEvent(1)
aggregate.apply_event(event)
assert agg....
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment