Skip to content

Instantly share code, notes, and snippets.

@s-fujimoto
Created December 3, 2015 09:16
Show Gist options
  • Save s-fujimoto/76b5c6e1cd5b4c612561 to your computer and use it in GitHub Desktop.
Save s-fujimoto/76b5c6e1cd5b4c612561 to your computer and use it in GitHub Desktop.
##################################################
# Settings -- edit these before deploying.
##################################################
# Amazon Elasticsearch Service endpoint: host name only, the URL scheme is
# added when requests are built.
ES_HOST = "search-******************.ap-northeast-1.es.amazonaws.com"
# Index name prefix; a "-YYYYMMDD" date suffix is appended to it.
INDEX_PREFIX = "elb_log"
# ELB name, used as the Elasticsearch document type.
ELB_NAME = "*****"
# Time zone that "@timestamp" is converted into. Set it to "" (blank) to
# keep the original UTC timestamps unchanged.
TIMEZONE = "Asia/Tokyo"
##################################################
# Document field names, in the same order as the capture groups of ELB_REGEX.
ELB_KEYS = [
    "@timestamp",
    "elb",
    "client_ip",
    "client_port",
    "backend_ip",
    "backend_port",
    "request_processing_time",
    "backend_processing_time",
    "response_processing_time",
    "elb_status_code",
    "backend_status_code",
    "received_bytes",
    "sent_bytes",
    "request_method",
    "request_url",
    "request_version",
    "user_agent",
]
# Parses one ELB access-log line; exactly one capture group per ELB_KEYS entry.
ELB_REGEX = (
    r'^(.[^ ]+) (.[^ ]+) (.[^ ]+):(\d+) (.[^ ]+):(\d+) '
    r'(.[^ ]+) (.[^ ]+) (.[^ ]+) (.[^ ]+) (.[^ ]+) (\d+) (\d+) '
    r'"(\w+) (.[^ ]+) (.[^ ]+)" "(.+)"'
)
#################################################
import boto3
import re
import os
import json
from datetime import datetime
from dateutil import parser, tz, zoneinfo
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from botocore.endpoint import PreserveAuthSession
from botocore.credentials import Credentials
# Pre-compiled parser for ELB access-log lines.
R = re.compile(ELB_REGEX)
# Index name "<INDEX_PREFIX>-YYYYMMDD". NOTE: evaluated once, when the module
# is imported, so the date suffix does not advance while a warm Lambda
# container keeps being reused.
INDEX = "%s-%s" % (INDEX_PREFIX, datetime.now().strftime("%Y%m%d"))
# Documents are indexed with the ELB name as their type.
TYPE = ELB_NAME
def lambda_handler(event, context):
    """Index the ELB access-log objects named in an S3 event into Elasticsearch.

    For each record in ``event["Records"]`` the referenced S3 object is
    downloaded and split into lines. Each line matching ``R`` becomes a
    document keyed by ``ELB_KEYS``; non-matching lines are skipped. When
    ``TIMEZONE`` is set, ``@timestamp`` is re-expressed in that zone. The
    documents are sent to ``ES_HOST`` through ``_bulk`` in batches of
    roughly 1,000,000 characters.

    :param event: S3 event notification; ``Records[*].s3.bucket.name`` and
        ``Records[*].s3.object.key`` are read.
    :param context: Lambda context object (unused).
    """
    try:  # Python 3
        from urllib.parse import unquote_plus
    except ImportError:  # Python 2
        from urllib import unquote_plus

    max_batch_chars = 1000000
    s3 = boto3.client("s3")
    # Computed per invocation. A value built at import time would keep the
    # same date for as long as a warm Lambda container is reused.
    index = INDEX_PREFIX + "-" + datetime.now().strftime("%Y%m%d")
    # json.dumps escapes the names instead of splicing them into raw JSON.
    action = json.dumps({"index": {"_index": index, "_type": TYPE}},
                        separators=(",", ":")) + "\n"
    # Resolve the zone once rather than once per log line.
    target_tz = tz.gettz(TIMEZONE) if TIMEZONE else None
    timestamp_key = ELB_KEYS[0]

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications arrive URL-encoded.
        key = unquote_plus(record["s3"]["object"]["key"])
        obj = s3.get_object(Bucket=bucket, Key=key)
        # Decode explicitly because the body is bytes on Python 3. "replace"
        # keeps one invalid byte from aborting the whole file.
        body = obj["Body"].read().decode("utf-8", "replace")

        batch = []
        batch_chars = 0
        sent = 0
        for line in body.strip().split("\n"):
            match = R.match(line)
            if not match:
                continue
            doc = dict(zip(ELB_KEYS, match.groups(0)))
            if TIMEZONE:
                stamp = parser.parse(doc[timestamp_key]).replace(tzinfo=tz.tzutc())
                doc[timestamp_key] = stamp.astimezone(target_tz).isoformat()
            source = json.dumps(doc) + "\n"
            # Collect the pieces in a list and join once per batch instead of
            # growing a single string with +=.
            batch.append(action)
            batch.append(source)
            batch_chars += len(action) + len(source)
            sent += 1
            if batch_chars > max_batch_chars:
                _bulk(ES_HOST, "".join(batch))
                batch = []
                batch_chars = 0
        if batch:
            _bulk(ES_HOST, "".join(batch))
        # Log a summary instead of dumping the final payload (client IPs,
        # URLs, user agents) into the function's logs.
        print("Sent %d document(s) from s3://%s/%s" % (sent, bucket, key))
def _bulk(host, doc):
    """POST a newline-delimited payload to Elasticsearch's ``_bulk`` API.

    :param host: Elasticsearch endpoint host name (no scheme).
    :param doc: Bulk body made of alternating action and source lines, each
        terminated by a newline.
    :returns: True when the HTTP call succeeded and the response flagged no
        per-item errors, otherwise False. Failures are printed, not raised,
        so one bad batch does not abort the invocation.
    """
    credentials = _get_credentials()
    # HTTPS: the payload (client IPs, URLs, user agents) and the signed
    # request headers should not travel in clear text.
    url = _create_url(host, "/_bulk", ssl=True)
    # Elasticsearch 6+ rejects bulk bodies that lack an NDJSON content type;
    # older versions ignore the header.
    response = es_request(url, "POST", credentials, data=doc,
                          headers={"Content-Type": "application/x-ndjson"})
    if response.status_code >= 400:
        print(response.text)
        return False
    # An HTTP 200 only means the bulk request was accepted. Individual actions
    # can still fail, and ES reports that through "errors" / "items".
    try:
        result = json.loads(response.text)
    except ValueError:
        # Not JSON: no evidence of per-item failure, so treat it as success.
        return True
    if not isinstance(result, dict) or not result.get("errors"):
        return True
    failed = [
        item for item in result.get("items", [])
        if any(isinstance(op, dict) and "error" in op for op in item.values())
    ]
    print("Bulk request reported %d failed item(s); first failures: %s"
          % (len(failed), json.dumps(failed[:5])))
    return False
def _get_credentials():
    """Build botocore ``Credentials`` from the process environment.

    ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` are required and
    raise KeyError if missing. ``AWS_SESSION_TOKEN`` is optional: it may be
    absent, for example when long-term keys are used outside Lambda. In that
    case the token is passed as None instead of raising.
    """
    return Credentials(
        os.environ["AWS_ACCESS_KEY_ID"],
        os.environ["AWS_SECRET_ACCESS_KEY"],
        os.environ.get("AWS_SESSION_TOKEN"))
def _create_url(host, path, ssl=False):
if not path.startswith("/"):
path = "/" + path
if ssl:
return "https://" + host + path
else:
return "http://" + host + path
def request(url, method, credentials, service_name, region=None, headers=None, data=None):
    """Send an HTTP request signed with AWS Signature Version 4.

    :param url: Absolute request URL.
    :param method: HTTP method, e.g. "POST".
    :param credentials: botocore credentials used for signing.
    :param service_name: Signing service name (e.g. "es").
    :param region: Signing region; when falsy, ``AWS_REGION`` from the
        environment is used (KeyError if unset).
    :param headers: Optional request headers.
    :param data: Optional request body.
    :returns: Whatever ``PreserveAuthSession().send`` returns.
    """
    # NOTE(review): newer botocore releases appear to no longer ship
    # botocore.endpoint.PreserveAuthSession -- confirm against the pinned
    # botocore version before upgrading.
    signing_region = region or os.environ["AWS_REGION"]
    signed = AWSRequest(method=method, url=url, data=data, headers=headers)
    SigV4Auth(credentials, service_name, signing_region).add_auth(signed)
    session = PreserveAuthSession()
    return session.send(signed.prepare())
def es_request(url, method, credentials, region=None, headers=None, data=None):
    """Send a SigV4-signed request to Amazon Elasticsearch Service.

    Thin wrapper around ``request`` with the signing service fixed to "es".
    """
    return request(url, method, credentials,
                   service_name="es", region=region,
                   headers=headers, data=data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment