Last active
December 7, 2023 23:15
-
-
Save pajaydev/5845187e24a1641f627d3208ca76c7b3 to your computer and use it in GitHub Desktop.
Script to export Dynamodb records from old table and migrate those records to new dynamodb table.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
SOURCE_TABLE_NAME = "--replace-source-tablename--" | |
DESTINATION_TABLE_NAME = "--replace-destination-tablename--" | |
# export records in tableName.json file | |
OUTPUT_FILE = f"{SOURCE_TABLE}.json" | |
# replace region | |
AWS_REGION = "us-west-2" | |
failed_updates = [] | |
dynamodb = boto3.resource('dynamodb', region_name=AWS_REGION) | |
# export table records from dynamodb into a json file | |
def exportTableRecords(): | |
print("Starting to export table records") | |
table = dynamodb.Table(SOURCE_TABLE_NAME) | |
response = table.scan() | |
data = response['Items'] | |
while 'LastEvaluatedKey' in response: | |
response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey']) | |
data.extend(response['Items']) | |
with open(OUTPUT_FILE,"w+") as f: | |
json_data = json.dumps(data) | |
f.write(json_data) | |
print(f"Table data is exported to ${OUTPUT_FILE}, kindly verify") | |
exportTableRecords() | |
```json | |
// data is exported in this format | |
[{ | |
"company": "ABC", | |
"Address": "San fransisco | |
},{ | |
"company": "DEF", | |
"Address": "Seattle" | |
}] | |
``` | |
# import table records from json file into dynamodb | |
# Refactor the data before inserting into new table | |
def importTableRecords(): | |
print("Starting to import table records") | |
table = dynamodb.Table(DESTINATION_TABLE_NAME) | |
# read the data from json file | |
with open(OUTPUT_FILE, "r") as f: | |
json_data = f.read() | |
items = json.loads(json_data) | |
# insert into dynamo db | |
with table.batch_writer() as batch: | |
for item in items: | |
try: | |
# conditionally put only if organization is not present | |
table.put_item( | |
Item={ | |
"organization": item["company"], | |
"organizationAddress": item["address"] | |
}, | |
ConditionExpression="attribute_not_exists(organization)" | |
) | |
except Exception as e: | |
print(f"!!!Failed to insert because of error : {e}") | |
failed_updates.append(item) | |
if failed_client_channel_arns: | |
print('Failed to update organization: ', failed_updates) | |
importTableRecords() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment