Skip to content

Instantly share code, notes, and snippets.

@pgerber
Created March 15, 2018 12:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pgerber/65d32821db5af33915d95547beb56417 to your computer and use it in GitHub Desktop.
Save pgerber/65d32821db5af33915d95547beb56417 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import argparse
from datetime import datetime, timezone
from functools import reduce
import json
import operator
import re
import sys
# Keys of a log record that are rendered explicitly (or deliberately not at
# all); every other key is printed by mdc() as a "key = value" line.
NON_MDC = {
    '@timestamp',
    'level',
    'level_name',
    'logger_name',
    'message',
    'revision',
    'stack_trace',
    'tags',
    'thread_name',
    'version',
}

# Rank of each known log level name (lower-case), from least to most severe.
# is_level_enabled() compares these ranks to decide whether a record is shown.
LEVELS = {
    name: rank
    for rank, name in enumerate(('trace', 'debug', 'info', 'warn', 'error'))
}
def local_time(ts):
    """Render a UTC timestamp string in the system's local timezone.

    Args:
        ts: Timestamp formatted as ``YYYY-MM-DDTHH:MM:SS.ffffff+00:00``, or
            ``None`` when the record carries no timestamp.

    Returns:
        ``'UNKNOWN TIME'`` when *ts* is ``None``; the timestamp converted to
        local time and formatted as ``YYYY-MM-DD HH:MM:SS`` when it parses;
        otherwise the raw value converted to ``str``.
    """
    if ts is None:
        return 'UNKNOWN TIME'
    try:
        parsed = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f+00:00')
    except (TypeError, ValueError):
        # A timestamp in an unexpected shape (no fractional seconds, "Z"
        # suffix, non-string value, ...) must not abort the whole run:
        # show it verbatim instead.
        return str(ts)
    # The literal "+00:00" in the format means the parsed value is UTC.
    local = parsed.replace(tzinfo=timezone.utc).astimezone()
    return local.strftime('%Y-%m-%d %H:%M:%S')
def mdc(msg, output):
    """Print every field of *msg* whose key is not in ``NON_MDC``.

    The fields are written to *output* as ``key = value`` lines, sorted by
    key and preceded by one blank line. Nothing is written when *msg* has no
    such fields.

    Args:
        msg: Mapping holding one decoded log record.
        output: Writable text stream that receives the lines.
    """
    extra_keys = sorted(key for key in msg if key not in NON_MDC)
    if not extra_keys:
        return
    # The separator line belongs to the same stream as the fields; the
    # original wrote it to stdout regardless of *output*.
    print(file=output)
    for key in extra_keys:
        print('{} = {}'.format(key, msg[key]), file=output)
def is_level_enabled(level, level_enabled):
    """Tell whether a record of severity *level* passes the threshold.

    Args:
        level: Level name taken from the log record, or ``None`` when the
            record has none. Matched case-insensitively against ``LEVELS``.
        level_enabled: Name of the lowest level that should be shown.
            Matched case-insensitively against ``LEVELS``.

    Returns:
        ``True`` when the record should be shown: its level is missing, is
        not a known level name, or ranks at or above *level_enabled*.
        ``False`` otherwise.

    Raises:
        KeyError: If *level_enabled* is not a known level name.
    """
    if level is None:
        return True
    # LEVELS is keyed by lower-case names, so normalise the threshold too
    # (callers pass e.g. 'INFO').
    threshold = LEVELS[level_enabled.lower()]
    # A level we cannot rank (e.g. 'FATAL', or a numeric value) is treated
    # like a missing one: never hide a record we cannot classify.
    rank = LEVELS.get(str(level).lower())
    return rank is None or threshold <= rank
def unscramble(file, output, *, level='INFO', predicate=lambda _text: True):
    """Pretty-print JSON log lines read from *file* to *output*.

    Every line is parsed as JSON. A line that is not a JSON object is echoed
    with a ``PLAIN TEXT:`` prefix when it satisfies *predicate*. A JSON
    object is printed as a header line (local time, level name, thread,
    logger), the message, any remaining fields (via ``mdc``), the stack
    trace if present, and a separator line -- provided it passes the level
    filter and at least one of its values satisfies *predicate*.

    Args:
        file: Iterable yielding the log lines as strings.
        output: Writable text stream that receives all output.
        level: Name of the lowest level to show (case-insensitive).
        predicate: Callable taking a string and returning a truthy value
            when the text matches the user's search.
    """
    # LEVELS is keyed by lower-case names; normalise so 'INFO' works too.
    threshold = level.lower()
    for line in file:
        try:
            msg = json.loads(line)
        except json.decoder.JSONDecodeError:
            msg = None
        if not isinstance(msg, dict):
            # Not JSON at all, or valid JSON that is not a record (a bare
            # number, string, list, null): pass the raw line through.
            if predicate(line):
                print('PLAIN TEXT: ', line, end='', file=output)
            continue

        if not is_level_enabled(msg.get('level'), threshold):
            continue
        # The search term may match any field of the record, not just the
        # message text.
        if not any(predicate(str(value)) for value in msg.values()):
            continue

        print(
            '{ts} {level:5} - thread: {thread}, logger: {logger}'.format(
                ts=local_time(msg.get('@timestamp')),
                level=msg.get('level_name', 'UNKNOWN'),
                thread=msg.get('thread_name', 'UNKNOWN'),
                logger=msg.get('logger_name', 'UNKNOWN'),
            ),
            file=output)
        print(file=output)
        print(msg.get('message', 'NO MESSAGE'), file=output)
        mdc(msg, output)
        trace = msg.get('stack_trace')
        if trace is not None:
            print(file=output)
            print(trace, file=output)
        print('=' * 100, file=output)
def generate_predicate(search_term, flags):
    """Build the text filter used to select log records.

    Args:
        search_term: Regular expression source, or ``None`` to accept
            every text.
        flags: Iterable of ``re`` flag values that are OR-ed together; may
            be empty.

    Returns:
        A one-argument callable. Without a search term it always returns
        ``True``; otherwise it returns the result of ``regex.search`` on its
        argument (a match object, or ``None`` when nothing matches).

    Raises:
        re.error: If *search_term* is not a valid regular expression.
    """
    if search_term is None:
        return lambda r: True
    # The initial value 0 ("no flags") keeps reduce() from raising TypeError
    # when *flags* is empty.
    combined_flags = reduce(operator.or_, flags, 0)
    regex = re.compile(search_term, combined_flags)
    return lambda r: regex.search(r)
def _open_log_file(path):
    """Open *path* for reading, turning I/O failures into argparse errors."""
    try:
        return open(path)
    except OSError as e:
        # argparse only converts ArgumentTypeError/TypeError/ValueError into
        # a usage message; a raw OSError would surface as a traceback.
        raise argparse.ArgumentTypeError('cannot open {!r}: {}'.format(path, e))


def main():
    """Command-line entry point: parse arguments and unscramble the logs.

    Reads log lines from ``--file`` (stdin by default), keeps the records at
    or above ``--level`` that match the optional SEARCH_TERM regex, and
    writes the readable form to stdout. An invalid regex or an unreadable
    file ends the program through argparse's usage-error path.
    """
    arg_parser = argparse.ArgumentParser(description='unscramble Nice logfiles obtained using `oc logs …`')
    term = arg_parser.add_argument(
        'search_term', metavar='SEARCH_TERM', nargs='?',
        help='Search pattern (regex) that need to appears in the log message stack '
             'trace or an MDC field (session_id, user_name, …).')
    arg_parser.add_argument(
        '--ignore-case', '-i', dest='regex_flags', default=[re.MULTILINE],
        action='append_const', const=re.IGNORECASE, help='Ignore case in SEARCH_TERM.')
    # const=sys.stdin: with nargs='?', a bare "-f" would otherwise yield None
    # and crash when the "file" is iterated.
    arg_parser.add_argument(
        '--file', '-f', type=_open_log_file, default=sys.stdin, const=sys.stdin, nargs='?',
        help='File from which to read logs. If omitted, logs are read from stdin')
    # Offer every level LEVELS knows about (the original list left out
    # 'debug'), ordered from least to most severe. The value is a lower
    # bound, hence "Min." in the help text.
    arg_parser.add_argument(
        '--level', '-l', default='trace', choices=sorted(LEVELS, key=LEVELS.get),
        help='Min. log level shown')
    args = arg_parser.parse_args()

    try:
        predicate = generate_predicate(args.search_term, args.regex_flags)
    except re.error as e:
        message = 'failed to compile regular expression {!r}: {}'.format(args.search_term, e)
        # error() prints the usage message and exits, so nothing below runs.
        arg_parser.error(str(argparse.ArgumentError(term, message)))

    try:
        unscramble(args.file, sys.stdout, level=args.level, predicate=predicate)
    finally:
        # Close files we opened ourselves; leave stdin alone.
        if args.file is not sys.stdin:
            args.file.close()
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment