Skip to content

Instantly share code, notes, and snippets.

@sharathsamala
Last active April 28, 2020 08:01
Show Gist options
  • Save sharathsamala/c2d281598187b0e92410eeb9f2fc81d8 to your computer and use it in GitHub Desktop.
Save sharathsamala/c2d281598187b0e92410eeb9f2fc81d8 to your computer and use it in GitHub Desktop.
Lambda function to load DynamoDB Streams to Elasticsearch
import json
import urllib3
from datetime import datetime, date
from time import struct_time, mktime
import decimal
from boto3.dynamodb.types import TypeDeserializer
# Document endpoint template: "{0}" is filled with the document id via
# URL.format(...). "<ES-ENDPOINT>" looks like a placeholder for the real
# Elasticsearch base URL -- TODO confirm it is substituted before deployment.
URL = "<ES-ENDPOINT>/posts/_doc/{0}"
# HTTP headers sent with every request issued against URL.
headers = {'Content-Type': 'application/json'}
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder for the extra value types found in deserialized items.

    On top of what ``json.JSONEncoder`` already supports, it converts:

    * ``datetime`` / ``date``  -> ``str(value)``
    * ``decimal.Decimal``      -> ``float``
    * ``set`` / ``frozenset``  -> JSON array (members are encoded in turn)
    * ``struct_time``          -> local ``datetime``, which is then rendered
      as a string by a second pass through :meth:`default`
    """

    def default(self, o):
        """Return a JSON-serializable replacement for *o*.

        Args:
            o: A value the base encoder could not serialize on its own.

        Returns:
            A value the encoder can serialize (possibly after calling this
            method again on it).

        Raises:
            TypeError: Raised by the base class when *o* is of a type this
                encoder does not know how to convert.
        """
        # datetime is a subclass of date, so a single check covers both.
        if isinstance(o, date):
            return str(o)
        if isinstance(o, decimal.Decimal):
            return float(o)
        if isinstance(o, (set, frozenset)):
            # JSON has no set type. boto3's TypeDeserializer produces Python
            # sets for DynamoDB set attributes, which would otherwise make
            # json.dumps raise TypeError. Members (e.g. Decimal) are
            # converted by further calls to default().
            return list(o)
        if isinstance(o, struct_time):
            # The returned datetime is not serializable by itself; the encoder
            # passes it to default() again, where it is turned into a string.
            return datetime.fromtimestamp(mktime(o))
        # Any other serializer if needed
        return super().default(o)
def from_dynamodb_to_json(item):
    """Convert a DynamoDB-typed attribute map into plain Python values.

    Every value of *item* is run through a ``TypeDeserializer``; the keys are
    carried over unchanged.

    Args:
        item: Mapping of attribute name to its typed DynamoDB representation.

    Returns:
        A new dict with the same keys and the deserialized values.
    """
    deserializer = TypeDeserializer()
    plain = {}
    for attr_name, typed_value in item.items():
        plain[attr_name] = deserializer.deserialize(value=typed_value)
    return plain
_HTTP_POOL = None  # shared urllib3 pool, created on first use


def _get_http_pool():
    """Return the shared ``urllib3.PoolManager``, creating it on first use."""
    global _HTTP_POOL
    if _HTTP_POOL is None:
        _HTTP_POOL = urllib3.PoolManager()
    return _HTTP_POOL


def lambda_handler(event, context):
    """Mirror a batch of DynamoDB stream records into the search index.

    For each record in ``event["Records"]``:

    * ``REMOVE``            -> HTTP DELETE of the document with the record's id
    * ``INSERT`` / ``MODIFY`` -> HTTP POST of the record's ``NewImage`` under
      that id (indexing with an explicit id replaces any existing document,
      so updates are handled the same way as inserts)
    * anything else         -> logged and skipped

    Args:
        event: Stream event; must contain a ``"Records"`` list whose entries
            carry ``eventName`` and ``dynamodb.Keys`` (plus
            ``dynamodb.NewImage`` for inserts and updates).
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and, as ``body``, the JSON-encoded
        response of the last request that was sent (``{}`` if none was).

    Raises:
        KeyError: If a record lacks one of the fields listed above.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    print(event)
    output = {}
    for record in event["Records"]:
        event_type = record['eventName']
        input_id = record["dynamodb"]["Keys"]["<UniqueIdKey>"]["S"]
        if event_type == "REMOVE":
            print("Removing id: " + str(input_id))
            # Percent-encode the id so characters such as "/", "?" or "#"
            # cannot change which resource the request addresses.
            doc_url = URL.format(quote(str(input_id), safe=''))
            r = _get_http_pool().request('DELETE', doc_url, headers=headers)
            output = json.loads(r.data.decode('utf-8'))
            print(output)
        elif event_type in ("INSERT", "MODIFY"):
            parsed_event_data = from_dynamodb_to_json(record["dynamodb"]["NewImage"])
            print("inserting into index, id: " + str(input_id))
            doc_url = URL.format(quote(str(input_id), safe=''))
            encoded_data = json.dumps(parsed_event_data, cls=CustomJSONEncoder).encode('utf-8')
            r = _get_http_pool().request('POST', doc_url, headers=headers, body=encoded_data)
            output = json.loads(r.data.decode('utf-8'))
            print(output)
        else:
            print("Invalid event type: " + str(event_type))
    return {
        'statusCode': 200,
        'body': json.dumps(output)
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment