Skip to content

Instantly share code, notes, and snippets.

@sasankmukkamala
Created April 6, 2019 20:51
Show Gist options
  • Save sasankmukkamala/79771a92afe7df9a0c439cb013984800 to your computer and use it in GitHub Desktop.
Save sasankmukkamala/79771a92afe7df9a0c439cb013984800 to your computer and use it in GitHub Desktop.
Send existing data from DynamoDB to AWS Lambda via Kinesis by scanning the entire table.
import boto3
import boto3.dynamodb.types
import json
# Load the service resources in the desired region, using the credentials
# stored under the "default" profile.
session = boto3.Session(profile_name='default')
dynamodb = session.resource('dynamodb', region_name="ap-south-1")
kinesis = session.client('kinesis', region_name="ap-south-1")

# Names of the source DynamoDB table and the destination Kinesis stream.
# Both are blank placeholders here and must be filled in before running.
ddb_table_name = ""
ks_stream_name = ""
table = dynamodb.Table(ddb_table_name)

# Names of the table's primary-key attributes (partition key and, when the
# table has one, sort key). key_schema is used rather than
# attribute_definitions because the latter also lists attributes that are
# keys of secondary indexes, which must not end up in a record's "Keys" map.
ddb_keys_name = [a['AttributeName'] for a in table.key_schema]
# Scan operations are limited to 1 MB at a time, so the table is read page by
# page: every response that carries a 'LastEvaluatedKey' is followed by
# another scan that resumes from that key.

# One serializer is enough for the whole run; it converts plain Python values
# into DynamoDB's typed notation.
serializer = boto3.dynamodb.types.TypeSerializer()

# Extra arguments for table.scan(); empty for the first page, afterwards it
# holds the ExclusiveStartKey that resumes the scan.
scan_kwargs = {}

while True:
    response = table.scan(**scan_kwargs)

    for item in response["Items"]:
        # Get a dict of primary key(s) present on this item.
        ddb_keys = {k: item[k] for k in item if k in ddb_keys_name}

        # Serialize Python dictionaries into DynamoDB notation. A dict is
        # serialized as {"M": {...}}, so the inner map is taken.
        # NOTE(review): binary attributes presumably serialize to raw bytes,
        # which json.dumps below cannot encode -- confirm if the table has any.
        ddb_data = serializer.serialize(item)["M"]
        ddb_keys = serializer.serialize(ddb_keys)["M"]

        # The record must contain "Keys" and "NewImage" attributes to be
        # similar to a DynamoDB Streams record. Additionally, the name of the
        # source DynamoDB table is injected.
        record = {"Keys": ddb_keys, "NewImage": ddb_data, "SourceTable": ddb_table_name}

        # Convert the record to JSON.
        record = json.dumps(record)

        # Push the record to Amazon Kinesis.
        res = kinesis.put_record(
            StreamName=ks_stream_name,
            Data=record,
            PartitionKey=item["awsId"])  # change this accordingly.
        print(res)

    # Stop the loop if no additional records are available.
    if 'LastEvaluatedKey' not in response:
        break

    # Scan from where the previous page stopped.
    scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment