Skip to content

Instantly share code, notes, and snippets.

@numberoverzero
Created October 24, 2016 09:36
Show Gist options
  • Save numberoverzero/8f95c5b21c50d2140347b43d63dd1488 to your computer and use it in GitHub Desktop.
Save numberoverzero/8f95c5b21c50d2140347b43d63dd1488 to your computer and use it in GitHub Desktop.
Streaming records with and without bloop
import boto3
dynamodb = boto3.client("dynamodb")
streams = boto3.client("dynamodbstreams")
# The rest of this is a lovecraftian horror I don't want to write right now.
# DynamoDB Stream's documentation only manages a working example without KCL
# by making half a dozen assumptions and forgoing all error handling.
# Records have real User objects, not 4-deep nested dicts.
# Handles shard rollover, expiration, iterator refresh, multiple shards...
import arrow
def process(record):
print("user's email updated to {} {} ".format(
record["new"].email, record["meta"]["created_at"].humanize()))
stream = engine.stream(User, "trim_horizon")
next_heartbeat = arrow.now()
while True:
record = next(stream)
if record: process(record)
if arrow.now() > next_heartbeat:
next_heartbeat = arrow.now().replace(minutes=12)
stream.heartbeat()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment