Well this got out of hand quickly...
#!/usr/bin/env python3
"""
Minimally parse logs in order to merge their contents in sorted order with minimal
resource usage - primarily memory, in order to support very large log files. For
example, whether 10 MBs or 10 GBs of logs, <15 MBs of memory is typically allocated.
Assumes each individual log file is already sorted.
The bash 'globstar' option is recommended in order to easily pass all log paths like:
./logsort.py mongodb-*/**/*.log
To enable:
shopt -s globstar
"""
import argparse
from datetime import datetime
import errno
import heapq
import os
from pathlib import Path
import re
import sys
# Matches common log timestamps: ISO8601-style dates with '-' or '/'
# separators, ' ', 'T' or '_' between the date and time, and an optional
# fractional-seconds component delimited by '.' or ','
TIMESTAMP_RE = re.compile(r'(\d{4}[/-]\d{2}[/-]\d{2}[ T_]*\d{2}:\d{2}:\d{2}([.,]\d{3,6})?)')
# Strips the separators matched above ('_' included, so underscore-separated
# timestamps normalize like the rest), leaving a lexically sortable string
SIMPLETS_RE = re.compile(r'[/ T_:-]')
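# After stripping separators and normalizing ',' to '.', e.g.
# '2019-11-08 05:00:00.123' and '2019/11/08T05:00:00,123' both become the
# directly comparable string '20191108050000.123'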

# Time-range bounds, populated from the parsed arguments in main()
START_TIME_RANGE = None
END_TIME_RANGE = None

class Log:
    """A class to maintain state about individual log files"""
    _prefix = ''
    _fd = None

    def __init__(self, fd, prefix):
        self._fd = fd
        if prefix == 'full':
            self._prefix = fd.name + ': '
        elif prefix == 'base':
            self._prefix = os.path.basename(fd.name) + ': '

    def __iter__(self):
        """A generator to return log lines prefixed with a standardized,
        sortable timestamp for parsing by heapq.merge() as well as optionally
        prefixed with the log filename
        """
        output = ''
        skip = False
        for line in self._fd:
            match = TIMESTAMP_RE.search(line)
            if match:
                # normalize ',' fractional-second separators to '.' so all
                # timestamps compare consistently
                timestamp = SIMPLETS_RE.sub('', match.group(1)).replace(',', '.')
                if output != '' and not skip:
                    yield output
                skip = False
                if timestamp < START_TIME_RANGE:
                    skip = True
                    output = ''
                elif timestamp > END_TIME_RANGE:
                    # the file is assumed sorted, so no later line can be in range
                    return
                else:
                    output = timestamp + '|' + self._prefix + line
            else:
                # continuation line: append to the current record
                output += line
        if output != '' and not skip:
            yield output
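
# With the default 'base' prefix, a record yielded by Log looks like the
# following (filename and message content are illustrative):
#   20191108050000.123|mongod.log: 2019-11-08 05:00:00.123 I NETWORK ...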

def process_logs(logs, prefix):
    """Basic function for instantiating the Log objects"""
    return [Log(log, prefix) for log in logs]

def parse_timestamp(line):
    """Key function for heapq.merge(): the sortable key is everything before
    the first '|'"""
    return line.split('|', 1)[0]
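
# heapq.merge() consumes its inputs lazily, holding only one record per file
# in memory at a time; that laziness is what keeps the memory footprint small
# regardless of total log size:
#   >>> list(heapq.merge(['a1', 'a3'], ['a2', 'a4']))
#   ['a1', 'a2', 'a3', 'a4']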
def sort_logs(logs, output):
    """Sort the given list of Log objects, writing output to given file object"""
    for line in heapq.merge(*logs, key=parse_timestamp):
        try:
            # strip the sortable timestamp key before writing
            print(line.split('|', 1)[1], file=output, end='')
        except IndexError:
            # no timestamp found prefixed
            pass
        except IOError as err:
            # gracefully handle pipes closing, e.g. when piping into head(1)
            if err.errno == errno.EPIPE:
                return
            raise

def find_logs(directory):
    """Recursively search the given directory for log files"""
    logs = []
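    # '**/*.log' matches .log files in this directory and, recursively, in
    # all of its subdirectories (equivalent to Path.rglob('*.log'))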
    for log in Path(directory).glob('**/*.log'):
        logs.append(open(log, 'r'))
    return logs

def parse_args(args):
    """Handle parsing arguments"""
    # default start/end bounds in normalized form: the Unix epoch (as
    # rendered in a UTC-5 timezone) and the current local time
    epoch = '19691231190000.000'
    now = datetime.now().strftime('%Y%m%d%H%M%S.%f')[:-3]
    parser = argparse.ArgumentParser(description='Merge multiple log files, '
                                     'sorted by timestamp. Optionally prefix '
                                     'and/or filter by time range.')
    parser.add_argument('-o', '--output', type=argparse.FileType('w'),
                        default=sys.stdout,
                        help='where the merged file should be output '
                             '(default: stdout)')
    parser.add_argument('--no-prefix', action='store_true',
                        help='do not prefix log lines with filename')
    parser.add_argument('--full-prefix', action='store_true',
                        help='include full path of filename as log line prefix')
    parser.add_argument('-s', '--start-time', default=epoch,
                        help='filter logs by timestamp, starting at given time')
    parser.add_argument('-e', '--end-time', default=now,
                        help='filter logs by timestamp, ending at given time')
    files = parser.add_argument_group(title='specifying log files',
                                      description='use at least one, or '
                                      'potentially both, of the below methods '
                                      'to specify log files to merge and sort')
    files.add_argument('-d', '--directory', type=str, default=None, nargs='*',
                       help='one or more (space-delimited) directories to '
                            'recursively search for all .log files')
    files.add_argument('logs', nargs='*', type=argparse.FileType('r'),
                       help='individual files, useful for shell globbing - '
                            'specify these before any -d/--directory')
    parsed_args = parser.parse_args(args)
    if parsed_args.directory is None and not parsed_args.logs:
        parser.print_usage(file=sys.stderr)
        print('error: at least one of logs or -d/--directory is required.',
              file=sys.stderr)
        sys.exit(1)
    parsed_args.prefix = 'base'
    if parsed_args.no_prefix:
        parsed_args.prefix = ''
    elif parsed_args.full_prefix:
        parsed_args.prefix = 'full'
    # standardize start and end times into the same normalized form produced
    # in Log.__iter__()
    if parsed_args.start_time != epoch:
        match = TIMESTAMP_RE.search(parsed_args.start_time)
        if match:
            parsed_args.start_time = SIMPLETS_RE.sub('', match.group(1)).replace(',', '.')
        else:
            parser.print_usage(file=sys.stderr)
            print(f'error: start time "{parsed_args.start_time}" not in a '
                  'recognized format such as ISO8601', file=sys.stderr)
            sys.exit(1)
    if parsed_args.end_time != now:
        match = TIMESTAMP_RE.search(parsed_args.end_time)
        if match:
            parsed_args.end_time = SIMPLETS_RE.sub('', match.group(1)).replace(',', '.')
        else:
            parser.print_usage(file=sys.stderr)
            print(f'error: end time "{parsed_args.end_time}" not in a '
                  'recognized format such as ISO8601', file=sys.stderr)
            sys.exit(1)
    # search any given directories for logs
    if parsed_args.directory:
        for directory in parsed_args.directory:
            if not os.path.isdir(directory):
                parser.print_usage(file=sys.stderr)
                print(f'error: searching -d/--directory: "{directory}" '
                      'is not a directory', file=sys.stderr)
                sys.exit(1)
            try:
                parsed_args.logs += find_logs(directory)
            except PermissionError as err:
                parser.print_usage(file=sys.stderr)
                print(f'error: searching -d/--directory: {err}', file=sys.stderr)
                sys.exit(1)
    return parsed_args

def main():
    """Main program flow"""
    global START_TIME_RANGE, END_TIME_RANGE
    args = parse_args(sys.argv[1:])
    START_TIME_RANGE = args.start_time
    END_TIME_RANGE = args.end_time
    logs = process_logs(args.logs, args.prefix)
    sort_logs(logs, args.output)

if __name__ == '__main__':
    main()
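
# Example invocations (paths and times are illustrative):
#   ./logsort.py mongodb-*/**/*.log
#   ./logsort.py -d /var/log/mongodb -o merged.log
#   ./logsort.py -s '2019-11-01 00:00:00' -e '2019-11-08 00:00:00' *.log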