# Process structured logs to JSON
# (GitHub gist, created Dec 27, 2021)
#!/usr/bin/env python3
# Parse log.h's structured format:
# - each line ends with the sequence 0x20 0x1a 0x0a
# - lines may contain ANSI escape sequences, which must be skipped:
#   - 0x1b (ESC) up to and including the next 'm'
# - a message starts with a header line containing 3 uppercase alphabetic chars
# - a message ends with the sequence 0x1a 0x1a 0x0a
# - each field after the header line starts with 0x09 (tab)
# Possible improvement:
# - parse the time field to create a session_seconds offset from the start
#   of the log
import re
import sys
import json
# Matches one escape sequence: ESC (0x1b) followed by the shortest run of
# characters up to and including the next 'm'.
RE_ALL_ANSICODES = re.compile(r'\x1b.+?m')
# Terminator of a line when more lines of the same message follow.
EOL = '\x20\x1a\x0a'
# Terminator of the last line of a message.
EOM = '\x1a\x1a\x0a'


class log_message:
    """Incremental, line-by-line parser for one structured log message.

    A message is a header line holding a three-letter log level, followed by
    tab-prefixed ``key: value`` field lines.  A field value may span several
    input lines; it is finished by the first line that ends in EOL or EOM.
    EOM additionally marks the end of the whole message.

    Feed lines to ``consume_next_line``; once it returns True the parsed
    fields are in ``self.log`` (see ``to_json``).  Call ``reset`` before
    parsing the next message.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Drop all parsed state so that a new message can be parsed."""
        # Parsed fields of the current message; 'level' is set by the header.
        self.log = {}
        # Key of the field whose value is still being accumulated.
        self.field_key_buf = ""
        # Value text accumulated so far for a field spanning several lines.
        self.field_value_buf = ""

    @staticmethod
    def strip_line(line):
        """Return ``line`` with every RE_ALL_ANSICODES match removed."""
        return RE_ALL_ANSICODES.sub('', line)

    @staticmethod
    def consume_n_chars(s, n):
        """Split ``s`` into ``(first n characters, remainder)``."""
        return (s[:n], s[n:])

    def consume_next_line(self, line):
        """Parse one input line and return whether it ended the message.

        The first line seen after a reset is treated as the header line.
        Every later line either starts a new field or, while a multi-line
        value is being accumulated, continues the current one.
        """
        # expect header line first
        if 'level' not in self.log:
            self.expect_line_header(line)
            return False
        if self.field_value_buf == "":
            eol = self.expect_field_begin(line)
        else:
            eol = self.expect_field_continue(line)
        return eol == EOM

    def expect_eol(self, line):
        """Report a parse error unless ``line`` is exactly EOL.

        Returns the first ``len(EOL)`` characters of ``line``.
        """
        if len(line) != len(EOL):
            self.parse_error("expected eol", line)
        elif line != EOL:
            self.parse_error("invalid eol", line)
        return line[:len(EOL)]

    def expect_line_header(self, line):
        """Parse the header line and record its log level in ``self.log``.

        A line that is too short or carries an unknown level is reported and
        skipped without recording a level, so the parser keeps waiting for a
        valid header instead of treating the following lines as fields.

        Returns whatever is left of the line after the level (the line
        terminator when the header is well formed).
        """
        line = self.strip_line(line)
        if len(line) < 3:
            self.parse_error("invalid message header", line)
            return line
        log_level, line = self.consume_n_chars(line, 3)
        if log_level not in ('TRC', 'MSG', 'WRN', 'ERR', 'FTL'):
            self.parse_error("invalid log level", line)
            return line
        line = self.expect_eol(line)
        self.log['level'] = log_level
        # should be nothing left
        return line

    @staticmethod
    def get_eol(s):
        """Return EOL or EOM if ``s`` ends with one of them, else None."""
        for terminator in (EOL, EOM):
            if s.endswith(terminator):
                return terminator
        return None

    @staticmethod
    def process_file_field(value):
        """Take files like '../../../src/foo.c' and return 'src/foo.c'.

        Backslashes are converted to forward slashes first.  A value that
        does not contain 'src/' is returned with only that conversion applied.
        """
        value = value.replace('\\', '/')
        src = value.find("src/")
        if src > 0:
            value = value[src:]
        return value

    def expect_field_begin(self, line):
        """Parse the first line of a ``key: value`` field.

        Returns the eol marker, which can be:
        - None (field did not end on line)
        - EOL (further fields exist, but this one finished)
        - EOM (last field in structured log)
        """
        line = self.strip_line(line)
        tab, line = self.consume_n_chars(line, 1)
        if tab != '\t':
            self.parse_error("invalid message field", line)
        # Split on the first separator only, so that a value which itself
        # contains ': ' is kept intact.
        key, sep, value = line.partition(': ')
        if not sep:
            # Not a field at all: report it, store nothing, but still let the
            # caller see a terminator so that an EOM ends the message.
            self.parse_error("missing field separator", line)
            return self.get_eol(line)
        self.field_key_buf = key
        # begin special field processing
        if key == 'file':
            value = self.process_file_field(value)
        # end special field processing
        eol = self.get_eol(value)
        if eol is None:
            # Value continues on the following line(s).
            self.field_value_buf += value
            return eol
        value = value[:-len(eol)]
        self.log[key] = value.rstrip()
        self.field_key_buf = None
        self.field_value_buf = ""
        return eol

    def expect_field_continue(self, line):
        """Parse a follow-up line of a multi-line field value.

        Same return as expect_field_begin.
        """
        line = self.strip_line(line)
        eol = self.get_eol(line)
        if eol is None:
            self.field_value_buf += line
            return eol
        self.field_value_buf += line[:-len(eol)]
        self.log[self.field_key_buf] = self.field_value_buf
        self.field_key_buf = None
        self.field_value_buf = ""
        return eol

    def parse_error(self, msg, line):
        """Print a parse error for ``line`` to stderr; parsing continues."""
        print("parse error: %s on line:\n%s\n" % (msg, line), file=sys.stderr)

    def to_json(self):
        """Return the fields parsed so far as an indented JSON object."""
        return json.dumps(self.log, indent=4)
if __name__ == '__main__':
    # Read structured log lines from stdin and print every completed message
    # to stdout as a JSON object.
    log = log_message()
    try:
        # readline() returns '' only at end of input, so '' is the sentinel
        # that stops the iteration.
        for line in iter(sys.stdin.readline, ''):
            # A line consisting of a bare newline carries no data.
            if line[0] == '\x0a':
                continue
            is_final_field = log.consume_next_line(line)
            if is_final_field:
                # NOTE(review): the body of this branch was missing from the
                # source; emitting the message and resetting the parser is a
                # reconstruction -- confirm against the original script.
                print(log.to_json())
                log.reset()
    except KeyboardInterrupt:
        # NOTE(review): handler body was missing from the source; presumably
        # Ctrl-C should simply end the session quietly -- confirm.
        pass
# (End of gist. GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")