# Parse nginx logs to sqlite3
#!/usr/bin/env python | |
# The MIT License (MIT) | |
# Copyright (c) 2016 Michael-Keith Bernard | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in all | |
# copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
import re | |
import copy | |
import logging | |
import argparse | |
import sqlite3 | |
import datetime | |
import calendar | |
import itertools | |
__author__ = "Michael-Keith Bernard (mkbernard@opentable.com)" | |
def with_match(name, re):
    """Wrap the regex fragment `re` in a named capture group called `name`."""
    return "(?P<" + name + ">" + re + ")"
# Regex fragments for the nginx access-log format this script expects.
IP_RE = r"\d+(?:\.\d+){3}"       # dotted-quad IPv4 address
QUOTED_STRING_RE = r'"[^\"]+"'   # a double-quoted field
TIMESTAMP_RE = r'\[[^\]]+\]'     # bracketed timestamp
IP_LIST_RE = r'(?:{ip}(?:\,\s+{ip})*)|-'.format(ip=IP_RE)  # X-Forwarded-For list, or "-"
NUMERAL_RE = r'\d+(?:\.\d+)?'    # integer or decimal

# One named capture group per whitespace-separated log field, in on-disk order.
LOG_PARTS = [
    with_match('remote_addr', IP_RE),
    with_match('user', r'- -'),
    with_match('timestamp', TIMESTAMP_RE),
    with_match('request', QUOTED_STRING_RE),
    with_match('response_code', r'\d+'),
    with_match('response_size', r'\d+'),
    with_match('referer', QUOTED_STRING_RE),
    with_match('user_agent', QUOTED_STRING_RE),
    with_match('forwarded_for', IP_LIST_RE),
    with_match('nginx_time', NUMERAL_RE),
    with_match('upstream_time', NUMERAL_RE),
    with_match('pipelined', r'[.p]'),
]
# Fix: the separator must be a raw string; a bare "\s+" is an invalid escape
# sequence (DeprecationWarning, and a SyntaxWarning on Python 3.12+).
LOG_RE = r"^{}$".format(r"\s+".join(LOG_PARTS))
# Schema for the requests table: one row per parsed access-log line.
# Columns mirror the keys emitted by normalize_log (flattened by prepare_log),
# plus `source`, which is supplied as a keyword override when loading.
CREATE_REQUESTS_TABLE = """
create table if not exists requests (
  id integer primary key,
  source text,
  upstream_time float,
  nginx_time float,
  response_size integer,
  response_code integer,
  user_agent text,
  referer text,
  remote_addr text,
  forwarded_for text,
  pipelined boolean,
  timestamp text,
  unix_epoch float,
  method text,
  path text,
  query_string text,
  protocol text
);
"""
def parse_args(args=None, parse=True):
    """Build the CLI parser and optionally parse `args`.

    Returns a (parser, namespace) pair; the namespace is None when
    parse=False (useful for introspecting the parser itself).
    """
    parser = argparse.ArgumentParser(
        description="Log processor for Graphite Web nginx")
    parser.add_argument("logfile",
        help="Path to nginx log file")
    # Optional flags, declared as (names, options) pairs.
    optional = [
        (("-d", "--db"),
         {"default": ":memory:", "help": "Path to sqlite3 database"}),
        (("-s", "--source"),
         {"default": "graphite", "help": "Source of logs (eg which web server)"}),
        (("-b", "--batch"),
         {"default": 500, "type": int, "help": "Batch size for inserts"}),
    ]
    for names, options in optional:
        parser.add_argument(*names, **options)
    namespace = parser.parse_args(args) if parse else None
    return parser, namespace
def migrate_db(db):
    """Apply schema migrations: creates the requests table if missing."""
    cursor = db.cursor()
    cursor.execute(CREATE_REQUESTS_TABLE)
    db.commit()
def setup_db(path, migrations=True):
    """Open the sqlite3 database at `path` and optionally run migrations.

    Rows come back as sqlite3.Row so columns are addressable by name.
    """
    connection = sqlite3.connect(path)
    connection.row_factory = sqlite3.Row
    if migrations:
        migrate_db(connection)
    return connection
def parse_log(log_line):
    """Parse one raw log line against LOG_RE.

    Returns a dict of named capture groups, or None if the line
    doesn't match the expected format.
    """
    stripped = log_line.strip()
    match = re.match(LOG_RE, stripped)
    if match is None:
        return None
    return match.groupdict()
def parse_date(timestamp):
    """Parse a bracketed nginx timestamp into (datetime, unix epoch).

    NOTE: the format hard-codes a literal '+0000' offset, so only
    UTC-stamped logs parse; the returned datetime is naive UTC.
    """
    parsed = datetime.datetime.strptime(
        timestamp, '[%d/%b/%Y:%H:%M:%S +0000]')
    epoch = calendar.timegm(parsed.timetuple())
    return parsed, epoch
def parse_request(request):
    """Split a quoted request line into (method, path, query_string, protocol).

    Example: '"GET /a?b=1 HTTP/1.1"' -> ('GET', '/a', 'b=1', 'HTTP/1.1').
    The query string is '' when the path has no '?'.
    """
    inner = request[1:-1]  # drop the surrounding double quotes
    method, remainder = inner.split(" ", 1)
    full_path, protocol = remainder.rsplit(" ", 1)
    if "?" in full_path:
        path, query = full_path.split("?", 1)
    else:
        path, query = full_path, ""
    return method, path, query, protocol
def normalize_log(parsed):
    """Convert raw regex capture strings into typed, structured values."""
    ts, epoch = parse_date(parsed['timestamp'])
    method, path, qs, protocol = parse_request(parsed['request'])
    return {
        'upstream_time': float(parsed['upstream_time']),
        'nginx_time': float(parsed['nginx_time']),
        'response_size': int(parsed['response_size']),
        'response_code': int(parsed['response_code']),
        # [1:-1] strips the surrounding double quotes captured by the regex
        'user_agent': parsed['user_agent'][1:-1],
        'referer': parsed['referer'][1:-1],
        'remote_addr': parsed['remote_addr'],
        'forwarded_for': parsed['forwarded_for'].split(", "),
        'pipelined': parsed['pipelined'] == 'p',
        'timestamp': ts,
        'unix_epoch': epoch,
        'method': method,
        'path': path,
        'query_string': qs,
        'protocol': protocol,
    }
def prepare_log(log, **kwargs):
    """Flatten a normalized log dict into sqlite-storable values.

    The input is not mutated. `timestamp` becomes an ISO-8601 string and
    `forwarded_for` a comma-joined string; extra keyword args (e.g.
    source=...) are merged in last, so they win over existing keys.
    """
    prepared = copy.deepcopy(log)
    prepared['timestamp'] = prepared['timestamp'].isoformat()
    prepared['forwarded_for'] = ",".join(prepared['forwarded_for'])
    for key, value in kwargs.items():
        prepared[key] = value
    return prepared
def insert_log(cur, log):
    """Insert one prepared log dict into the requests table.

    Column names come from the dict keys; values are bound as
    parameters, so no value text is interpolated into the SQL.
    """
    columns = list(log)
    placeholders = ", ".join("?" for _ in columns)
    sql = """insert into requests ({}) values ({})""".format(
        ", ".join(columns), placeholders)
    cur.execute(sql, [log[c] for c in columns])
def load_db(db, logs, **kwargs):
    """Insert an iterable of (raw, parsed, normalized) triples, one commit.

    Keyword args are forwarded to prepare_log (e.g. source=...).
    """
    cursor = db.cursor()
    for entry in logs:
        normalized = entry[2]  # only the normalized form is stored
        insert_log(cursor, prepare_log(normalized, **kwargs))
    db.commit()
def batches(l, length=500):
    """Yield lists of up to `length` items from the iterable `l`.

    The final batch may be shorter; an empty iterable yields nothing.
    """
    source = iter(l)
    chunk = list(itertools.islice(source, length))
    while chunk:
        yield chunk
        chunk = list(itertools.islice(source, length))
def main():
    """CLI entry point: stream the nginx log file into sqlite in batches."""
    _parser, args = parse_args()
    db = setup_db(args.db)

    def process(path):
        # Lazily yield (raw, parsed, normalized) triples, skipping
        # lines that don't match the expected log format.
        # Fix: use a context manager so the file handle is closed
        # (the original `open(f)` leaked the handle).
        with open(path) as handle:
            for line in handle:
                log = line.strip()
                parsed = parse_log(log)
                if not parsed:
                    logging.debug("Invalid log: %s", log)
                    continue
                yield log, parsed, normalize_log(parsed)

    # Commit in batches to bound memory use and transaction size.
    for batch in batches(process(args.logfile), args.batch):
        load_db(db, batch, source=args.source)

if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment