Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mariobittencourt/53f98d9034a92eb29a81fc2942b72dfc to your computer and use it in GitHub Desktop.
Save mariobittencourt/53f98d9034a92eb29a81fc2942b72dfc to your computer and use it in GitHub Desktop.
Event Store Payment Repository : find by id
class EventStorePaymentRepository(PaymentRepository):
# ... existing code
async def find_by_id(self, payment_id: str) -> Optional[Payment]:
await self._check_connection()
stream_name = self._create_stream_name(payment_id)
payment = Payment(payment_id)
async for event in self._client.iter(stream=stream_name, from_event=0):
await self._reconstruct_from_event(event, payment)
# I clear the events so changes won't multiply the events themselves
payment.clear_events()
return payment
async def _reconstruct_from_event(self, event, payment):
domain_event = await self.convert_to_domain_event(event)
payment.apply(domain_event)
@staticmethod
async def convert_to_domain_event(event) -> DomainEvent:
# simple implementation
event_types = {
'PaymentCreated': PaymentCreated,
'PaymentAuthorized': PaymentAuthorized,
'PaymentSettled': PaymentSettled,
'PaymentRefunded': PaymentRefunded,
'PaymentDeclined': PaymentDeclined
}
# this event is the one retrieved from EventStore so contains other metadata.
# We just care about the json body
converted = event.json()
return event_types[event.type](**converted)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment