log parser that reads multiple sources and captures SQL statements to follow a row's lifecycle; optionally filters for specific tables and ids
#! /usr/bin/env python3
"""
This script is a log file parser that looks for sql UPDATE statements and filters for specified tables and ids.
It can read from multiple sources and write to a separate file for each table and id combination.
examples:
`./looklog.py -t user,order *.log`: prints UPDATE statements for the ORDER and USER tables in *.log to stdout
`tail -F current.log | ./looklog.py -t user -i 12345`: real time tails current.log and filters for updates to
USER 12345
`./looklog.py -t user -s some.log`: writes a file in the form `user-<id>.log` for each USER updated in some.log
`./looklog.py -w *.log`: the output includes the filename of the log being parsed
`./looklog.py -t order -j user *.log`: updates to ORDER table joined to USER by USER_ID
`./looklog.py -I -t user`: also include INSERT/SELECT pairs, a pattern that many ORMs use
-h for options
"""
__version__ = '0.2.7'
__author__ = 'jules@looker.com'
import sys
import os
import argparse
import re
import time
import datetime
import heapq
import glob
from operator import itemgetter
import itertools
import collections
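# how many pending INSERT lines to remember while waiting for their matching SELECTs (see -I / --inserts)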
MAX_INSERTS = 100
# TODO: refactor this since we're no longer really using this as a true dict
class FileManager(collections.OrderedDict):
"""Keep only the max_open most recent files open"""
def __init__(self, max_open=20):
self.seen = set()
self.max_open = max_open
super().__init__()
def __getitem__(self, fname):
fd = None
if fname in self:
fd = super().__getitem__(fname)
assert not fd.closed
if not fd:
fd = open(fname, 'a' if fname in self.seen else 'w')
self.__setitem__(fname, fd)
else:
self.move_to_end(fname)
return fd
def __setitem__(self, fname, fd):
self.seen.add(fname)
assert not fd.closed
super().__setitem__(fname, fd)
if len(self) > self.max_open:
oldest = next(iter(self))
self[oldest].close()
del self[oldest]
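# illustrative use (hypothetical filenames, not executed here): with fm = FileManager(max_open=2),
# fm['a.log'].write(...); fm['b.log'].write(...); fm['c.log'].write(...) silently closes a.log when
# c.log is opened, and a later fm['a.log'] reopens it in append mode so earlier output is preserved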
arg_parse = argparse.ArgumentParser(description='Parse SQL statements in looker logs')
arg_parse.add_argument('input_paths', metavar='LOGFILE', nargs='*',
help='log file(s)')
arg_parse.add_argument('-t', '--table', metavar='TABLE[,TABLE ...]',
help='name of SQL table(s) [can be comma separated list]')
arg_parse.add_argument('-i', '--id', metavar='ID[,ID ...]',
help='ID(s) [can be comma separated list]')
arg_parse.add_argument('-s', '--split', action='store_true',
help='split output into one file per table/id combination, named <table>-<id>.log')
arg_parse.add_argument('-d', '--dir', default='.',
help='directory to write split files into (requires -s)')
arg_parse.add_argument('-w', '--with_filename', action='store_true',
help='include source filename when writing matched lines')
arg_parse.add_argument('-j', '--join', metavar='TABLE[,TABLE ...]',
help='also search the id column TABLE_ID for join table updates [can be comma separated list]')
arg_parse.add_argument('-I', '--inserts', action='store_true',
help='track INSERT/SELECT pairs (Sequel and similar ORM logs only)')
arg_parse.add_argument('-o', '--other',
help='any python compatible regex')
args = arg_parse.parse_args()
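# turn the comma separated CLI values into regex alternations, e.g. '-t user,order' becomes 'user|order'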
table_filter = '|'.join(map(str.strip, (args.table or r'.*?').split(',')))
id_filter = '|'.join(map(str.strip, (args.id or r'\d+').split(',')))
join_filter = '|'.join(list(map(lambda s: '{}_id'.format(s.strip()), (args.join or '').split(','))))
update_statement_re = re.compile(
r'(?:UPDATE |DELETE FROM )["`](?P<table>{tables})["`].*WHERE.*["`](?:{id_cols})["`] = (?P<id>{ids})\D'
.format(tables=table_filter, ids=id_filter, id_cols='|'.join(['id'] + join_filter.split('|'))),
re.I)
# insert/select tracking is for Sequel generated rows only
insert_statement_re = re.compile(
r'INSERT INTO ["`](?P<table>{tables})["`]'
.format(tables=table_filter), re.I)
insert_statement_with_join_re = re.compile(
r'INSERT INTO ["`](?P<table>{tables})["`].*\(["`](?P<id_cols>{id_cols})["`].*VALUES \((?P<id>{ids})\D'
.format(tables=table_filter, ids=id_filter, id_cols=join_filter), re.I)
# the paired SELECT matches on the primary key "id" as well as any join ids, so -I works without -j
select_statement_re = re.compile(
r'SELECT \* FROM ["`](?P<table>{tables})["`] WHERE \(["`](?:{id_cols})["`] = (?P<id>{ids})\D'
.format(tables=table_filter, ids=id_filter, id_cols='|'.join(['id'] + join_filter.split('|'))), re.I)
date_re = re.compile(r'(?P<timestamp>\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d.\d+ -\d{4})')
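# for reference, the patterns above target ORM-style log lines such as these (hypothetical samples):
#   2019-09-20 17:01:02.123456 -0700 ... UPDATE `user` SET `name` = 'x' WHERE (`id` = 12345) ...
#   2019-09-20 17:01:03.000001 -0700 ... INSERT INTO `user` (`name`) VALUES ('x')
#   2019-09-20 17:01:03.000002 -0700 ... SELECT * FROM `user` WHERE (`id` = 12345) LIMIT 1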
if args.other:
other_statement_re = re.compile(args.other, re.I)
fm = FileManager()
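# recent_inserts maps table name -> the most recent INSERT line, held until the matching SELECT arrives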
recent_inserts = collections.OrderedDict()
other_fd = open('{}/other'.format(args.dir), 'w') if args.split else sys.stdout
if not args.input_paths:
# no paths specified so just read stdin
input_files = [sys.stdin]
else:
# combine all the glob'ed filenames
input_files = map(open, itertools.chain(*map(lambda p: glob.glob(os.path.expanduser(p)), args.input_paths)))
# Use Heapq's merge to lazily sort input files
# https://stackoverflow.com/questions/12460943/merging-pre-sorted-files-without-reading-everything-into-memory
def timestamp_from_line(_line):
ts_m = date_re.match(_line)
if ts_m is not None:
return time.mktime(datetime.datetime.strptime(ts_m.group('timestamp'),
'%Y-%m-%d %H:%M:%S.%f %z').timetuple())
return 0
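# e.g. timestamp_from_line('2019-09-20 17:01:02.123456 -0700 ...') returns epoch-style seconds
# (interpreted in the local timezone); lines without a recognizable timestamp fall back to 0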
# these next three lines perfectly illustrate why python is so damn awesome :)
# note: the lambda's default argument pins each file object to its own stream so -w reports the right source
decorated = [map(lambda line, _fd=fd: (timestamp_from_line(line), (line, _fd)), fd) for fd in input_files]
merged = heapq.merge(*decorated)
lines_and_fds = map(itemgetter(-1), merged)
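# heapq.merge lazily combines the already time-ordered inputs into one (timestamp, (line, fd)) stream;
# itemgetter(-1) then strips the decoration again, leaving (line, fd) pairs for the loop below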
def write_line(_in_fd, _m, _line, _other=False):
_out_fd = other_fd if _other else sys.stdout
if args.split and not _other:
# show progress if writing to split files
sys.stdout.write('.')
sys.stdout.flush()
fname = '{dir}/{table}-{id}.log'.format(dir=args.dir, table=_m.group('table'), id=_m.group('id'))
_out_fd = fm[fname]
# write the log line (with optional source filename)
_out_fd.write('{file}> {line}'.format(line=_line, file=_in_fd.name)
if args.with_filename else '{line}'.format(line=_line))
_out_fd.flush()
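# e.g. with -w a matched line comes out as "current.log> 2019-09-20 ... UPDATE ..."; otherwise the raw
# log line is passed through unchanged (split mode also prints a '.' per match as a progress indicator)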
# process each line from the merged logs
for (line, in_fd) in lines_and_fds:
if args.inserts:
if args.join:
insert_with_join_m = insert_statement_with_join_re.search(line)
if insert_with_join_m:
write_line(in_fd, insert_with_join_m, line)
continue
insert_m = insert_statement_re.search(line)
if insert_m:
recent_inserts[insert_m.group('table')] = line
if len(recent_inserts) > MAX_INSERTS:
recent_inserts.popitem(last=False)  # evict the oldest pending INSERT, not the one just added
continue
select_m = select_statement_re.search(line)
if select_m:
table = select_m.group('table')
if table in recent_inserts:
write_line(in_fd, select_m, recent_inserts[table])
write_line(in_fd, select_m, line)
del recent_inserts[table]
continue
update_m = update_statement_re.search(line)
if update_m:
write_line(in_fd, update_m, line)
continue
if args.other:
other_m = other_statement_re.search(line)
if other_m:
write_line(in_fd, other_m, line, True)
continue