-
-
Save trmsmy/9fba676fd122241528a47cd01c3c7e3b to your computer and use it in GitHub Desktop.
Send existing data from DynamoDB to AWS Lambda via Kinesis by scanning the entire table.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64
import json

import boto3
import boto3.dynamodb.types
# --- Configuration ---------------------------------------------------------
# AWS credentials profile and region used for both DynamoDB and Kinesis.
aws_profile = "default"
aws_region = "ap-south-1"
# Set names for the DynamoDB table to read and the Kinesis stream to write.
ddb_table_name = ""
ks_stream_name = ""
# Item attribute whose value becomes the Kinesis PartitionKey. None means
# "use the table's partition (HASH) key"; set e.g. "awsId" to override.
partition_key_attribute = None


def key_names(key_schema):
    """Return the attribute names that make up the table's primary key.

    *key_schema* is the table's ``KeySchema`` list, e.g.
    ``[{"AttributeName": "pk", "KeyType": "HASH"}, ...]``. Unlike
    ``AttributeDefinitions``, it does not include secondary-index key
    attributes, so the result matches a DynamoDB Streams ``Keys`` map.
    """
    return [entry["AttributeName"] for entry in key_schema]


def hash_key_name(key_schema):
    """Return the name of the partition (HASH) key attribute.

    Raises:
        ValueError: if *key_schema* has no HASH entry.
    """
    for entry in key_schema:
        if entry["KeyType"] == "HASH":
            return entry["AttributeName"]
    raise ValueError("key schema has no HASH key")


def json_default(obj):
    """``json.dumps`` fallback that base64-encodes binary attribute values.

    TypeSerializer returns B / BS attribute values as bytes, which ``json``
    cannot encode natively; base64 text is how DynamoDB JSON carries binary.
    """
    if isinstance(obj, (bytes, bytearray)):
        return base64.b64encode(obj).decode("ascii")
    raise TypeError(
        f"Object of type {type(obj).__name__} is not JSON serializable"
    )


def scan_all(table):
    """Yield every item of *table*, following scan pagination.

    A single Scan call returns at most 1 MB, so keep resuming from
    ``LastEvaluatedKey`` until the response no longer contains one.
    """
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        yield from response.get("Items", [])
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            return
        scan_kwargs["ExclusiveStartKey"] = last_key


def build_record(item, keys_name, table_name, serializer):
    """Return *item* as a JSON string shaped like a DynamoDB Streams record.

    The record carries "Keys" (primary key only) and "NewImage" (the whole
    item) in DynamoDB type notation, plus the injected "SourceTable" name.
    """
    keys = {name: item[name] for name in keys_name if name in item}
    record = {
        "Keys": serializer.serialize(keys)["M"],
        "NewImage": serializer.serialize(item)["M"],
        "SourceTable": table_name,
    }
    return json.dumps(record, default=json_default)


def main():
    """Scan the whole DynamoDB table and push each item to Kinesis."""
    if not ddb_table_name or not ks_stream_name:
        raise SystemExit(
            "Set ddb_table_name and ks_stream_name before running this script."
        )

    # Load the service resources in the desired region.
    session = boto3.Session(profile_name=aws_profile)
    dynamodb = session.resource("dynamodb", region_name=aws_region)
    kinesis = session.client("kinesis", region_name=aws_region)
    table = dynamodb.Table(ddb_table_name)

    # Primary key only (KeySchema), not every defined attribute.
    schema = table.key_schema
    ddb_keys_name = key_names(schema)
    partition_attr = partition_key_attribute or hash_key_name(schema)
    # One serializer is enough; it holds no per-item state.
    serializer = boto3.dynamodb.types.TypeSerializer()

    for item in scan_all(table):
        record = build_record(item, ddb_keys_name, ddb_table_name, serializer)
        # Kinesis requires PartitionKey to be a string; numeric keys come
        # back from the resource layer as Decimal.
        res = kinesis.put_record(
            StreamName=ks_stream_name,
            Data=record,
            PartitionKey=str(item[partition_attr]),
        )
        print(res)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment