Skip to content

Instantly share code, notes, and snippets.

@roguh
Last active November 30, 2021 18:13
Show Gist options
  • Save roguh/e914fd540061ff74b6a703c13113822c to your computer and use it in GitHub Desktop.
Save roguh/e914fd540061ff74b6a703c13113822c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import fileinput
import json
import re
import sys
from argparse import ArgumentParser
from dataclasses import dataclass
from pprint import pprint
from typing import Any, Dict, List, Optional, Union
DEFAULT_JSON_INDENT = 4
FORMAT = "{asctime} {severity} {name} {module}:{funcName}:{lineno} {message}"
REQUIRED_KEYS = "{asctime} {severity} {name} {module}:{funcName}:{lineno} {message}"
# Stack traces or exception tracebacks
KEYS_WITH_STACK_TRACES = [
"exc_info",
"stack_info",
]
KNOWN_KEYS = [
"name",
"asctime",
"severity",
"module",
"funcName",
"lineno",
"message",
] + KEYS_WITH_STACK_TRACES
DEFAULT_IGNORED_KEYS = [
"filename",
"pathname",
]
StrOrBytes = Union[str, bytes]
ParsedLog = Dict[str, Any]
@dataclass
class Arguments:
files: List[str]
fail_if_non_json: bool
missing_keys_warnings: bool
ignored_keys: List[str]
show_stack_traces: bool
show_extra_keys: bool
show_message: bool
show_non_json: bool
strip_stack_traces: bool
replace_stack_trace_newlines: Optional[str]
extra_keys_sep: str
json_indent: Optional[int]
def dataclass_to_dict(dataclass_) -> Dict[str, Any]:
return {
key: getattr(dataclass_, key) for key in dataclass_.__dataclass_fields__.keys()
}
def parse_plain_json(line):
try:
return json.loads(line)
except json.decoder.JSONDecodeError:
return None
def parse_loki_json(line):
try:
return parse_plain_json(line.split("\t", 1)[1])
except IndexError:
return None
JSON_PARSERS = [parse_plain_json, parse_loki_json]
def parse(
line: StrOrBytes, fail_if_non_json: bool = False
) -> Union[ParsedLog, StrOrBytes]:
for parser in JSON_PARSERS:
log_msg = parser(line)
if log_msg is not None:
return log_msg
if fail_if_non_json:
print(
"FATAL: could not parse JSON with parsers",
", ".join(func.__name__ for func in JSON_PARSERS),
)
print(line[:-1])
sys.exit(1)
return line
def format_extra_keys(log_msg: ParsedLog, args: Arguments) -> str:
extra_items = {
key: value
for key, value in log_msg.items()
if not (key in KNOWN_KEYS or key in args.ignored_keys)
}
output = ""
if args.show_extra_keys and len(extra_items) > 0:
# Add newlines between msg components
if args.show_message:
output += args.extra_keys_sep
output += json.dumps(extra_items, indent=args.json_indent)
return output
def strip_stack_trace(stack_trace: str) -> str:
lines = stack_trace.split("\n")
last_file_location_index = len(lines) - 1
for index, line in enumerate(lines):
if re.match('\\s*File ".*", line .*', line):
last_file_location_index = index
return "\n".join(lines[:2] + lines[last_file_location_index:])
def format_stack_traces(
log_msg: ParsedLog,
newline_at_start: bool,
strip: bool,
replace_newlines: Optional[str] = None,
) -> str:
output = ""
for i, field in enumerate(KEYS_WITH_STACK_TRACES):
if field in log_msg:
if log_msg[field] == "NoneType: None":
continue
# Add newlines between msg components
if i == 0 and not newline_at_start:
exc_sep = ""
else:
exc_sep = replace_newlines if replace_newlines is not None else "\n"
stack_trace = log_msg[field]
if strip:
stack_trace = strip_stack_trace(stack_trace)
if replace_newlines:
stack_trace = stack_trace.replace("\n", replace_newlines)
output += exc_sep + stack_trace
return output
def format_log_msg(
log_msg: Union[ParsedLog, StrOrBytes], original_line: StrOrBytes, args: Arguments
) -> Optional[StrOrBytes]:
try:
if not isinstance(log_msg, dict):
raise ValueError(f"expected dict, received {type(log_msg)}")
msg = ""
if args.show_message:
msg += FORMAT.format(**{"name": "|"} | log_msg)
msg += format_extra_keys(log_msg, args)
if args.show_stack_traces:
msg += format_stack_traces(
log_msg,
newline_at_start=args.show_extra_keys or args.show_message,
strip=args.strip_stack_traces,
replace_newlines=args.replace_stack_trace_newlines,
)
if len(msg) == 0:
return None
return msg
except (ValueError, KeyError) as exc:
if isinstance(exc, KeyError) and args.missing_keys_warnings:
print("WARNING: missing keys", file=sys.stderr)
if args.show_non_json:
# Exclude trailing newline
return original_line[:-1]
def parse_args() -> Arguments:
parser = ArgumentParser(
description="""Convert JSON logs from stdin or files into readable output.
The JSON keys are assumed to come from Python.
Stack trace and exception tracebacks will be searched for in the keys: """
+ (", ".join(KEYS_WITH_STACK_TRACES))
)
parser.add_argument(
"files", help="Which log files to read. Reads from stdin as well.", nargs="*"
)
parser.add_argument(
"--missing-keys-warnings",
help="Print a warning to stderr if there are missing required keys in JSON logs.",
action="store_true",
)
parser.add_argument(
"--no-extra-keys",
help="Do not print extra keys in a JSON log line",
action="store_true",
)
parser.add_argument(
"--no-message",
help="Do not print the main message. Only the stack traces or extra keys",
action="store_true",
)
non_json_group = parser.add_mutually_exclusive_group()
non_json_group.add_argument(
"--no-non-json",
help="Hide non-JSON input instead of printing it as is",
action="store_true",
)
non_json_group.add_argument(
"--only-non-json",
help="Only show non-JSON input",
action="store_true",
)
non_json_group.add_argument(
"--fail-if-non-json",
help="Exit immediately if some lines cannot be parsed as JSON.",
action="store_true",
)
stack_trace_group = parser.add_mutually_exclusive_group()
stack_trace_group.add_argument(
"--no-stack-traces",
help="Do not print Python stack traces or exception tracebacks.",
action="store_true",
)
stack_trace_group.add_argument(
"--only-stack-traces",
help="Only print JSON log lines that contain Python stack traces or exception tracebacks.",
action="store_true",
)
parser.add_argument(
"--strip-stack-traces",
help="Print only the first two lines and last few lines, including the last file position, of Python stack traces or exception tracebacks.",
action="store_true",
)
parser.add_argument(
"--replace-stack-trace-newlines",
help="Print Python stack traces or exception tracebacks, but replace newlines with the argument.",
action="store",
default=None,
)
parser.add_argument(
"--extra-keys-on-same-line",
help="Print extra keys on the same line as the formatted log message",
action="store_true",
)
parser.add_argument(
"--stack-traces-one-line",
help="Print the stack traces and exception tracebacks on a single line. Same as \"--replace-stack-trace-newlines ' '\"",
action="store_true",
)
parser.add_argument(
"--one-line",
help="Print the formatted log message on a single line. Excludes stack traces and exception tracebacks.",
action="store_true",
)
parser.add_argument(
"--ignore",
help="Specify a key which must be ignored. Can be given multiple times",
action="append",
default=None,
)
# TODO
# parser.add_argument("--message-format")
# parser.add_argument("--known-keys")
json_formatting = parser.add_mutually_exclusive_group()
json_formatting.add_argument(
"--json-indent",
help="How much to indent extra keys when pretty-printing them as JSON.",
default=DEFAULT_JSON_INDENT,
)
json_formatting.add_argument(
"--compact-json",
help="Print extra keys in a compact format",
action="store_true",
)
parser.add_argument("--print-arguments", action="store_true")
args = parser.parse_args()
# convenience for --no-extra --no-message
if args.only_stack_traces:
args.no_extra_keys = True
args.no_message = True
if args.stack_traces_one_line:
args.replace_stack_trace_newlines = " "
application_args = Arguments(
files=args.files,
missing_keys_warnings=args.missing_keys_warnings,
show_stack_traces=not args.no_stack_traces and not args.only_non_json,
show_extra_keys=not args.no_extra_keys and not args.only_non_json,
show_message=not args.no_message and not args.only_non_json,
show_non_json=args.only_non_json or not args.no_non_json,
fail_if_non_json=args.fail_if_non_json,
strip_stack_traces=args.strip_stack_traces,
replace_stack_trace_newlines=args.replace_stack_trace_newlines,
extra_keys_sep=" " if args.one_line or args.extra_keys_on_same_line else "\n",
json_indent=None if args.one_line or args.compact_json else args.json_indent,
ignored_keys=DEFAULT_IGNORED_KEYS if args.ignore is None else args.ignore,
)
if args.print_arguments:
print("Arguments:", file=sys.stderr)
pprint(dataclass_to_dict(application_args), stream=sys.stderr)
return application_args
def main():
args = parse_args()
with fileinput.input(files=args.files, mode="r") as fileinputinput:
try:
for line in fileinputinput:
parsed_json = parse(line, args.fail_if_non_json)
readable_message = format_log_msg(parsed_json, line, args)
if readable_message is not None:
print(readable_message)
except KeyboardInterrupt:
return
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment