Created
January 6, 2018 22:33
-
-
Save numberoverzero/01eef671aeda8c42bf9dcbe3ddc76321 to your computer and use it in GitHub Desktop.
Rough draft fork of bloop.Stream that inspects each record to decide which model to use. Untested.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bloop.models import unpack_from_dynamodb | |
from bloop.signals import object_loaded | |
from bloop.stream.coordinator import Coordinator | |
class TableStream: | |
def __init__(self, *, stream_arn, engine, model_selector): | |
""" | |
model_selector is a function that takes a record | |
and returns a model class. for example: | |
def model_selector(record): | |
if record["new"]["bloop_type"] == {"S": "employee"}: | |
return Employee | |
else: | |
return Manager | |
""" | |
self.engine = engine | |
self.coordinator = Coordinator( | |
session=engine.session, | |
stream_arn=stream_arn) | |
self.model_selector = model_selector | |
def __iter__(self): return self | |
def __next__(self): | |
record = next(self.coordinator) | |
if record: | |
model = self.model_selector(record) | |
meta = model.Meta | |
for key, expected in [ | |
("new", meta.columns), | |
("old", meta.columns), | |
("key", meta.keys)]: | |
if key not in meta.stream["include"]: | |
record[key] = None | |
else: | |
self._unpack(model, record, key, expected) | |
return record | |
def heartbeat(self): self.coordinator.heartbeat() | |
def move_to(self, position): self.coordinator.move_to(position) | |
@property | |
def token(self): return self.coordinator.token | |
def _unpack(self, model, record, key, expected): | |
attrs = record.get(key) | |
if attrs is None: return | |
obj = unpack_from_dynamodb( | |
attrs=attrs, expected=expected, | |
model=model, engine=self.engine) | |
object_loaded.send(self.engine, engine=self.engine, obj=obj) | |
record[key] = obj |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment