Created
December 27, 2021 20:58
-
-
Save mlabbe/92fcd3f33722c4ade9cf1ed536a8cbba to your computer and use it in GitHub Desktop.
Process structured logs to JSON
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# parse log.h's structured format: | |
# | |
# - each line ends with sequence 0x20 0x1a 0x0a | |
# - must parse through possible ascii escape sequences | |
# - 0x1b (esc) until 'm' | |
# - structure starts with line containing 3 uppercase alphabetic chars (the log level) | |
# - message ends with 0x1a 0x1a 0x0a | |
# - each line after the first begins with 0x09 (tab) | |
# possible improvement: | |
# parse time field to create session_seconds offset from start of log | |
import re | |
import sys | |
import json | |
# ANSI escape sequence: ESC followed by the shortest run of characters up to
# and including the next 'm'.
RE_ALL_ANSICODES = re.compile(r'\x1b.+?m')
# Terminator of a header line or of a field that is followed by more fields.
EOL = '\x20\x1a\x0a'
# Terminator of the last field of a structured message.
EOM = '\x1a\x1a\x0a'


class log_message:
    """Incremental parser for one structured log message.

    Lines are fed one at a time to consume_next_line().  The first line must
    be the header (a three-letter log level followed by EOL); each following
    line either begins a tab-prefixed "key: value" field or continues the
    value of a field that did not terminate on its first line.  Parsed
    fields accumulate in self.log, which to_json() serializes.

    Any malformed input is reported through parse_error(), which terminates
    the process.
    """

    # Log levels accepted in the header line.
    LOG_LEVELS = ('TRC', 'MSG', 'WRN', 'ERR', 'FTL')

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard all parsed state so the next line is treated as a header."""
        self.log = {}
        self.field_key_buf = ""
        self.field_value_buf = ""

    @staticmethod
    def strip_line(line):
        """Return line with every ANSI escape sequence removed."""
        # One pass suffices: removing a sequence never creates a new one.
        return RE_ALL_ANSICODES.sub('', line)

    @staticmethod
    def consume_n_chars(s, n):
        """Split s into (first n characters, remainder)."""
        return (s[:n], s[n:])

    def consume_next_line(self, line):
        """Feed one raw input line to the parser.

        Returns True when the line ended with EOM, i.e. the message is
        complete and self.log holds all of its fields; False otherwise.
        """
        # expect header line first
        if 'level' not in self.log:
            self.expect_line_header(line)
            return False

        # An empty value buffer means no field is currently in progress.
        if self.field_value_buf == "":
            eol = self.expect_field_begin(line)
        else:
            eol = self.expect_field_continue(line)
        return eol == EOM

    def expect_eol(self, line):
        """Validate that line is exactly EOL.

        Returns what remains after the EOL, which is always the empty string
        when validation succeeds.  Calls parse_error() otherwise.
        """
        if len(line) != len(EOL):
            self.parse_error("expected eol", line)
        if line != EOL:
            self.parse_error("invalid eol", line)
        return line[len(EOL):]

    def expect_line_header(self, line):
        """Parse the header line and record its level in self.log['level'].

        The line, once ANSI sequences are stripped, must be one of
        LOG_LEVELS immediately followed by EOL.  Returns the unparsed
        remainder (empty on success); calls parse_error() on bad input.
        """
        stripped = self.strip_line(line)
        if len(stripped) < 3:
            self.parse_error("invalid message header", stripped)

        log_level, rest = self.consume_n_chars(stripped, 3)
        if log_level not in self.LOG_LEVELS:
            self.parse_error("invalid log level", stripped)

        rest = self.expect_eol(rest)
        self.log['level'] = log_level

        # should be nothing left
        return rest

    @staticmethod
    def get_eol(s):
        """Return EOL or EOM if s ends with one of them, otherwise None."""
        for terminator in (EOL, EOM):
            if s.endswith(terminator):
                return terminator
        return None

    @staticmethod
    def process_file_field(value):
        """
        Take files like '../../../src/foo.c' and return 'src/foo.c'

        Backslashes are converted to forward slashes first.  Everything
        before the first "src/" is dropped; a value without "src/" is
        returned with only the slash conversion applied.
        """
        value = value.replace('\\', '/')
        src = value.find("src/")
        if src > 0:
            value = value[src:]
        return value

    def expect_field_begin(self, line):
        """Parse the first line of a "<tab>key: value" field.

        Returns the eol char, which can be:
         - None (field did not end on line)
         - EOL (further fields exist, but this one finished)
         - EOM (last field in structured log)

        A finished value is stored in self.log[key] with trailing whitespace
        removed; an unfinished one is buffered for expect_field_continue().
        Calls parse_error() when the leading tab or the ': ' separator is
        missing.
        """
        stripped = self.strip_line(line)
        tab, body = self.consume_n_chars(stripped, 1)
        if tab != '\t':
            self.parse_error("invalid message field", stripped)

        # Split on the first ': ' only, so values may themselves contain it.
        key, sep, value = body.partition(': ')
        if not sep:
            self.parse_error("invalid message field", stripped)
        self.field_key_buf = key

        # begin special field processing
        if key == 'file':
            value = self.process_file_field(value)
        # end special field processing

        eol = self.get_eol(value)
        if eol is None:
            # Value continues on the next line; keep what we have so far.
            self.field_value_buf = value
            return None

        # The terminator is always a suffix here, so slice it off.
        self.log[key] = value[:-len(eol)].rstrip()
        self.field_key_buf = None
        self.field_value_buf = ""
        return eol

    def expect_field_continue(self, line):
        """
        Append a continuation line to the field currently being buffered.

        Same return as expect_field_begin
        """
        stripped = self.strip_line(line)
        eol = self.get_eol(stripped)
        if eol is None:
            self.field_value_buf += stripped
            return None

        # NOTE(review): unlike single-line values, the assembled multi-line
        # value is stored without rstrip() -- confirm that is intended.
        self.log[self.field_key_buf] = (
            self.field_value_buf + stripped[:-len(eol)])
        self.field_key_buf = None
        self.field_value_buf = ""
        return eol

    def parse_error(self, msg, line):
        """Report msg and the offending line on stderr, then exit with 1."""
        print("parse error: %s on line:\n%s\n" % (msg, line), file=sys.stderr)
        sys.exit(1)

    def to_json(self):
        """Return the fields parsed so far as an indented JSON object."""
        return json.dumps(self.log, indent=4)
if __name__ == '__main__':
    # Read structured log lines from stdin and print each completed message
    # as a JSON object on stdout.
    log = log_message()

    try:
        # readline() on a text stream returns '' (a str, not bytes) at end of
        # input, so that is the sentinel that stops the iteration.
        for line in iter(sys.stdin.readline, ''):
            # Skip lines that consist only of a newline.
            if line[0] == '\x0a':
                continue

            if log.consume_next_line(line):
                print(log.to_json())
                log.reset()

    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop when following a live log.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment