Created
June 9, 2020 04:41
-
-
Save vladimirnani/40f5b398ea2f164900cc9a5289cd1be3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from uuid import NAMESPACE_URL, uuid5 | |
from django.test import TransactionTestCase | |
from eventsourcing.application.decorators import applicationpolicy | |
from eventsourcing.application.django import DjangoApplication | |
from eventsourcing.application.process import ProcessApplication | |
from eventsourcing.domain.model.aggregate import BaseAggregateRoot | |
from eventsourcing.system.definition import System | |
from eventsourcing.system.runner import SingleThreadedRunner | |
class CustomerOrder(BaseAggregateRoot):
    """Aggregate root holding an order's id and its creation time.

    Constructor arguments:
        created_at: value stored on ``self.created_at``.
        order_id: value stored on ``self.order_id``.
        **kwargs: forwarded unchanged to ``BaseAggregateRoot.__init__``.
    """

    # NOTE(review): presumably tells eventsourcing to derive aggregate-specific
    # event subclasses automatically -- confirm against the library docs.
    __subclassevents__ = True

    class Event(BaseAggregateRoot.Event):
        """Common base class for the events of ``CustomerOrder``."""

    class Created(Event, BaseAggregateRoot.Created):
        """Creation event of ``CustomerOrder``."""

    def __init__(self, created_at, order_id, **kwargs):
        super().__init__(**kwargs)
        self.order_id = order_id
        self.created_at = created_at
class OperatorOrder(BaseAggregateRoot):
    """Aggregate root for an order that can have a branch assigned to it.

    Constructor arguments:
        created_at: value stored on ``self.created_at``.
        order_id: value stored on ``self.order_id``.
        **kwargs: forwarded unchanged to ``BaseAggregateRoot.__init__``.

    ``branch_id`` and ``branch_assigned_at`` are only set on the aggregate
    once a ``BranchAssigned`` event has been applied to it.
    """

    # NOTE(review): presumably tells eventsourcing to derive aggregate-specific
    # event subclasses automatically -- confirm against the library docs.
    __subclassevents__ = True

    class Event(BaseAggregateRoot.Event):
        """Common base class for the events of ``OperatorOrder``."""

    class Created(Event, BaseAggregateRoot.Created):
        """Creation event of ``OperatorOrder``."""

    class BranchAssigned(Event):
        """Event carrying the assigned branch id and the assignment time."""

        @property
        def branch_id(self):
            """Return the ``branch_id`` value stored in the event state."""
            return self.__dict__["branch_id"]

        @property
        def branch_assigned_at(self):
            """Return the ``branch_assigned_at`` value stored in the event state."""
            return self.__dict__["branch_assigned_at"]

        def mutate(self, order):
            """Copy this event's branch data onto ``order``."""
            order.branch_id = self.branch_id
            order.branch_assigned_at = self.branch_assigned_at

    def __init__(self, created_at, order_id, **kwargs):
        super().__init__(**kwargs)
        self.order_id = order_id
        self.created_at = created_at

    def assign_branch(self, branch_id):
        """Trigger a ``BranchAssigned`` event for ``branch_id``.

        The assignment time is taken from ``datetime.now()`` (naive local time).
        """
        self.__trigger_event__(
            OperatorOrder.BranchAssigned,
            branch_id=branch_id,
            branch_assigned_at=datetime.now(),
        )
class RestaurantOrder(BaseAggregateRoot):
    """Aggregate root for the order as handed over to a restaurant branch.

    Constructor arguments:
        created_at: value stored on ``self.created_at``.
        order_id: optional order identifier, stored on ``self.order_id``.
        branch_id: optional branch identifier, stored on ``self.branch_id``.
        **kwargs: forwarded unchanged to ``BaseAggregateRoot.__init__``.

    ``order_id`` and ``branch_id`` are declared explicitly because
    ``RestaurantOrderProcessApp.create_order`` passes both to ``__create__``.
    NOTE(review): presumably eventsourcing forwards the creation keyword
    arguments to ``__init__``; without these parameters they would fall
    through ``**kwargs`` into the base constructor -- confirm that the base
    constructor rejects unknown keywords in the library version in use.
    """

    # NOTE(review): presumably tells eventsourcing to derive aggregate-specific
    # event subclasses automatically -- confirm against the library docs.
    __subclassevents__ = True

    def __init__(self, created_at, order_id=None, branch_id=None, **kwargs):
        super().__init__(**kwargs)
        self.created_at = created_at
        # Both default to None so existing callers that omit them keep working.
        self.order_id = order_id
        self.branch_id = branch_id

    class Event(BaseAggregateRoot.Event):
        """Common base class for the events of ``RestaurantOrder``."""

    class Created(Event, BaseAggregateRoot.Created):
        """Creation event of ``RestaurantOrder``."""
class CustomerOrderProcessApp(DjangoApplication, ProcessApplication):
    """Process application that creates, saves and looks up ``CustomerOrder``s."""

    persist_event_type = CustomerOrder.Event

    @applicationpolicy
    def policy(self, repository, event):
        """Do nothing by default."""

    def create_id(self, order_id):
        """Return the name-based (uuid5) aggregate id derived from ``order_id``."""
        return uuid5(NAMESPACE_URL, f"order/customer_order_process/{order_id}")

    def get_order(self, order_id):
        """Return the aggregate stored in the repository for ``order_id``."""
        return self.repository[self.create_id(order_id)]

    def create_order(self, order_id, created_at=None):
        """Create a ``CustomerOrder`` for ``order_id`` and save it.

        Returns ``None``; the new aggregate is only persisted via ``self.save``.
        """
        new_order = CustomerOrder.__create__(
            originator_id=self.create_id(order_id),
            order_id=order_id,
            created_at=created_at,
        )
        assert isinstance(new_order, CustomerOrder)
        self.save(new_order)
class OperatorOrderProcessApp(DjangoApplication, ProcessApplication):
    """Process application that creates an ``OperatorOrder`` per customer order.

    Its policy reacts to ``CustomerOrder.Created`` events; branch assignment is
    exposed through ``assign_branch``.
    """

    persist_event_type = OperatorOrder.Event
    _app_id_prefix = "order/operator_order/"

    @applicationpolicy
    def policy(self, repository, event):
        """Do nothing by default."""

    @policy.register(CustomerOrder.Created)
    def _(self, repository, event):
        """Create and return an ``OperatorOrder`` mirroring the customer order."""
        return self.create_order(order_id=event.order_id, created_at=event.created_at)

    def create_id(self, order_id):
        """Return the uuid5 aggregate id built from the app prefix and ``order_id``."""
        return uuid5(NAMESPACE_URL, f"{self._app_id_prefix}{order_id}")

    def get_order(self, order_id) -> OperatorOrder:
        """Return the ``OperatorOrder`` stored in the repository for ``order_id``."""
        found = self.repository[self.create_id(order_id)]
        assert isinstance(found, OperatorOrder)
        return found

    def create_order(self, order_id, created_at=None):
        """Create and return a new ``OperatorOrder``; this method does not save it."""
        new_order = OperatorOrder.__create__(
            originator_id=self.create_id(order_id),
            order_id=order_id,
            created_at=created_at,
        )
        assert isinstance(new_order, OperatorOrder)
        return new_order

    def assign_branch(self, order_id, branch_id):
        """Assign ``branch_id`` to the stored order for ``order_id`` and save it."""
        target = self.get_order(order_id)
        target.assign_branch(branch_id=branch_id)
        target.__save__()
class RestaurantOrderProcessApp(DjangoApplication, ProcessApplication):
    """Process application that creates a ``RestaurantOrder`` on branch assignment.

    Its policy reacts to ``OperatorOrder.BranchAssigned`` events.
    """

    persist_event_type = RestaurantOrder.Event
    _app_id_prefix = "order/restaurant_order/"

    @applicationpolicy
    def policy(self, repository, event):
        """Do nothing by default."""

    def create_id(self, order_id):
        """Return the uuid5 aggregate id built from the app prefix and ``order_id``.

        Shared by ``get_order`` and ``create_order`` so both always derive the
        same id for a given ``order_id``.
        """
        return uuid5(NAMESPACE_URL, f"{self._app_id_prefix}{order_id}")

    def get_order(self, order_id):
        """Return the aggregate stored in the repository for ``order_id``."""
        return self.repository[self.create_id(order_id)]

    def get_order_by_originator(self, originator_id):
        """Return the aggregate stored in the repository under ``originator_id``."""
        return self.repository[originator_id]

    @policy.register(OperatorOrder.BranchAssigned)
    def _(self, repository, event):
        """Create and return a ``RestaurantOrder`` for the order that got a branch."""
        # NOTE(review): this looks up the upstream OperatorOrder through the
        # repository handed to this application's policy. Presumably that
        # repository only resolves aggregates visible to this application --
        # confirm the lookup can succeed, or carry order_id on the event instead.
        operator_order = repository[event.originator_id]
        return self.create_order(
            order_id=operator_order.order_id,
            branch_id=event.branch_id,
            # BranchAssigned is triggered with only branch_id and
            # branch_assigned_at, so the event has no created_at of its own;
            # propagate the creation time recorded on the operator order.
            created_at=operator_order.created_at,
        )

    def create_order(self, order_id, branch_id, created_at=None):
        """Create and return a new ``RestaurantOrder``; this method does not save it."""
        order = RestaurantOrder.__create__(
            originator_id=self.create_id(order_id),
            created_at=created_at,
            branch_id=branch_id,
            order_id=order_id,
        )
        assert isinstance(order, RestaurantOrder)
        return order
class SystemTestCase(TransactionTestCase):
    """End-to-end test of the customer -> operator -> restaurant pipeline."""

    def test_system(self):
        """Create a customer order, assign a branch, and check the assignment."""
        system = System(
            CustomerOrderProcessApp | OperatorOrderProcessApp | RestaurantOrderProcessApp,
        )
        runner = SingleThreadedRunner(system)
        runner.start()
        # Close the runner even when a step below raises, so a failing run
        # does not leave the started runner open for subsequent tests.
        try:
            order_id = "2"

            customer_order_app = runner.get(CustomerOrderProcessApp)
            customer_order_app.create_order(order_id, created_at=datetime.now())

            operator_order_app = runner.get(OperatorOrderProcessApp)
            operator_order_app.assign_branch(order_id=order_id, branch_id="322")

            # Verify the assignment is visible on the stored operator order.
            assigned_order = operator_order_app.get_order(order_id)
            self.assertEqual(assigned_order.branch_id, "322")
        finally:
            runner.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment