Skip to content

Instantly share code, notes, and snippets.

@ekowcharles
Last active June 3, 2022 00:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ekowcharles/a02abb2d807b3ee84930820c47b8d906 to your computer and use it in GitHub Desktop.
Save ekowcharles/a02abb2d807b3ee84930820c47b8d906 to your computer and use it in GitHub Desktop.
Process ALB Logs
setup:
python3 -m pip install -r requirements.pip
run:
rm -rf logs/*
aws s3 sync s3://<bucket>/AWSLogs/<account-id>/elasticloadbalancing/<region>/<year>/<month>/<day>/ `pwd`/logs
gunzip logs/*.log.gz
python3 upload.py
psycopg2-binary
# csv is part of the Python standard library; it does not need to be installed
import os
import psycopg2
import csv
# https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html

# Connection settings are taken from the standard libpq environment variables
# when they are set, so credentials do not have to be edited into the source.
# The original placeholder values remain as fallbacks.
CONNECTION = psycopg2.connect(
    host=os.environ.get('PGHOST', 'host'),
    database=os.environ.get('PGDATABASE', 'database'),
    user=os.environ.get('PGUSER', 'username'),
    password=os.environ.get('PGPASSWORD', 'password'),
)

# Field delimiter used when reading the log files with csv.reader.
SPACE = ' '
# Separator between an address and its port in address:port fields.
COLON = ':'
# Directory (relative to the current working directory) holding the log files.
DIRECTORY = 'logs'
def setup_database():
    """Drop and recreate the ``public.logs`` table, discarding any existing rows.

    Every column is character-typed: values are stored exactly as they appear
    in the log, including numbers and timestamps.
    """
    print('Recreating table ...')
    # The table is schema-qualified so it is the same table the INSERT in
    # insert_row targets (public.logs), independent of the session search_path.
    # request_url, user_agent and redirect_url are unbounded text because real
    # URLs / user agents easily exceed a fixed varchar limit, and an oversized
    # value makes the INSERT for that row fail.
    sql = """
    DROP TABLE IF EXISTS public.logs;
    CREATE TABLE public.logs
    (
        ttype varchar(225),
        tdatetime varchar(225),
        tdate varchar(225),
        ttime varchar(225),
        tminute varchar(225),
        elb varchar(225),
        client_ip varchar(225),
        client_port varchar(225),
        target_ip varchar(225),
        target_port varchar(225),
        request_processing_time varchar(225),
        target_processing_time varchar(225),
        response_processing_time varchar(225),
        elb_status_code varchar(225),
        target_status_code varchar(225),
        received_bytes varchar(225),
        sent_bytes varchar(225),
        request_verb varchar(225),
        request_url text,
        request_proto varchar(225),
        user_agent text,
        ssl_cipher varchar(225),
        ssl_protocol varchar(225),
        target_group_arn varchar(225),
        trace_id varchar(225),
        domain_name varchar(225),
        chosen_cert_arn varchar(225),
        matched_rule_priority varchar(225),
        request_creation_time varchar(225),
        actions_executed varchar(225),
        redirect_url text,
        lambda_error_reason varchar(225),
        target_port_list varchar(225),
        target_status_code_list varchar(225),
        classification varchar(225),
        classification_reason varchar(225)
    );
    """
    # The cursor is closed even if execute() raises.
    with CONNECTION.cursor() as cur:
        cur.execute(sql)
    CONNECTION.commit()
# Number of leading fields of an ALB access-log entry that are mapped onto
# columns. AWS may append new fields at the end of an entry; any fields beyond
# this count are ignored.
ALB_FIELD_COUNT = 29

# Columns of public.logs in the order parse_alb_entry returns their values.
LOG_COLUMNS = (
    'ttype', 'tdatetime', 'tdate', 'ttime', 'tminute', 'elb',
    'client_ip', 'client_port', 'target_ip', 'target_port',
    'request_processing_time', 'target_processing_time',
    'response_processing_time', 'elb_status_code', 'target_status_code',
    'received_bytes', 'sent_bytes',
    'request_verb', 'request_url', 'request_proto', 'user_agent',
    'ssl_cipher', 'ssl_protocol', 'target_group_arn', 'trace_id',
    'domain_name', 'chosen_cert_arn', 'matched_rule_priority',
    'request_creation_time', 'actions_executed', 'redirect_url',
    'lambda_error_reason', 'target_port_list', 'target_status_code_list',
    'classification', 'classification_reason',
)

# Parameterised INSERT with one %s placeholder per column (values are passed
# separately to cursor.execute, never interpolated into the SQL text).
INSERT_SQL = 'INSERT INTO public.logs({}) VALUES({})'.format(
    ', '.join(LOG_COLUMNS), ', '.join(['%s'] * len(LOG_COLUMNS)))


def split_host_port(value):
    """Split an ``address:port`` field on its LAST colon.

    Splitting only on the last colon keeps addresses that themselves contain
    colons (IPv6) in one piece. A value with no colon at all (e.g. a ``-``
    placeholder) is returned unchanged together with an empty port.
    """
    address, sep, port = value.rpartition(':')
    if not sep:
        return value, ''
    return address, port


def parse_alb_entry(line):
    """Map one csv-split ALB log entry onto the column values of ``public.logs``.

    ``line`` is the list of fields csv.reader produced for a single entry; only
    its first ``ALB_FIELD_COUNT`` fields are used. The timestamp is cut to its
    first 19 characters and split on ``T`` into date, time and ``HH:MM``
    minute. The client/target ``address:port`` fields and the quoted request
    line (``VERB URL PROTO``) are split into their parts.

    Returns a tuple of strings ordered like ``LOG_COLUMNS``.

    Raises ValueError when the entry has fewer than ``ALB_FIELD_COUNT`` fields,
    the request line is not exactly three space-separated parts, or the
    truncated timestamp does not contain exactly one ``T``.
    """
    (ttype, tdatetime, elb, client, target,
     request_processing_time, target_processing_time, response_processing_time,
     elb_status_code, target_status_code, received_bytes, sent_bytes,
     request, user_agent, ssl_cipher, ssl_protocol, target_group_arn, trace_id,
     domain_name, chosen_cert_arn, matched_rule_priority, request_creation_time,
     actions_executed, redirect_url, lambda_error_reason, target_port_list,
     target_status_code_list, classification,
     classification_reason) = line[:ALB_FIELD_COUNT]

    client_ip, client_port = split_host_port(client)
    target_ip, target_port = split_host_port(target)
    request_verb, request_url, request_proto = request.split(' ')

    # Assumes an ISO-8601 timestamp such as 2022-06-01T12:34:56.123456Z;
    # the first 19 characters are YYYY-MM-DDTHH:MM:SS.
    tdatetime = tdatetime[:19]
    tdate, ttime = tdatetime.split('T')
    tminute = ttime[:5]

    return (
        ttype, tdatetime, tdate, ttime, tminute, elb,
        client_ip, client_port, target_ip, target_port,
        request_processing_time, target_processing_time,
        response_processing_time, elb_status_code, target_status_code,
        received_bytes, sent_bytes,
        request_verb, request_url, request_proto, user_agent,
        ssl_cipher, ssl_protocol, target_group_arn, trace_id,
        domain_name, chosen_cert_arn, matched_rule_priority,
        request_creation_time, actions_executed, redirect_url,
        lambda_error_reason, target_port_list, target_status_code_list,
        classification, classification_reason,
    )


def insert_row(line):
    """Parse one ALB log entry and insert it into ``public.logs``.

    Each row is committed on its own. An entry that cannot be parsed, or that
    the database rejects, is printed together with its raw fields and skipped,
    so the remaining entries are still loaded.
    """
    try:
        values = parse_alb_entry(line)
    except ValueError as e:
        print(e, line)
        return

    try:
        # The cursor is closed even when execute() raises.
        with CONNECTION.cursor() as cur:
            cur.execute(INSERT_SQL, values)
        CONNECTION.commit()
    except psycopg2.Error as e:
        # A failed statement leaves the transaction in the aborted state;
        # without this rollback every following INSERT would be rejected too.
        CONNECTION.rollback()
        print(e, line)
def parse_files():
    """Load every regular file in ``DIRECTORY`` into the database, entry by entry.

    Files are processed in name order. Each is read as space-delimited,
    double-quote-quoted CSV, and every resulting entry is handed to
    ``insert_row``. A file that cannot be opened, decoded or CSV-parsed is
    reported on stdout and the next file is processed.
    """
    for filename in sorted(os.listdir(DIRECTORY)):
        path = os.path.join(DIRECTORY, filename)
        # Sub-directories and other non-regular entries cannot be read as logs.
        if not os.path.isfile(path):
            continue
        print('Processing {} ...'.format(path))
        try:
            # newline='' is the mode the csv module requires for its input;
            # an explicit encoding avoids depending on the platform locale.
            with open(path, newline='', encoding='utf-8') as csvfile:
                reader = csv.reader(csvfile, delimiter=SPACE, quotechar='"')
                for line in reader:
                    insert_row(line)
        except (OSError, UnicodeDecodeError, csv.Error) as e:
            print(e)
if __name__ == '__main__':
    # Only run when executed as a script: the run drops and recreates the
    # table, which must not happen merely because the module was imported.
    # psycopg2.connect() raises on failure instead of returning None, so there
    # is no None connection to check for here.
    try:
        setup_database()
        parse_files()
    except Exception as e:
        print(e)
    finally:
        CONNECTION.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment