Skip to content

Instantly share code, notes, and snippets.

@s-fujimoto
Last active May 9, 2019 19:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s-fujimoto/c97ff3d098d0d280aefcfe2252a7b23e to your computer and use it in GitHub Desktop.
Save s-fujimoto/c97ff3d098d0d280aefcfe2252a7b23e to your computer and use it in GitHub Desktop.
Transfer s3 to Elasticsearch log from by Lambda
import boto3
import os
import gzip
from datetime import datetime
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from botocore.endpoint import BotocoreHTTPSession
from botocore.credentials import Credentials
def lambda_handler(event, context):
print('Started')
es_host = os.environ['ES_HOST']
es_index = os.environ['ES_INDEX_PREFIX'] + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
credentials = Credentials(
os.environ["AWS_ACCESS_KEY_ID"],
os.environ["AWS_SECRET_ACCESS_KEY"],
os.environ["AWS_SESSION_TOKEN"])
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = event["Records"][0]["s3"]["object"]["key"]
s3 = boto3.resource('s3')
s3.Bucket(bucket).download_file(key, '/tmp/log.gz')
with gzip.open('/tmp/log.gz') as f:
data = ""
for line in f:
data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"')
if len(data) > 3000000:
_bulk(es_host, data, credentials)
data = ""
if data != "":
_bulk(es_host, data, credentials)
return 'Completed'
def _bulk(host, doc, credentials):
pipeline = os.environ['PIPELINE_NAME']
url = 'https://%s/_bulk?pipeline=%s' % (host, pipeline)
response = request(url, "POST", credentials, 'es', data=doc)
if not response.ok:
print(response.text)
def request(url, method, credentials, service_name, region=None, headers=None, data=None):
if not region:
region = os.environ["AWS_REGION"]
aws_request = AWSRequest(url=url, method=method, headers=headers, data=data)
SigV4Auth(credentials, service_name, region).add_auth(aws_request)
return BotocoreHTTPSession().send(aws_request.prepare())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment