Created
March 15, 2018 12:37
-
-
Save pgerber/65d32821db5af33915d95547beb56417 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
from datetime import datetime, timezone | |
from functools import reduce | |
import json | |
import operator | |
import re | |
import sys | |
NON_MDC = {'message', 'level', 'logger_name', 'message', 'thread_name', 'version', 'revision', 'tags', 'level_name', '@timestamp', 'stack_trace'} | |
LEVELS = { | |
'trace': 0, | |
'debug': 1, | |
'info': 2, | |
'warn': 3, | |
'error': 4 | |
} | |
def local_time(ts): | |
if ts is None: | |
return 'UNKNOWN TIME' | |
ts = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%S.%f+00:00') | |
ts = ts.replace(tzinfo=timezone.utc) | |
ts = ts.astimezone() | |
return ts.strftime('%Y-%m-%d %H:%M:%S') | |
def mdc(msg, output): | |
mdc = [(k, v) for k, v in msg.items() if k not in NON_MDC] | |
mdc.sort() | |
if mdc: | |
print() | |
for key, value in mdc: | |
print('{} = {}'.format(key, value), file=output) | |
def is_level_enabled(level, level_enabled): | |
if level is None: | |
return True | |
if LEVELS[level_enabled] <= LEVELS[level.lower()]: | |
return True | |
return False | |
def unscramble(file, output, *, level='INFO', predicate=lambda: True): | |
for line in file: | |
try: | |
msg = json.loads(line) | |
if not is_level_enabled(msg.get('level'), level): | |
continue | |
for value in msg.values(): | |
if predicate(str(value)): | |
break | |
else: | |
continue | |
print( | |
'{ts} {level:5} - thread: {thread}, logger: {logger}' | |
.format( | |
ts = local_time(msg.get('@timestamp')), | |
level = msg.get('level_name', 'UNKNOWN'), | |
thread = msg.get('thread_name', 'UNKNOWN'), | |
logger = msg.get('logger_name', 'UNKNOWN'), | |
), file=output) | |
print() | |
print(msg.get('message', 'NO MESSAGE'), file=output) | |
mdc(msg, output) | |
trace = msg.get('stack_trace') | |
if trace is not None: | |
print() | |
print(trace) | |
print('=' * 100) | |
except json.decoder.JSONDecodeError: | |
if predicate(line): | |
print('PLAIN TEXT: ', line, end='', file=output) | |
def generate_predicate(search_term, flags): | |
if search_term is not None: | |
regex = re.compile(search_term, reduce(operator.or_, flags)) | |
return lambda r: regex.search(r) | |
else: | |
return lambda r: True | |
def main(): | |
arg_parser = argparse.ArgumentParser(description='unscramble Nice logfiles obtained using `oc logs …`') | |
term = arg_parser.add_argument('search_term', metavar='SEARCH_TERM', nargs='?', | |
help='Search pattern (regex) that need to appears in the log message stack ' | |
'trace or an MDC field (session_id, user_name, …).') | |
arg_parser.add_argument('--ignore-case', '-i', dest='regex_flags', default=[re.MULTILINE], | |
action='append_const', const=re.IGNORECASE, help='Ignore case in SEARCH_TERM.') | |
arg_parser.add_argument('--file', '-f', type=open, default=sys.stdin, nargs='?', | |
help='File from which to read logs. If omitted, logs are read from stdin') | |
arg_parser.add_argument('--level', '-l', default='trace', choices=['trace', 'info', 'warn', 'error'], | |
help='Max. log level shown') | |
args = arg_parser.parse_args() | |
try: | |
predicate = generate_predicate(args.search_term, args.regex_flags) | |
except re.error as e: | |
message = 'failed to compile regular expression {!r}: {}'.format(args.search_term, e) | |
arg_parser.error(argparse.ArgumentError(term, message)) | |
unscramble(args.file, sys.stdout, level=args.level, predicate=predicate) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment