Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save shufo/bf77bef1813e572f6a436e3f99960da3 to your computer and use it in GitHub Desktop.
Save shufo/bf77bef1813e572f6a436e3f99960da3 to your computer and use it in GitHub Desktop.
Processing CloudWatch Logs records for delivery through Kinesis Firehose
from __future__ import print_function

import base64
import gzip
import io
import json
import StringIO
def _read_log_messages(encoded):
    """Return the log message strings carried by one incoming record.

    ``encoded`` is the record's ``data`` field: base64 text wrapping a gzip
    stream whose content is a JSON object. The messages are taken from the
    ``message`` key of every entry under ``logEvents``; a missing or empty
    ``logEvents`` yields an empty list.
    """
    compressed = io.BytesIO(base64.b64decode(encoded))
    with gzip.GzipFile(fileobj=compressed, mode="rb") as stream:
        # Decode explicitly so json.loads gets text on every Python version.
        log_dict = json.loads(stream.read().decode("utf-8"))
    return [log_event["message"] for log_event in log_dict.get("logEvents") or []]


def lambda_handler(event, context):
    """Flatten each incoming record into newline-terminated log lines.

    Every entry of ``event['records']`` is unpacked (base64 -> gzip -> JSON),
    its log messages are joined with newlines, and the resulting text is
    returned base64-encoded and *not* re-compressed.

    Args:
        event: Mapping with a ``records`` list; each record provides
            ``recordId`` and ``data``.
        context: Lambda context object (unused).

    Returns:
        ``{'records': [...]}`` with one output record per input record.
        Records that carry at least one log message get ``result: 'Ok'`` and
        a ``data`` field; records without any log message get
        ``result: 'Dropped'`` and no ``data``.
    """
    output = []
    for record in event['records']:
        messages = _read_log_messages(record['data'])

        if not messages:
            # Nothing to forward; report the record as dropped instead of
            # failing the whole batch on an empty message list.
            output.append({
                'recordId': record['recordId'],
                'result': 'Dropped',
            })
            continue

        # One message per line, always terminated by a trailing newline.
        log_txt = "\n".join(messages) + "\n"

        # Encode to UTF-8 before base64 so non-ASCII messages survive, and
        # decode the base64 result so 'data' is text (JSON-serialisable).
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(log_txt.encode('utf-8')).decode('ascii'),
        })

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment