Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Transfer logs from S3 to Elasticsearch with Lambda
import gzip
import json
import os
from datetime import datetime
from urllib.parse import unquote_plus

import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from botocore.endpoint import BotocoreHTTPSession
def lambda_handler(event, context):
    """Ship gzipped log objects announced by an S3 event to Elasticsearch.

    For each record in ``event["Records"]`` the referenced object is
    downloaded to ``/tmp/log.gz`` and read line by line. Every line becomes
    one ``{"message": <stripped line>}`` document, preceded by an ``index``
    action targeting ``<ES_INDEX_PREFIX>-<YYYYMMDD>``. Documents are handed
    to ``_bulk`` in batches of roughly three million characters.

    Environment variables read here:
        ES_HOST: host passed to ``_bulk``.
        ES_INDEX_PREFIX: prefix of the target index name.
        AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN:
            credentials used to sign the requests (see NOTE below).

    Args:
        event: S3 notification event; ``Records[*].s3.bucket.name`` and
            ``Records[*].s3.object.key`` are read.
        context: Lambda context object (unused).

    Returns:
        The string ``'Completed'``.
    """
    max_batch_chars = 3000000

    es_host = os.environ['ES_HOST']
    # NOTE(review): the strftime() argument was missing in the source;
    # the current time is assumed -- confirm.
    es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.now().strftime("%Y%m%d")

    # NOTE(review): the Credentials(...) arguments were missing in the
    # source; rebuilt from the credential variables the Lambda runtime is
    # presumed to export -- confirm against the original.
    credentials = Credentials(
        os.environ['AWS_ACCESS_KEY_ID'],
        os.environ['AWS_SECRET_ACCESS_KEY'],
        os.environ.get('AWS_SESSION_TOKEN'),
    )

    # The action line is identical for every document, so build it once.
    # json.dumps guarantees valid JSON whatever the index name contains.
    action_line = json.dumps(
        {"index": {"_index": es_index, "_type": "log"}},
        separators=(',', ':'),
    ) + "\n"

    s3 = boto3.resource('s3')
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 notifications (e.g. spaces
        # as '+'), so decode before asking S3 for the object.
        key = unquote_plus(record["s3"]["object"]["key"])
        s3.Bucket(bucket).download_file(key, '/tmp/log.gz')

        # NOTE(review): the opener call was truncated in the source;
        # gzip.open in text mode is assumed so that lines are str.
        with gzip.open('/tmp/log.gz', 'rt', encoding='utf-8', errors='replace') as f:
            parts = []
            pending_chars = 0
            for line in f:
                # json.dumps escapes quotes, backslashes and control
                # characters; escaping only '"' by hand produced invalid
                # JSON for lines containing a backslash or a tab.
                doc_line = json.dumps(
                    {"message": line.strip()},
                    separators=(',', ':'),
                ) + "\n"
                parts.append(action_line)
                parts.append(doc_line)
                pending_chars += len(action_line) + len(doc_line)
                if pending_chars > max_batch_chars:
                    _bulk(es_host, "".join(parts), credentials)
                    parts = []
                    pending_chars = 0
            # Flush whatever is left below the batch threshold.
            if parts:
                _bulk(es_host, "".join(parts), credentials)
    return 'Completed'
def _bulk(host, doc, credentials):
    """POST one newline-delimited bulk payload to Elasticsearch.

    The request goes to ``https://<host>/_bulk`` with the ingest pipeline
    named by the ``PIPELINE_NAME`` environment variable, signed for the
    ``es`` service via ``request``.

    Args:
        host: Elasticsearch host name (no scheme).
        doc: bulk request body (action and document lines).
        credentials: credentials object forwarded to ``request``.

    Returns:
        None. A non-OK response is reported on stdout and not raised.
    """
    pipeline = os.environ['PIPELINE_NAME']
    url = 'https://%s/_bulk?pipeline=%s' % (host, pipeline)
    # Elasticsearch 6+ rejects bodies sent without a Content-Type header.
    headers = {'Content-Type': 'application/json'}
    response = request(url, "POST", credentials, 'es', headers=headers, data=doc)
    if not response.ok:
        # NOTE(review): the body of this branch was missing in the source;
        # reporting the failure (rather than raising) is assumed -- confirm.
        # Assumes a requests-style response exposing status_code and text.
        print('Bulk request failed: %s %s' % (response.status_code, response.text))
def request(url, method, credentials, service_name, region=None, headers=None, data=None):
    """Send an HTTP request signed with AWS Signature Version 4.

    Args:
        url: full request URL.
        method: HTTP method name, e.g. ``"POST"``.
        credentials: credentials object handed to ``SigV4Auth``.
        service_name: AWS service name used for signing, e.g. ``'es'``.
        region: signing region; when falsy, the ``AWS_REGION`` environment
            variable is used.
        headers: optional request headers.
        data: optional request body.

    Returns:
        Whatever ``BotocoreHTTPSession().send`` returns for the prepared
        request.

    Raises:
        KeyError: if ``region`` is not given and ``AWS_REGION`` is unset.
    """
    if not region:
        region = os.environ["AWS_REGION"]
    aws_request = AWSRequest(url=url, method=method, headers=headers, data=data)
    # add_auth mutates the request in place, attaching the signature headers.
    SigV4Auth(credentials, service_name, region).add_auth(aws_request)
    # NOTE(review): BotocoreHTTPSession may be absent from newer botocore
    # releases -- confirm the botocore version this is deployed with.
    return BotocoreHTTPSession().send(aws_request.prepare())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment