from typing import List
class DomainEvent:
pass
class DomainEventStream():
def __init__(self, events: List[DomainEvent]) -> None:
self.events = events
class AggregateRoot:
playhead = 0
events = []
def raises(self, event: DomainEvent) -> None:
self.apply(event)
self.events.append(event)
def apply(self, event: DomainEvent) -> None:
getattr(self, 'apply' + event.__class__.__name__)(event)
self.playhead += 1
def pop_events(self) -> DomainEventStream:
events = DomainEventStream(self.events)
self.events = []
return events
def from_history(self, events: List[DomainEvent]):
for event in events:
self.apply(event)
return self
class EventBus():
def publish(event: DomainEvent):
pass
def subscribe(self):
pass
class EventStore():
def __init__(self, event_bus: EventBus):
self.event_bus = event_bus
def save(stream: DomainEventStream) -> None:
""" TODO Persis events"""
""" self.storage.persist(stream) """
for event in stream.events:
self.event_bus.publish(event)
class UserWasCreated(DomainEvent):
def __init__(self, email: str) -> None:
self.email = email
class UserEmailChanged(DomainEvent):
def __init__(self, email: str) -> None:
self.email = email
class UserAdressWasAdded(DomainEvent):
def __init__(self, street: str) -> None:
self.street = street
class User(AggregateRoot):
@staticmethod
def create(email: str) -> 'User':
instance = User()
instance.raises(UserWasCreated(email))
return instance
def change_email(self, email: str) -> 'User':
self.raises(UserEmailChanged(email))
return self
def add_address(self, street: str) -> 'User':
self.raises(UserAdressWasAdded(street))
return self
def applyUserWasCreated(self, event: UserWasCreated) -> None:
self.email = event.email
def applyUserEmailChanged(self, event: UserEmailChanged) -> None:
self.email = event.email
def applyUserAdressWasAdded(self, event: UserAdressWasAdded) -> None:
self.street = event.street
def __eq__(self, other) -> bool:
return self.__dict__ == other.__dict__
def print_events(stream: DomainEventStream):
for event in stream.events:
print(event.__class__.__name__)
user = User.create('Paco@on.com').change_email('loly@dot.com')
print('User Created:')
print('_____________')
print(user.email)
print(user.playhead)
print()
print('User Events fired')
print('_____________')
user_events = user.pop_events()
print_events(user_events)
print()
print('A Clone from history:')
print('_____________')
other = User().from_history([UserWasCreated('Pavo@asd.asd')])
print(other.email)
print(other.playhead)
print()
print('New User from user')
print('_____________')
guy = User().from_history(user_events.events)
print(guy.email)
print(guy.playhead)
print('__ Not same instance __')
print(user == other)
Environment
docker run -it -v $PWD:/app -w '/app' python:3.6.4-alpine3.7 sh -l