Skip to content

Instantly share code, notes, and snippets.

@chipaca
Last active Apr 30, 2019
Embed
What would you like to do?
Turn snapd HTTP logs into something more human-friendly
#!/usr/bin/python3
# TODO: multiline continuations
import argparse
import json
import os
import re
import signal
import sys
from email.parser import Parser
from http.client import HTTPMessage
from pygments import highlight, formatters
from pygments.lexers.data import JsonLexer, YamlLexer
from pygments.lexers.configs import PropertiesLexer
# avoid the "aaahh broken pipe" error when quitting less
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
# avoid the "aaahh keyboardinterrupt" error when hitting ^C
signal.signal(signal.SIGINT, signal.SIG_DFL)
parser = argparse.ArgumentParser()
parser.add_argument(
"logfiles",
metavar="FILE",
nargs="+",
type=argparse.FileType("r"),
help="journalctl or snapd output; use - for stdin",
)
parser.add_argument(
"-d",
dest="direct",
action="store_true",
help="the logs came from snapd directly, not via the journal",
)
parser.add_argument("-c", dest="color", action="store_true", help="use colour")
args = parser.parse_args()
if args.direct:
# direct output from go
# 2019/01/18 09:13:24.207661
prefix = r"\d\d\d\d/\d\d/\d\d \d\d:\d\d:\d\d\.\d+ "
else:
# journalctl
# Feb 06 20:45:43 fleet snapd[1514]:
prefix = r"\w\w\w \d\d \d\d:\d\d:\d\d \S+ snapd\[\d+\]: "
rxAuth = re.compile(r'(\w+)="([^"]+)"')
def shorten(s):
if len(s) > 20:
return s[:6] + "..." + s[-6:]
return s
# logger.go:67: DEBUG: > "..."
rxBegin = re.compile(prefix + r'\S+: DEBUG: ([<>]) ("[A-Z].*)')
# continuation:
rxContd = re.compile(prefix + r"(.*)")
# the actual log:
rx = re.compile(r'"([A-Z]+.*?)(?:\\[rn])+(.*?\\r\\n\\r\\n)(.*)"')
if not args.color:
if "NO_COLOR" not in os.environ and sys.stdout.isatty():
args.color = True
if args.color:
sep = "\033[2m" + "-" * 40 + "\033[0m"
if "256" in os.getenv("TERM"):
formatter = formatters.TerminalTrueColorFormatter()
else:
formatter = formatters.TerminalFormatter()
else:
sep = "-" * 40
formatter = formatters.NullFormatter()
json_lexer = JsonLexer()
yaml_lexer = YamlLexer()
prop_lexer = PropertiesLexer()
for logfile in args.logfiles:
contd = False
accum = ""
inout = ""
for line in logfile:
if not contd:
m = rxBegin.match(line)
if m is None:
print(line, end="")
continue
inout = m.group(1)
accum = m.group(2)
else:
m = rxContd.match(line)
if m is None:
raise RuntimeError(line)
accum += m.group(1)
if not accum.endswith('"'):
contd = True
continue
# so it ends in ". Does it end in \"?
if accum.endswith(r"\"") and not accum.endswith(r'\\"'):
# yeah this is brittle but good enough
contd = True
continue
contd = False
# ok accum has a whole "...."
m = rx.match(accum)
if m is None:
raise RuntimeError(accum)
print(sep)
print(inout, m.group(1))
headers = Parser(_class=HTTPMessage).parsestr(eval('"' + m.group(2) + '"'))
hs=[]
for k, v in headers.items():
if "authorization" in k.lower():
v = rxAuth.sub(
lambda m: m.group(1) + '="' + shorten(m.group(2)) + '"', v
)
headers[k] = v
hs.append(k + ": " + v)
highlight("\n".join(hs), prop_lexer, formatter, sys.stdout)
j = m.group(3)
if len(j) == 0:
continue
print()
s = eval('"' + j + '"')
content_type = headers.get_content_type()
if content_type in ("application/json", "application/hal+json"):
rsp = json.loads(s)
if type(rsp) == dict and len(rsp) == 1 and "discharge_macaroon" in rsp:
rsp["discharge_macaroon"] = shorten(rsp["discharge_macaroon"])
highlight(
json.dumps(rsp, indent=4, sort_keys=True),
json_lexer,
formatter,
sys.stdout,
)
elif content_type == "application/x.ubuntu.assertion":
idx = s.find("\n\n") + 1
if idx > 0:
highlight(s[:idx], yaml_lexer, formatter, sys.stdout)
s = s[idx:]
print(s.rstrip())
else:
print(s)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment