Skip to content

Instantly share code, notes, and snippets.

@numberoverzero
Created January 6, 2018 22:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save numberoverzero/01eef671aeda8c42bf9dcbe3ddc76321 to your computer and use it in GitHub Desktop.
Save numberoverzero/01eef671aeda8c42bf9dcbe3ddc76321 to your computer and use it in GitHub Desktop.
Rough draft fork of bloop.Stream that inspects each record to decide which model to use. Untested.
from bloop.models import unpack_from_dynamodb
from bloop.signals import object_loaded
from bloop.stream.coordinator import Coordinator
class TableStream:
    """Stream iterator that picks the model class separately for each record.

    Wraps a ``Coordinator`` for a single stream ARN.  Every truthy record
    pulled from the coordinator is handed to ``model_selector``; the model
    it returns decides how the record's "new", "old" and "key" entries are
    unpacked into objects.
    """

    def __init__(self, *, stream_arn, engine, model_selector):
        """Create a stream over ``stream_arn`` using ``engine``'s session.

        model_selector is a function that takes a record and returns a
        model class.  It is called *before* the record is unpacked, so the
        "new"/"old"/"key" entries it sees are still the raw attribute dicts
        (or None -- an image can be missing).  For example:

            def model_selector(record):
                image = record.get("new") or record.get("old") or {}
                if image.get("bloop_type") == {"S": "employee"}:
                    return Employee
                else:
                    return Manager

        The returned class must expose ``Meta.columns``, ``Meta.keys`` and
        ``Meta.stream["include"]``.
        """
        self.engine = engine
        self.coordinator = Coordinator(
            session=engine.session,
            stream_arn=stream_arn)
        self.model_selector = model_selector

    def __iter__(self):
        """Return self; the stream is its own iterator."""
        return self

    def __next__(self):
        """Return the coordinator's next record, unpacked in place.

        A falsy record is returned untouched and ``model_selector`` is not
        called for it.  For a truthy record, each of "new", "old" and "key"
        is either replaced by an unpacked object or, when the selected
        model's ``Meta.stream["include"]`` does not list it, set to None.
        """
        record = next(self.coordinator)
        if record:
            model = self.model_selector(record)
            meta = model.Meta
            targets = [
                ("new", meta.columns),
                ("old", meta.columns),
                ("key", meta.keys)]
            # Loop-invariant: which record entries this model asked for.
            include = meta.stream["include"]
            for key, expected in targets:
                if key not in include:
                    record[key] = None
                else:
                    self._unpack(model, record, key, expected)
        return record

    def heartbeat(self):
        """Forward a heartbeat to the underlying coordinator."""
        self.coordinator.heartbeat()

    def move_to(self, position):
        """Ask the underlying coordinator to move to ``position``."""
        self.coordinator.move_to(position)

    @property
    def token(self):
        """The underlying coordinator's current token."""
        return self.coordinator.token

    def _unpack(self, model, record, key, expected):
        """Replace ``record[key]`` with a ``model`` object built from it.

        Does nothing when the entry is missing or None.  Otherwise the
        attributes are unpacked with ``expected`` as the set of columns to
        load, ``object_loaded`` is sent for the new object, and the object
        is stored back into the record under ``key``.
        """
        attrs = record.get(key)
        if attrs is None:
            return
        obj = unpack_from_dynamodb(
            attrs=attrs, expected=expected,
            model=model, engine=self.engine)
        object_loaded.send(self.engine, engine=self.engine, obj=obj)
        record[key] = obj
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment