Skip to content

Instantly share code, notes, and snippets.

@giuliocalzolari
Last active January 27, 2021 18:25
Show Gist options
Save giuliocalzolari/898ba635d8425960bdb0a688bd1f6425 to your computer and use it in GitHub Desktop.
S3 Logs to Elasticsearch
##################################################
### Elasticsearch host name
ES_HOST = "search-******************.ap-northeast-1.es.amazonaws.com"
### Elasticsearch prefix for index name
INDEX_PREFIX = "elb_log"
### ELB name, used as the Elasticsearch type name
ELB_NAME = "*****"
### IANA timezone name to convert the (UTC) log timestamps into.
### Leave this blank to keep the timestamps in UTC.
TIMEZONE = "Asia/Tokyo"
#################################################
### ELB access log format keys: one per regex capture group, in order
ELB_KEYS = [
    "@timestamp", "elb", "client_ip", "client_port", "backend_ip",
    "backend_port", "request_processing_time", "backend_processing_time",
    "response_processing_time", "elb_status_code", "backend_status_code",
    "received_bytes", "sent_bytes", "request_method", "request_url",
    "request_version", "user_agent",
]
### ELB access log format regex.
### Space-delimited fields are captured with ([^ ]+) so that one-character
### values such as "-" (e.g. backend_status_code on an ELB 504) still match;
### the previous (.[^ ]+) required two characters and dropped those lines.
ELB_REGEX = r'^([^ ]+) ([^ ]+) ([^ ]+):(\d+) ([^ ]+):(\d+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (\d+) (\d+) "(\w+) ([^ ]+) ([^ ]+)" "(.+)"'
#################################################
import boto3
import re
import os
import json
from datetime import datetime
from dateutil import parser, tz, zoneinfo
from botocore.awsrequest import AWSRequest
from botocore.auth import SigV4Auth
from botocore.endpoint import PreserveAuthSession
from botocore.credentials import Credentials
# Pattern compiled once at import time and reused for every log line.
R = re.compile(ELB_REGEX)
# Daily index name. This is evaluated once when the module is imported, so the
# date suffix reflects import time rather than each invocation.
INDEX = "%s-%s" % (INDEX_PREFIX, datetime.now().strftime("%Y%m%d"))
# Mapping type for every document: the configured ELB name.
TYPE = ELB_NAME
def lambda_handler(event, context):
    """Parse ELB access logs referenced by an S3 event and bulk-index them.

    For every record in ``event["Records"]`` the referenced S3 object is
    downloaded. Each line matching ``R`` becomes a document keyed by
    ``ELB_KEYS``. Documents are sent to ``ES_HOST`` through ``_bulk`` in
    batches of roughly one million characters of payload.

    Args:
        event: S3 event notification payload.
        context: Lambda context object (unused).

    Raises:
        ValueError: If ``TIMEZONE`` is set but is not a recognised zone name.
    """
    try:
        from urllib.parse import unquote_plus
    except ImportError:  # Python 2
        from urllib import unquote_plus

    # Resolve the target zone once. tz.gettz returns None for unknown names,
    # and converting with a None zone would not give the intended TIMEZONE.
    target_tz = None
    if TIMEZONE:
        target_tz = tz.gettz(TIMEZONE)
        if target_tz is None:
            raise ValueError("Unknown TIMEZONE: %r" % (TIMEZONE,))

    # Computed per invocation, not at import: a warm Lambda container can
    # outlive the day on which the module-level INDEX was built.
    index = INDEX_PREFIX + "-" + datetime.strftime(datetime.now(), "%Y%m%d")
    # json.dumps keeps the action line well-formed even if the index or
    # type name contains characters that need escaping.
    # NOTE(review): mapping types (_type) are deprecated in Elasticsearch 7
    # and removed in 8; confirm against the target cluster version.
    action_line = json.dumps({"index": {"_index": index, "_type": TYPE}}) + "\n"

    s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded
        # (for example, spaces arrive as "+").
        key = unquote_plus(record["s3"]["object"]["key"])
        obj = s3.get_object(Bucket=bucket, Key=key)
        # The streamed body is bytes on Python 3. Decode it before matching
        # against the str regex, replacing undecodable bytes so that one bad
        # byte does not abort the whole file.
        body = obj["Body"].read().decode("utf-8", "replace")

        batch = []
        batch_size = 0
        for line in body.strip().split("\n"):
            match = R.match(line)
            if not match:
                continue
            doc = dict(zip(ELB_KEYS, match.groups(0)))
            if target_tz is not None:
                # Treat the log timestamp as UTC and re-express it in TIMEZONE.
                stamp = parser.parse(doc[ELB_KEYS[0]]).replace(tzinfo=tz.tzutc())
                doc[ELB_KEYS[0]] = stamp.astimezone(target_tz).isoformat()
            entry = action_line + json.dumps(doc) + "\n"
            batch.append(entry)
            batch_size += len(entry)
            # Flush once the payload exceeds ~1M characters, so a large log
            # file is split into several bulk requests.
            if batch_size > 1000000:
                _bulk(ES_HOST, "".join(batch))
                batch = []
                batch_size = 0
        if batch:
            _bulk(ES_HOST, "".join(batch))
def _bulk(host, doc):
    """POST a bulk-API payload to Elasticsearch and report any failures.

    Args:
        host: Elasticsearch endpoint host name, without a scheme.
        doc: Newline-delimited bulk body (action line, source line, ...),
            as text or as already-encoded bytes.

    Failures are printed rather than raised, so one bad batch does not
    abort the caller.
    """
    credentials = _get_credentials()
    # Use HTTPS: the signed request carries the session token in a header and
    # must not cross the network in clear text.
    url = _create_url(host, "/_bulk", ssl=True)
    # Elasticsearch 6+ rejects bulk requests that lack an NDJSON content type.
    headers = {"Content-Type": "application/x-ndjson"}
    # Send explicit UTF-8 bytes so non-ASCII log content is encoded the same
    # way for signing and for transmission.
    payload = doc if isinstance(doc, bytes) else doc.encode("utf-8")
    response = es_request(url, "POST", credentials, headers=headers, data=payload)
    if response.status_code >= 400:
        print(response.text)
        return
    # The bulk API answers 200 even when individual documents fail. Per-item
    # outcomes are visible only in the response body.
    try:
        result = json.loads(response.text)
    except ValueError:
        print(response.text)
        return
    if not result.get("errors"):
        return
    failures = [
        outcome["error"]
        for item in result.get("items", [])
        for outcome in item.values()
        if "error" in outcome
    ]
    print("Bulk request: %d item(s) failed; first error: %s"
          % (len(failures), json.dumps(failures[0]) if failures else "n/a"))
def _get_credentials():
    """Build botocore ``Credentials`` from the standard AWS environment variables.

    ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` are required; a
    KeyError is raised if either is missing. ``AWS_SESSION_TOKEN`` is
    optional. It accompanies temporary credentials (as in Lambda) but is
    absent for long-term keys, in which case ``None`` is passed.
    """
    return Credentials(
        os.environ["AWS_ACCESS_KEY_ID"],
        os.environ["AWS_SECRET_ACCESS_KEY"],
        os.environ.get("AWS_SESSION_TOKEN"),
    )
def _create_url(host, path, ssl=False):
if not path.startswith("/"):
path = "/" + path
if ssl:
return "https://" + host + path
else:
return "http://" + host + path
def request(url, method, credentials, service_name, region=None, headers=None, data=None):
    """Sign an HTTP request with AWS SigV4 and send it.

    Args:
        url: Absolute request URL.
        method: HTTP method, e.g. "POST".
        credentials: botocore Credentials used for signing.
        service_name: SigV4 service name (e.g. "es").
        region: Signing region. When empty or None, the AWS_REGION
            environment variable is used instead.
        headers: Optional request headers.
        data: Optional request body.

    Returns:
        Whatever ``PreserveAuthSession().send`` returns for the prepared request.
    """
    # NOTE(review): PreserveAuthSession comes from older botocore releases;
    # confirm it exists in the deployed botocore version.
    signing_region = region or os.environ["AWS_REGION"]
    unsigned = AWSRequest(url=url, method=method, headers=headers, data=data)
    signer = SigV4Auth(credentials, service_name, signing_region)
    signer.add_auth(unsigned)
    prepared = unsigned.prepare()
    return PreserveAuthSession().send(prepared)
def es_request(url, method, credentials, region=None, headers=None, data=None):
    """Sign and send a request to Amazon Elasticsearch Service.

    Thin wrapper around ``request`` with the SigV4 service name fixed to
    ``"es"``. All other arguments are forwarded unchanged.
    """
    service = "es"
    return request(
        url,
        method,
        credentials,
        service,
        region=region,
        headers=headers,
        data=data,
    )
##################################################
### Elasticsearch host name
ES_HOST = "search-******************.ap-northeast-1.es.amazonaws.com"
### Elasticsearch prefix for index name
INDEX_PREFIX = "awslogs"
#################################################
### ELB access log format keys: one per regex capture group, in order
ELB_KEYS = [
    "timestamp", "elb", "client_ip", "client_port", "backend_ip",
    "backend_port", "request_processing_time", "backend_processing_time",
    "response_processing_time", "elb_status_code", "backend_status_code",
    "received_bytes", "sent_bytes", "request_method", "request_url",
    "request_version", "user_agent",
]
### ELB access log format regex.
### Space-delimited fields are captured with ([^ ]+) so that one-character
### values such as "-" (e.g. backend_status_code on an ELB 504) still match;
### the previous (.[^ ]+) required two characters and dropped those lines.
ELB_REGEX = r'^([^ ]+) ([^ ]+) ([^ ]+):(\d+) ([^ ]+):(\d+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) ([^ ]+) (\d+) (\d+) "(\w+) ([^ ]+) ([^ ]+)" "(.+)"'
#################################################
import boto3
import re
from datetime import datetime
from dateutil import parser, tz, zoneinfo
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from time import time
# Pattern compiled once at import time and reused for every log line.
R = re.compile(ELB_REGEX)
# Daily index name. This is evaluated once at import, so the date suffix
# reflects import time rather than each invocation.
INDEX = "%s-%s" % (INDEX_PREFIX, datetime.now().strftime("%Y%m%d"))
# Bulk endpoint URL. The lambda_handler below does not reference it; it uses
# the Elasticsearch client instead.
URL = "http://%s/_bulk" % ES_HOST
def lambda_handler(event, context):
    """Bulk-index ELB access-log lines from S3 objects into Elasticsearch.

    For every record in ``event["Records"]`` the referenced S3 object is
    downloaded. Each line matching ``R`` becomes a document keyed by
    ``ELB_KEYS``. Documents are sent with ``helpers.bulk`` in batches of
    just over 1000 actions.

    Args:
        event: S3 event notification payload.
        context: Lambda context object (unused).
    """
    try:
        from urllib.parse import unquote_plus
    except ImportError:  # Python 2
        from urllib import unquote_plus

    s3 = boto3.client("s3")
    # NOTE(review): no credentials are passed to this client, so requests are
    # unsigned; confirm that the domain's access policy allows that.
    es = Elasticsearch(host=ES_HOST, port=80)
    # Computed per invocation, not at import: a warm Lambda container can
    # outlive the day on which the module-level INDEX was built.
    index = INDEX_PREFIX + "-" + datetime.strftime(datetime.now(), "%Y%m%d")

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded
        # (for example, spaces arrive as "+").
        key = unquote_plus(record["s3"]["object"]["key"])
        obj = s3.get_object(Bucket=bucket, Key=key)
        # The streamed body is bytes on Python 3. Decode it before matching
        # against the str regex, replacing undecodable bytes.
        body = obj["Body"].read().decode("utf-8", "replace")

        actions = []
        elb_name = ""
        for line in body.strip().split("\n"):
            match = R.match(line)
            if not match:
                continue
            values = match.groups(0)
            # Field 1 is the "elb" key. The first matching line names the
            # type used for every document from this object.
            if not elb_name:
                elb_name = values[1]
            doc = dict(zip(ELB_KEYS, values))
            actions.append({"_index": index, "_type": elb_name, "_source": doc})
            if len(actions) > 1000:
                helpers.bulk(es, actions)
                actions = []
        if actions:
            helpers.bulk(es, actions)
@khoi-thinh
Copy link

It didn't work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment