Skip to content

Instantly share code, notes, and snippets.

@JeffreyMFarley
Created July 1, 2024 19:16
Show Gist options
  • Save JeffreyMFarley/9c61d4669cda1e443e4e3adda3cf770b to your computer and use it in GitHub Desktop.
Save JeffreyMFarley/9c61d4669cda1e443e4e3adda3cf770b to your computer and use it in GitHub Desktop.
SNS -> Lambda -> Dynamo DB
import logging
import json
import boto3
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.info('Loading function')
dynamo = boto3.client('dynamodb')
# Lambda handler function
def lambda_handler(event, context):
try:
# Extract message from the SNS event
for record in event['Records']:
message = json.loads(record['Sns']['Message'])
item = {}
for k, v in message.items():
item[k] = {}
if type(v) is float or type(v) is int:
item[k]["N"] = str(v)
else:
item[k]["S"] = v
dynamo.put_item(TableName='mpid-qa', Item=item)
except Exception as e:
print(e)
raise e
return {
'statusCode': 200,
'body': json.dumps('Database updated successfully')
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment