Skip to content

Instantly share code, notes, and snippets.

@callum-p
Created October 19, 2018 03:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save callum-p/fa8527230e7f566dac1147fcb47a1f0a to your computer and use it in GitHub Desktop.
Save callum-p/fa8527230e7f566dac1147fcb47a1f0a to your computer and use it in GitHub Desktop.
import boto3
import copy
# Name of the destination DynamoDB table that handler() writes to.
# "${destTableName}" is an unsubstituted template placeholder; it must be
# replaced with the real table name before this code runs
# (presumably via CloudFormation Fn::Sub on an inline Lambda -- TODO confirm).
table = "${destTableName}"
def _build_attribute_updates(data):
    """Translate one stream record's images into an ``AttributeUpdates`` map.

    Args:
        data: The ``dynamodb`` section of a DynamoDB Streams record. Must
            contain ``Keys`` and ``NewImage``; ``OldImage`` is optional.

    Returns:
        A dict mapping attribute name to an update action. Every non-key
        attribute of ``NewImage`` is ``PUT``. Every non-key attribute that is
        present in ``OldImage`` but missing from ``NewImage`` is ``DELETE``d,
        so attribute removals on the source item are replicated too.

    Raises:
        KeyError: If ``Keys`` or ``NewImage`` is missing from ``data``.
    """
    key_names = data['Keys']
    new_image = data['NewImage']

    # Key attributes are addressed through 'Key' and may not appear in
    # AttributeUpdates, so they are filtered out here.
    updates = {
        name: {'Action': 'PUT', 'Value': value}
        for name, value in new_image.items()
        if name not in key_names
    }

    # OldImage is only present when the stream includes old images; without
    # it there is nothing to diff against and only PUT actions are produced.
    for name in data.get('OldImage', {}):
        if name not in new_image and name not in key_names:
            updates[name] = {'Action': 'DELETE'}

    return updates


def handler(event, context):
    """Replicate DynamoDB Streams records into the destination table.

    For each record in ``event['Records']``:
      * ``INSERT`` / ``MODIFY`` -> ``update_item`` on the destination table,
        writing the record's new image (and removing attributes that were
        dropped from the source item when the old image is available).
      * ``REMOVE`` -> ``delete_item`` for the record's key.
      * Any other event name is ignored.

    Records are processed independently and best-effort: an exception while
    handling one record is printed (together with the whole event) and the
    loop moves on. Errors are not re-raised, so the invocation itself still
    returns normally.

    Args:
        event: Lambda event carrying a ``Records`` list of stream records.
        context: Lambda context object (unused).

    Returns:
        None.
    """
    # Optional cross-account hook: to write to a table in another account,
    # assume a role via STS (assume_role is an STS API, not an IAM one) and
    # build the DynamoDB client from the temporary credentials, e.g.:
    #   sts = boto3.client('sts')
    #   response = sts.assume_role(
    #       RoleArn=arn,
    #       RoleSessionName=username,
    #       DurationSeconds=timeout
    #   )
    #   ddb = boto3.client(
    #       service_name='dynamodb',
    #       aws_access_key_id=response['Credentials']['AccessKeyId'],
    #       aws_secret_access_key=response['Credentials']['SecretAccessKey'],
    #       aws_session_token=response['Credentials']['SessionToken'],
    #       region_name=region
    #   )
    ddb = boto3.client('dynamodb')

    for record in event['Records']:
        try:
            data = record['dynamodb']
            args = {
                'TableName': table,
                'Key': data['Keys'],
            }
            event_name = record['eventName']
            if event_name in ('INSERT', 'MODIFY'):
                args['AttributeUpdates'] = _build_attribute_updates(data)
                ddb.update_item(**args)
            elif event_name == 'REMOVE':
                ddb.delete_item(**args)
        except Exception as e:
            # Deliberate best-effort: one bad record must not stop the rest
            # of the batch from being replicated.
            print(event)
            print(f'Exception: {e}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment