Skip to content

Instantly share code, notes, and snippets.

@uchimanajet7
Created April 7, 2017 07:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save uchimanajet7/2f7ceb51293e86440213f3e62cea4f9f to your computer and use it in GitHub Desktop.
Save uchimanajet7/2f7ceb51293e86440213f3e62cea4f9f to your computer and use it in GitHub Desktop.
Amazon Kinesis FirehoseのData transformation例
from __future__ import print_function

import base64
import gzip
import io
import json
import StringIO
def _gunzip_base64(data):
    """Base64-decode *data*, gunzip the result and return it as UTF-8 text."""
    compressed = base64.b64decode(data)
    # io.BytesIO is an in-memory bytes buffer on both Python 2.7 and 3, so no
    # explicit close() is needed even if decompression raises.
    with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode="rb") as gz:
        return gz.read().decode("utf-8")


def _to_json_lines(log_dict):
    """Return the log events of *log_dict* as newline-terminated JSON lines.

    Each event is tagged with the enclosing ``logGroup`` value before being
    serialized. A missing or empty ``logEvents`` entry yields ``""``.
    """
    log_group = log_dict.get("logGroup")
    lines = []
    for log_event in log_dict.get("logEvents") or []:
        log_event["logGroup"] = log_group
        lines.append(json.dumps(log_event))
    return "".join(line + "\n" for line in lines)


def lambda_handler(event, context):
    """Transform Firehose records into newline-delimited JSON log events.

    Every entry of ``event['records']`` carries a base64-encoded, gzipped JSON
    document in ``data``. The document's ``logEvents`` are each annotated with
    the document's ``logGroup`` and emitted one JSON object per line; the
    output is base64-encoded but deliberately NOT gzipped again.

    Args:
        event: Invocation payload; must contain a ``records`` list whose
            items have ``recordId`` and ``data`` keys.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` with one output entry per input record.
        Records that contain no log events are returned with result
        ``'Dropped'`` instead of raising.

    Raises:
        KeyError: if ``records``, ``recordId`` or ``data`` is missing.
        ValueError / OSError (IOError on Python 2): if a payload is not valid
            base64, gzip or JSON.
    """
    records = event['records']
    output = []

    for record in records:
        log_dict = json.loads(_gunzip_base64(record['data']))
        log_txt = _to_json_lines(log_dict)

        if not log_txt:
            # Nothing to deliver for this record. Echo the input data back so
            # the entry keeps the same recordId/result/data shape as the rest.
            output.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
                'data': record['data'],
            })
            continue

        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            # b64encode needs bytes and returns bytes on Python 3; convert to
            # text so the response stays JSON-serializable.
            'data': base64.b64encode(log_txt.encode('utf-8')).decode('ascii'),
        })

    print('Successfully processed {} records.'.format(len(records)))

    return {'records': output}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment