Last active
March 3, 2021 10:44
-
-
Save Tantalus13A98B5F/53efe59c8543e968b47c2afed6ffc756 to your computer and use it in GitHub Desktop.
A utility for parsing structured logs (NDPDA)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
r'''Parse logs in structures with the help of a pushdown automaton.

Grammar for the sample log file:

>This line is used for exact match.
$This line is used for regexp match.
 This line is ignored.
 The following lines mark a repeated region.
!repeat
$Match the field you like: (\d+)
!endrepeat
'''
from contextlib import suppress | |
import weakref | |
import types | |
import re | |
__all__ = ['LogMata'] | |
def numerize(val):
    '''Convert *val* to a number when it looks like one.

    ``int`` is tried first, then ``float``.  If neither conversion
    succeeds, *val* is returned unchanged.

    Values that are not convertible at all (for example ``None``, which
    ``re`` reports for a group that did not participate in the match)
    are passed through instead of raising ``TypeError``.
    '''
    for func in (int, float):
        # ValueError: non-numeric text; TypeError: non-string such as None.
        with suppress(ValueError, TypeError):
            return func(val)
    return val
class NumerizedMatch:
    '''Wrapper around a match object whose groups are passed through
    ``numerize`` before being handed to the caller.

    The wrapped object is kept in the ``m`` attribute.
    '''

    def __init__(self, m):
        self.m = m

    def __repr__(self):
        # ``%s`` formats through ``__str__``, i.e. the wrapped object's repr.
        return '<Numerized: %s>' % (self,)

    def __str__(self):
        return repr(self.m)

    def groups(self, *args, **kwargs):
        '''Return ``m.groups(...)`` with every item numerized.'''
        raw = self.m.groups(*args, **kwargs)
        return tuple(map(numerize, raw))

    def group(self, *args, **kwargs):
        '''Return ``m.group(...)`` numerized.

        A tuple result is numerized item by item; any other result is
        numerized as a single value.
        '''
        raw = self.m.group(*args, **kwargs)
        if not isinstance(raw, tuple):
            return numerize(raw)
        return tuple(map(numerize, raw))
class Node:
    '''A state of the automaton; ``nextmove`` lists its outgoing edges.

    Edges are expected to provide ``is_free``, ``action``, ``target()``
    and ``match(line)``.
    '''

    def __init__(self):
        self.nextmove = []

    def dfs_match(self, line, _active=None):
        '''Yield ``(edge, actions)`` for every edge that matches *line*.

        Free edges are followed recursively without consuming the line;
        ``actions`` is the list of non-empty actions of the free edges
        crossed on the way to the matching ``edge``, in traversal order.

        ``_active`` is internal: the set of nodes on the current chain
        of free edges.  A free edge leading back into that chain is
        skipped, so a cycle made only of free edges cannot recurse
        forever.
        '''
        if _active is None:
            active = frozenset((self,))
        else:
            active = _active | {self}
        for edge in self.nextmove:
            if edge.is_free:
                target = edge.target()
                if target in active:
                    # Free-edge cycle: following it could never consume
                    # the line, it would only loop.
                    continue
                prefix = [edge.action] if edge.action else []
                for hit, trailing in target.dfs_match(line, active):
                    yield hit, prefix + trailing
            elif edge.match(line):
                yield edge, []

    @property
    def is_terminal(self):
        '''True when the node has no outgoing edges.'''
        return not self.nextmove

    def dfs_terminal(self, _seen=None):
        '''Return True if a terminal node is reachable via free edges only.

        ``_seen`` is internal: nodes already explored, so that cycles of
        free edges are visited once instead of recursing without bound.
        '''
        if self.is_terminal:
            return True
        seen = set() if _seen is None else _seen
        seen.add(self)
        for edge in self.nextmove:
            if not edge.is_free:
                continue
            target = edge.target()
            if target not in seen and target.dfs_terminal(seen):
                return True
        return False
class Edge:
    '''A transition to ``target``.

    With ``pattern=None`` the edge is *free*: it is followed without
    consuming a line and may carry an ``action`` label.  Otherwise
    ``pattern`` is compiled as a regular expression and the edge is
    taken when the expression matches at the start of a line.

    The target is held through a weak reference, so the caller must keep
    the target object alive for as long as the edge is used.
    '''

    def __init__(self, target, pattern=None, action=None):
        self.target = weakref.ref(target)
        self.pattern = pattern
        # Test against None, exactly like ``is_free`` does.  An empty
        # pattern is a real (always matching) pattern; treating it as
        # "no pattern" would create an edge that is neither free nor
        # able to match anything.
        self.regexp = re.compile(pattern) if pattern is not None else None
        self.action = action

    def match(self, line):
        '''Return the match object for *line*, or None.

        Free edges never match.
        '''
        if self.regexp is None:
            return None
        return self.regexp.match(line)

    @property
    def is_free(self):
        '''True when the edge has no pattern.'''
        return self.pattern is None
class NestedStack:
    '''A tree of nested lists together with the chain of open lists.

    ``coll`` holds the open lists from the outermost (``bottom``) to the
    innermost (``top``).  Opening a level nests a new list inside the
    current top; dropping a level closes it but leaves it stored in its
    parent.
    '''

    def __init__(self):
        outermost = []
        self.coll = [outermost]

    @property
    def levels(self):
        '''Number of lists that are currently open.'''
        return len(self.coll)

    @property
    def bottom(self):
        '''The outermost list, which contains everything collected.'''
        return self.coll[0]

    @property
    def top(self):
        '''The innermost open list.'''
        return self.coll[-1]

    def addlevel(self):
        '''Open a new list nested inside the current top.'''
        child = []
        self.coll[-1].append(child)
        self.coll.append(child)

    def droplevel(self):
        '''Close the innermost open list.'''
        del self.coll[-1]
class LogMata:
    '''Automaton built from a sample file and used to parse logs.

    ``parse_sample`` extends the automaton from a sample description;
    ``parse_log`` then walks it over log lines and collects the matches.
    '''

    def __init__(self):
        self.nodes = []
        self.root = self.add_node()

    def add_node(self):
        '''Create a node, register it in ``self.nodes`` and return it.'''
        node = Node()
        # Edges only hold weak references to their targets, so the
        # automaton itself has to keep every node alive.
        self.nodes.append(node)
        return node

    def add_edge(self, src, target=None, **kwargs):
        '''Add an edge from *src* and return its target.

        A new node is created when *target* is not given.  Extra keyword
        arguments (``pattern``, ``action``) are forwarded to ``Edge``.
        '''
        if target is None:
            target = self.add_node()
        src.nextmove.append(Edge(target, **kwargs))
        return target

    @classmethod
    def splitlines(cls, lines):
        '''Return an iterator over *lines* with trailing whitespace removed.

        *lines* is either a single string, which is split into lines, or
        an iterable of lines.
        '''
        if isinstance(lines, str):
            lines = lines.splitlines()
        return (ln.rstrip() for ln in lines)

    def parse_sample(self, lines):
        '''Extend the automaton, starting at the root, from a sample.

        Line prefixes:

        * ``>`` -- the rest of the line is matched literally;
        * ``$`` -- the rest of the line is a regular expression;
        * ``!repeat`` / ``!endrepeat`` -- delimit a repeated region;
        * a leading space, or an empty line -- ignored.

        Raises:
            AssertionError: on an unknown or unbalanced ``!`` command, an
                unterminated ``!repeat``, or a line with an unrecognised
                prefix.  Raised explicitly so that the checks are still
                performed under ``python -O``.
        '''
        iterlines = self.splitlines(lines)

        def innerparse(cur, exitcmd):
            # The iterator is shared with the recursive calls, so each
            # call resumes where the nested one stopped.
            for line in iterlines:
                if line.startswith('>'):
                    cur = self.add_edge(cur, pattern=re.escape(line[1:]))
                elif line.startswith('$'):
                    cur = self.add_edge(cur, pattern=line[1:])
                elif line == '!repeat':
                    prev = self.add_edge(cur, action='loopentry')
                    cur = innerparse(prev, '!endrepeat')
                    # From the end of the body: either loop back to its
                    # start or leave the region.
                    self.add_edge(cur, prev, action='looping')
                    cur = self.add_edge(cur, action='loopexit')
                elif line.startswith('!'):
                    if line != exitcmd:
                        raise AssertionError(
                            'unexpected command %r (expected %r)'
                            % (line, exitcmd))
                    return cur
                elif line and not line.startswith(' '):
                    raise AssertionError(
                        'unrecognised sample line %r' % (line,))
            if exitcmd:
                raise AssertionError(
                    'sample ended before %r' % (exitcmd,))
            return cur

        innerparse(self.root, None)

    def parse_log(self, lines):
        '''Walk the automaton over *lines* and return the collected matches.

        Lines that match no reachable edge are skipped.  Parsing stops
        at the first terminal node.  The result is a list of
        ``NumerizedMatch`` objects; each repeated region contributes a
        nested list holding one list of matches per iteration.

        Raises:
            ValueError: if a line matches more than one reachable edge,
                or an edge carries an unknown action.
            AssertionError: if the input ends at a node from which no
                terminal node is reachable through free edges.
        '''
        curnode = self.root
        parsestack = NestedStack()
        for line in self.splitlines(lines):
            matchlist = list(curnode.dfs_match(line))
            if not matchlist:
                continue
            if len(matchlist) > 1:
                raise ValueError(
                    'ambiguous match for line %r: %d candidate edges'
                    % (line, len(matchlist)))
            edge, acts = matchlist[0]
            for action in acts:
                if action == 'loopentry':
                    # One list for the whole region, one for the first
                    # iteration.
                    parsestack.addlevel()
                    parsestack.addlevel()
                elif action == 'looping':
                    # Close the finished iteration and open the next.
                    parsestack.droplevel()
                    parsestack.addlevel()
                elif action == 'loopexit':
                    parsestack.droplevel()
                    parsestack.droplevel()
                else:
                    raise ValueError('unknown action %r' % (action,))
            parsestack.top.append(NumerizedMatch(edge.match(line)))
            curnode = edge.target()
            if curnode.is_terminal:
                break
        if not curnode.dfs_terminal():
            raise AssertionError(
                'log ended before the sample pattern was completed')
        return parsestack.bottom
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment