Skip to content

Instantly share code, notes, and snippets.

@mariobittencourt
Last active May 27, 2020 23:29
Show Gist options
  • Save mariobittencourt/e8a2ea59c30e93408762341650aeb8bc to your computer and use it in GitHub Desktop.
Save mariobittencourt/e8a2ea59c30e93408762341650aeb8bc to your computer and use it in GitHub Desktop.
Payment Repository - Event Store
class EventStorePaymentRepository(PaymentRepository):
def __init__(self, username: str, password: str):
self._loop = asyncio.get_event_loop()
# photonpump is a client that interfaces with event store
self._client = photonpump.connect(loop=self._loop, username=username, password=password)
self._connected = False
async def save(self, payment: Payment) -> bool:
await self._check_connection()
stream_name = self._create_stream_name(payment.payment_id)
for event in payment.events:
await self._client.publish_event(stream_name, event.type, event.encode())
# make sure we don't duplicate events
payment.events.clear()
return True
async def _check_connection(self):
if not self._connected:
await self._client.connect()
self._connected = True
def _create_stream_name(self, payment_id) -> str:
return f'payments-{payment_id}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment