Skip to content

Instantly share code, notes, and snippets.

Created Mar 6, 2018
What would you like to do?
CloudWatch Log Lambda Subscription to ElasticSearch
from __future__ import print_function
import json
import base64
import zlib
from elasticsearch import Elasticsearch
from elasticsearch.connection import create_ssl_context
from datetime import datetime
def handler(event, context):
    """Index the log events of a CloudWatch Logs subscription into Elasticsearch.

    The subscription payload in ``event['awslogs']['data']`` is base64-decoded,
    gunzipped and parsed as JSON. Each entry of its ``logEvents`` list becomes
    one Elasticsearch document: if the event message is itself a JSON object
    its fields are used as the document body, otherwise the raw text is stored
    under ``'message'``. Every document is additionally tagged with the log
    group name, the event timestamp and the AWS account id.

    Args:
        event: Lambda event dict carrying the payload under
            ``event['awslogs']['data']``.
        context: Lambda context object (unused).

    Returns:
        True once every log event has been submitted for indexing.

    Raises:
        KeyError: if the event or the decoded payload lacks an expected key.
        Exceptions raised by the Elasticsearch client are propagated.
    """
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    payload = base64.b64decode(event['awslogs']['data'])
    logs = json.loads(zlib.decompress(payload, 16 + zlib.MAX_WBITS))

    # Use a dedicated name so the Lambda ``context`` argument is not shadowed.
    ssl_context = create_ssl_context(cafile="ca.crt")
    # NOTE(review): host, credentials and account id are hard-coded
    # placeholders; load them from configuration/secrets before deploying.
    es = Elasticsearch(
        ['my.elasticsearch.cluster'],
        http_auth=('username', 'password'),
        scheme="https",
        port=9200,
        ssl_context=ssl_context,
    )

    log_group = str(logs['logGroup'])
    # One index per day, e.g. "cloudwatchlogs-2018.03.06". Uses the local
    # clock of the runtime, evaluated once per invocation.
    index_name = "cloudwatchlogs-" + datetime.now().strftime("%Y.%m.%d")

    for log_event in logs['logEvents']:
        message = str(log_event['message'])
        try:
            body = json.loads(message)
        except ValueError:
            # Not JSON at all (json.JSONDecodeError subclasses ValueError).
            body = None
        if not isinstance(body, dict):
            # Plain text, or JSON that is not an object (number, list, ...):
            # keep the raw message so the metadata keys below can be added.
            body = {'message': message}

        body['lambda_function'] = log_group
        body['timestamp'] = log_event['timestamp']
        body['aws_account'] = '12345678901'

        # The CloudWatch event id is the document id, so a re-delivered
        # event overwrites its earlier copy instead of duplicating it.
        es.index(
            index=index_name,
            doc_type="mymapping",
            id=log_event['id'],
            body=body,
        )
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment