Skip to content

Instantly share code, notes, and snippets.

@jiulongw
Created July 25, 2019 23:39
Show Gist options
  • Save jiulongw/003cc7a32bdc91f5a2932fe1d8b18d64 to your computer and use it in GitHub Desktop.
Save jiulongw/003cc7a32bdc91f5a2932fe1d8b18d64 to your computer and use it in GitHub Desktop.
Dump records from DynamoDB stream
#!/usr/bin/env python
import boto3
import os
import json
STREAM_ARN = 'YOUR_DB_STREAM_ARN'

# A page holding fewer records than this ends the read of a shard.
# NOTE(review): this looks like a heuristic for open shards, whose
# NextShardIterator presumably never becomes null, so that the script
# terminates -- confirm the threshold suits your stream's traffic.
MIN_PAGE_RECORDS = 10


def _list_shard_ids(client, stream_arn):
    """Return the IDs of every shard in the stream, following pagination."""
    shard_ids = []
    request = {'StreamArn': stream_arn}
    while True:
        description = client.describe_stream(**request)['StreamDescription']
        shard_ids.extend(shard['ShardId'] for shard in description['Shards'])
        last_shard_id = description.get('LastEvaluatedShardId')
        if last_shard_id is None:
            return shard_ids
        # More shards remain: resume the listing after the last one seen.
        request['ExclusiveStartShardId'] = last_shard_id


def _read_shard(client, stream_arn, shard_id):
    """Return the records of one shard, read from its TRIM_HORIZON iterator."""
    shard_iter = client.get_shard_iterator(
        StreamArn=stream_arn,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON',
    )['ShardIterator']
    records = []
    while shard_iter is not None:
        print('Getting shard iter: ' + shard_iter)
        page = client.get_records(ShardIterator=shard_iter, Limit=1000)
        records.extend(page['Records'])
        if len(page['Records']) < MIN_PAGE_RECORDS:
            break
        shard_iter = page.get('NextShardIterator')
    return records


def dump_stream(client, stream_arn, out_dir='.'):
    """Dump every shard of a DynamoDB stream to ``<out_dir>/<ShardId>.txt``.

    Each file holds a single JSON array with all records read from that
    shard. Shards whose output file already exists are skipped, so an
    interrupted run can be resumed by running the script again.

    Args:
        client: object exposing ``describe_stream``, ``get_shard_iterator``
            and ``get_records`` (e.g. ``boto3.client('dynamodbstreams')``).
        stream_arn: ARN of the stream to dump.
        out_dir: directory that receives the per-shard files.
    """
    for shard_id in _list_shard_ids(client, stream_arn):
        path = os.path.join(out_dir, shard_id + '.txt')
        if os.path.exists(path):
            continue
        print('Getting shard: ' + shard_id)
        records = _read_shard(client, stream_arn, shard_id)
        # Fetch and serialize before creating the file: a failed API call
        # must not leave behind a partial file that a later run would
        # mistake for a finished shard.
        payload = json.dumps(records, indent=2, sort_keys=True, default=str)
        with open(path, 'w') as out_file:
            out_file.write(payload)


if __name__ == '__main__':
    dump_stream(boto3.client('dynamodbstreams'), STREAM_ARN)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment