Skip to content

Instantly share code, notes, and snippets.

@alexcasalboni
Created February 26, 2019 08:47
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save alexcasalboni/1790987a59c9f9bfe00115c8d6083b0e to your computer and use it in GitHub Desktop.
Amazon Kinesis Data Firehose - AWS Lambda processor
import json
from base64 import b64decode, b64encode
# Status strings that process_record stores under each record's 'result' key.
STATUS_OK = "Ok"  # transformation succeeded
STATUS_DROPPED = "Dropped"  # record was deliberately skipped
STATUS_FAIL = "ProcessingFailed"  # any other error while processing
class DroppedRecordException(Exception):
    """Signal that the current record must be skipped/dropped.

    Raise it while transforming a record; process_record catches it and
    marks the record with the dropped status instead of treating it as a
    failure.
    """
def lambda_handler(event, context):
    """Main Lambda entry point: process every record of the incoming batch.

    Args:
        event: Invocation payload; its ``'records'`` entry is iterated.
        context: Lambda runtime context (unused here).

    Returns:
        A dict whose ``'records'`` entry is a list holding the result of
        ``process_record`` for each input record, in input order.
    """
    return {
        # Build a real list: on Python 3 ``map`` returns a lazy iterator,
        # which cannot be serialized to JSON when the response is returned.
        'records': [process_record(record) for record in event['records']],
    }
def process_record(record):
    """Transform a single record and tag it with a result status.

    Invoked once for each record (raw base64-encoded data).

    Args:
        record: Dict carrying the base64-encoded JSON payload under
            ``'data'``. It is modified in place.

    Returns:
        The same ``record`` dict with ``'result'`` set to ``STATUS_OK``,
        ``STATUS_DROPPED`` or ``STATUS_FAIL``. On success ``'data'`` holds
        the re-encoded payload as a base64 text string; otherwise ``'data'``
        is left untouched.
    """
    try:
        # Decode inside the try block so that a single malformed record is
        # reported as failed instead of aborting the whole batch.
        data = json.loads(b64decode(record['data']))
        new_data = transform_data(data)  # manipulate/validate record
        # b64encode only accepts bytes, so serialize to UTF-8 first; the
        # trailing newline is added for Athena.
        payload = (json.dumps(new_data) + "\n").encode('utf-8')
        # Store text rather than bytes so the record stays JSON-serializable.
        record['data'] = b64encode(payload).decode('ascii')
    except DroppedRecordException:
        record['result'] = STATUS_DROPPED  # skip
    except Exception:
        record['result'] = STATUS_FAIL  # generic error
    else:
        record['result'] = STATUS_OK  # all good
    return record
def transform_data(data):
    """Transform one decoded record; invoked once for each record.

    This is the customization point: edit the body to validate or modify
    the payload. As written it only logs the payload and returns it
    unchanged.

    Args:
        data: The decoded record payload.

    Returns:
        The payload to be re-encoded (here, ``data`` itself).

    Raises:
        Nothing as written. The commented example below shows raising
        ``DroppedRecordException`` to have a record skipped.
    """
    # Wrap in a 1-tuple so a tuple payload is printed as a whole instead of
    # being unpacked as separate format arguments.
    print("Processing data: %s" % (data,))
    # example: you can skip records
    # if 'invalid stuff' in data:
    #     raise DroppedRecordException()
    # example: you can add new fields
    # data['new_value'] = True
    return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment