Created
June 17, 2020 02:01
-
-
Save mariobittencourt/520d686e67c729c5b232c9ff06f8dc84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Projectionist: | |
def __init__(self, projector: PaymentProjector, stream_service: PaymentStreamService, | |
ledger_repository: LedgerRepository, ledger_name: str): | |
self.projector = projector | |
self.stream_service = stream_service | |
self.ledger_repository = ledger_repository | |
self.ledger = None | |
self.ledger_name = ledger_name | |
async def start(self): | |
# load the ledger last position | |
self.ledger = self.ledger_repository.find_by_projection_name(self.ledger_name) | |
# subscribe to the stream from the ledger position | |
if self.ledger.last_position == -1: | |
self.ledger.last_position = 0 | |
else: | |
self.ledger.last_position += 1 | |
await self.stream_service.subscribe(stream='$ce-payments', start_from=self.ledger.last_position, | |
projectionist=self) | |
def handle(self, event: DomainEvent): | |
# pass it to the projector | |
self.projector.apply(event) | |
# if all goes well, acknowledge the ledger to know when to resume next time | |
# in practice, you can decide to batch the updates and you would have to | |
# handle temporary issues to retry | |
self.ledger_repository.update(self.ledger) | |
self.ledger.last_position += 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment