Last active
October 8, 2019 20:55
-
-
Save Unit03/68b3ee0a4741405249043a457fa99f4e to your computer and use it in GitHub Desktop.
Event Sourcing - aggregate code sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyAggregate:
    """Sample shape of an event-sourced aggregate.

    Shows the four pieces of the pattern: a factory (``create``), a command
    handler (``do_stuff``), an event handler (``apply_event``) and
    ``pop_pending_events``, which hands not-yet-persisted events over to the
    repository.

    Pending events live in ``self._pending_events``. This sample does not
    show where that list is initialised or appended to; presumably the
    constructor and the command handlers do it - confirm in the real code.
    """

    @classmethod
    def create(cls, *args, **kwargs) -> "MyAggregate":
        """For making a new aggregate. Should issue a "...Created" event.

        The arguments are deliberately left open in this sample; a concrete
        aggregate would declare its own parameters here.
        """
        ...

    def do_stuff(self) -> None:
        """Command handler - for executing actions on the aggregate."""
        ...

    def apply_event(self, event: "Event") -> None:
        """Event handler - for applying an event onto the aggregate."""
        ...

    def pop_pending_events(self) -> "List[Event]":
        """Protocol for persisting in the related repository.

        See repository.py.

        Returns:
            The events accumulated since the previous call. The aggregate's
            pending list is emptied, so each event is handed out only once.
        """
        pending = self._pending_events
        # Rebind instead of calling .clear(): the caller keeps the list it was
        # given intact, while the aggregate starts over with an empty one.
        self._pending_events = []
        return pending
def test_my_agg_do_stuff():
    """Command-side test: run a command, then inspect the pending events."""
    agg = MyAggregate()
    agg.do_stuff()
    # `...` is a placeholder for the events the command is expected to issue.
    expected_events = ...
    assert agg._pending_events == expected_events
def test_my_agg_stuff_done():
    """Event-side test: apply an event, then inspect the aggregate."""
    aggregate = MyAggregate()
    event = MyEvent(1)
    aggregate.apply_event(event)
    # Placeholder: replace `...` with a check on whatever state of
    # `aggregate` the applied event is expected to change.
    assert ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment