Skip to content

Instantly share code, notes, and snippets.

@Tantalus13A98B5F
Last active March 3, 2021 10:44
Show Gist options
  • Save Tantalus13A98B5F/53efe59c8543e968b47c2afed6ffc756 to your computer and use it in GitHub Desktop.
Save Tantalus13A98B5F/53efe59c8543e968b47c2afed6ffc756 to your computer and use it in GitHub Desktop.
A utility for parsing structured logs (NDPDA)
#!/usr/bin/env python3
'''Parse logs in structures with the help of a pushdown automaton.
Grammar for the sample log file:
>This line is used for exact match.
$This line is used for regexp match.
 This line is ignored (ignored lines must be empty or start with a space).
 The following lines mark a repeated region.
!repeat
$Match the field you like: (\d+)
!endrepeat
'''
from contextlib import suppress
import weakref
import types
import re
__all__ = ['LogMata']
def numerize(val):
    '''Convert *val* to a number when possible, otherwise return it unchanged.

    ``int`` is tried first, then ``float``; the first conversion that
    succeeds wins.  Values that cannot be converted at all -- ordinary text,
    or non-string objects such as ``None`` (which is what a match object
    reports for a group that did not participate) -- are returned as-is.
    '''
    for func in (int, float):
        # ValueError: text that is not a number.
        # TypeError: unsupported input type, e.g. None for an unmatched group.
        with suppress(ValueError, TypeError):
            return func(val)
    return val
class NumerizedMatch:
    '''Wrapper around a match object that numerizes extracted groups.

    The wrapped match is kept in ``m``.  ``group`` and ``groups`` forward
    their arguments to the wrapped object and pass every returned value
    through ``numerize``.
    '''

    def __init__(self, m):
        self.m = m

    def __repr__(self):
        # %s formatting goes through __str__, i.e. the wrapped match's repr.
        return '<Numerized: %s>' % (self,)

    def __str__(self):
        return repr(self.m)

    def groups(self, *args, **kwargs):
        '''Return ``m.groups(...)`` with each item numerized.'''
        found = self.m.groups(*args, **kwargs)
        return tuple(map(numerize, found))

    def group(self, *args, **kwargs):
        '''Return ``m.group(...)`` numerized.

        A tuple result (several groups requested) is numerized item by
        item; a single result is numerized directly.
        '''
        found = self.m.group(*args, **kwargs)
        if not isinstance(found, tuple):
            return numerize(found)
        return tuple(map(numerize, found))
class Node:
    '''A state of the automaton.

    ``nextmove`` lists the outgoing edges in insertion order.  Edges are
    used through ``is_free``, ``action``, ``target()`` and ``match(line)``.
    '''

    def __init__(self):
        self.nextmove = []

    def dfs_match(self, line, _path=frozenset()):
        '''Yield ``(edge, actions)`` for each consuming edge matching *line*.

        Free edges are followed depth-first without consuming the line;
        the actions attached to the free edges walked on the way are
        collected, in order, into ``actions``.  Consuming edges are tested
        with ``edge.match(line)``.

        ``_path`` is internal: the set of nodes already on the current
        chain of free edges.  It stops the search from recursing forever
        when free edges form a cycle (for instance a loop whose body is
        empty); on acyclic graphs it never changes the result.
        '''
        if self in _path:
            # Came back to a node on the current free-edge chain: nothing
            # new can be found by going round the cycle again.
            return
        path = _path | {self}
        for edge in self.nextmove:
            if edge.is_free:
                target = edge.target()
                prefix = [edge.action] if edge.action else []
                for hit, suffix in target.dfs_match(line, path):
                    yield hit, prefix + suffix
            elif edge.match(line):
                yield edge, []

    @property
    def is_terminal(self):
        '''True when the node has no outgoing edge.'''
        return not self.nextmove

    def dfs_terminal(self, _path=frozenset()):
        '''Return True if a terminal node is reachable through free edges only.

        ``_path`` is internal and plays the same cycle-guard role as in
        ``dfs_match``.
        '''
        if self.is_terminal:
            return True
        if self in _path:
            # Cycle of free edges: this branch cannot reach a terminal node.
            return False
        path = _path | {self}
        return any(edge.target().dfs_terminal(path)
                   for edge in self.nextmove if edge.is_free)
class Edge:
    '''A transition to ``target``.

    An edge without a pattern (``pattern is None``) is *free*: it is taken
    without consuming a line and may carry an ``action``.  An edge with a
    pattern consumes a line that the compiled pattern matches from its
    start (``re.match`` semantics).

    The target is stored as a weak reference, so ``edge.target()`` must be
    called to obtain it; something else has to keep the target alive.
    '''

    def __init__(self, target, pattern=None, action=None):
        self.target = weakref.ref(target)
        self.pattern = pattern
        # Compare against None, exactly like is_free does: an empty pattern
        # is still a pattern (it matches any line) and must get a regexp,
        # otherwise the edge would be neither free nor able to match.
        self.regexp = None if pattern is None else re.compile(pattern)
        self.action = action

    def match(self, line):
        '''Return the match object for *line*, or None (always None if free).'''
        if self.regexp is None:
            return None
        return self.regexp.match(line)

    @property
    def is_free(self):
        '''True when the edge has no pattern and consumes no input.'''
        return self.pattern is None
class NestedStack:
    '''Stack of lists in which every level is also an item of its parent.

    ``coll`` holds the chain of currently open lists, outermost first.
    Opening a level appends a new empty list to the innermost open list
    and makes it the new innermost one, so ``bottom`` ends up holding the
    whole nested structure.
    '''

    def __init__(self):
        outermost = []
        self.coll = [outermost]

    @property
    def levels(self):
        '''Number of currently open levels.'''
        return len(self.coll)

    @property
    def bottom(self):
        '''The outermost list.'''
        return self.coll[0]

    @property
    def top(self):
        '''The innermost open list.'''
        return self.coll[-1]

    def droplevel(self):
        '''Close the innermost level; it stays stored inside its parent.'''
        self.coll.pop()

    def addlevel(self):
        '''Open a new empty level nested inside the current innermost one.'''
        nested = []
        self.coll[-1].append(nested)
        self.coll.append(nested)
class LogMata:
    '''Automaton built from sample files and used to parse matching logs.

    ``parse_sample`` extends the graph starting at ``root``; ``parse_log``
    walks the graph over the lines of a log and returns the collected
    matches.

    ``nodes`` keeps a strong reference to every node created through
    ``add_node``; edges only reference their targets weakly.
    '''

    def __init__(self):
        self.nodes = []
        self.root = self.add_node()

    def add_node(self):
        '''Create a node, register it in ``nodes`` and return it.'''
        ret = Node()
        self.nodes.append(ret)
        return ret

    def add_edge(self, src, target=None, **kwargs):
        '''Add an edge from *src* and return the edge's target node.

        A new node is created when *target* is None.  Extra keyword
        arguments (``pattern``, ``action``) are forwarded to ``Edge``.
        '''
        if target is None:
            target = self.add_node()
        src.nextmove.append(Edge(target, **kwargs))
        return target

    @classmethod
    def splitlines(cls, lines):
        '''Return an iterator over *lines* with trailing whitespace removed.

        *lines* is either a single string (split with ``str.splitlines``)
        or an iterable of strings.
        '''
        if isinstance(lines, str):
            lines = lines.splitlines()
        return (ln.rstrip() for ln in lines)

    def parse_sample(self, lines):
        '''Extend the automaton with the grammar described by *lines*.

        Sample syntax, one construct per line:

        * ``>text``     -- match a log line starting with ``text`` literally
        * ``$regexp``   -- match a log line against ``regexp`` (from its start)
        * ``!repeat`` / ``!endrepeat`` -- delimit a repeated region
        * empty lines and lines starting with a space are ignored

        Raises AssertionError when the sample is malformed: an unknown or
        misplaced ``!`` directive, a ``!repeat`` without ``!endrepeat``, or
        a line matching none of the forms above.  The errors are raised
        explicitly (not with ``assert``) so that they are still reported
        when Python runs with ``-O``; the exception type is kept for
        backward compatibility.
        '''
        iterlines = self.splitlines(lines)

        def innerparse(cur, exitcmd):
            # Consumes the shared iterator until `exitcmd` (or the end of
            # the sample when `exitcmd` is None); returns the last node.
            for line in iterlines:
                if line.startswith('>'):
                    cur = self.add_edge(cur, pattern=re.escape(line[1:]))
                elif line.startswith('$'):
                    cur = self.add_edge(cur, pattern=line[1:])
                elif line == '!repeat':
                    prev = self.add_edge(cur, action='loopentry')
                    cur = innerparse(prev, '!endrepeat')
                    # Back edge for another iteration, then the way out.
                    self.add_edge(cur, prev, action='looping')
                    cur = self.add_edge(cur, action='loopexit')
                elif line.startswith('!'):
                    if line != exitcmd:
                        raise AssertionError(
                            'unexpected directive in sample: %r' % line)
                    return cur
                elif line and not line.startswith(' '):
                    raise AssertionError(
                        'unrecognized sample line: %r' % line)
            # The sample is exhausted.
            if exitcmd:
                raise AssertionError(
                    'sample ended before %s' % exitcmd)
            return cur

        innerparse(self.root, None)

    def parse_log(self, lines):
        '''Parse the log *lines* and return the nested list of matches.

        Log lines that match no reachable edge are skipped.  Each matched
        line contributes one ``NumerizedMatch``.  Every repeated region
        contributes one list holding one sub-list per iteration.  Parsing
        stops as soon as a node without outgoing edges is reached.

        Raises ValueError when a line matches more than one edge or an
        edge carries an unknown action, and AssertionError when the log
        ends at a node from which no terminal node is reachable through
        free edges.
        '''
        curnode = self.root
        parsestack = NestedStack()
        for line in self.splitlines(lines):
            matchlist = list(curnode.dfs_match(line))
            if not matchlist:
                continue
            if len(matchlist) > 1:
                raise ValueError(
                    'ambiguous sample: %d edges match line %r'
                    % (len(matchlist), line))
            edge, acts = matchlist[0]
            for action in acts:
                if action == 'loopentry':
                    # One level for the region, one for its first iteration.
                    parsestack.addlevel()
                    parsestack.addlevel()
                elif action == 'looping':
                    # Close the finished iteration, open the next one.
                    parsestack.droplevel()
                    parsestack.addlevel()
                elif action == 'loopexit':
                    # Close the last iteration and the region itself.
                    parsestack.droplevel()
                    parsestack.droplevel()
                else:
                    raise ValueError('unknown action: %r' % (action,))
            parsestack.top.append(NumerizedMatch(edge.match(line)))
            curnode = edge.target()
            if curnode.is_terminal:
                break
        if not curnode.dfs_terminal():
            raise AssertionError(
                'log ended before the sample was fully matched')
        return parsestack.bottom
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment