Skip to content

Instantly share code, notes, and snippets.

@adamnew123456
Created September 23, 2016 00:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamnew123456/5be63308d3f11e7c94dba363a22d12d5 to your computer and use it in GitHub Desktop.
Save adamnew123456/5be63308d3f11e7c94dba363a22d12d5 to your computer and use it in GitHub Desktop.
Simple Regex Matcher via Thompson NDFA
"""
An NDFA-based regular expression evaluator, built in the style of the Thompson NFA.
"""
from collections import deque, namedtuple
# These are singleton markers that a State carries in place of a literal
# character:
#
# - Any: any single character matches (the dot metacharacter).
# - Split: a split in the NDFA. These never read any characters, so the matcher
#   has to evaluate every side of the split in order to continue.
# - End: the end of the NDFA.
#
# They are compared by identity (`is`), so they can never be confused with a
# character taken from the regex.
Any = object()
Split = object()
End = object()
class TransitionLogger:
    """
    Debugging aid that prints each step of a traversal through the NDFA while
    a match is running.

    The network argument is the mapping returned by State._network(); it gives
    every state a human-readable unique identifier.
    """

    def __init__(self, network):
        self.steps = 0
        self.max_possible_states = 0
        self.network = network

    def start(self, string):
        """
        Remembers the string being processed, so that log() can show how much
        of it is left for each alternative.
        """
        self.string = string

    def log(self, state_indexes):
        """
        Prints one step of the match: a header with the number of alternative
        states, then one line per (state, index) pair showing the state and
        the input remaining for it.
        """
        self.steps += 1
        active = len(state_indexes)
        if active > self.max_possible_states:
            self.max_possible_states = active

        print("*" * 10, active)
        for node, position in state_indexes:
            remaining = self.string[position:]
            node_id = self.network[node]
            kind = node.char
            # Consuming states show the input left *after* their character has
            # been read, hence the extra [1:] on those two branches.
            if kind is End:
                line = '@End[#{}]: "{}"'.format(node_id, remaining)
            elif kind is Split:
                line = '@Split[#{}]: "{}"'.format(node_id, remaining)
            elif kind is Any:
                line = '@Any[#{}]: "{}"'.format(node_id, remaining[1:])
            else:
                line = '"{}" ---> #{}: "{}"'.format(kind, node_id, remaining[1:])
            print(line)

    def end(self):
        """
        Prints the statistics gathered during the run and resets them, so the
        logger can be reused for another match.
        """
        summary = ('Completed in', self.steps, 'steps using at most',
                   self.max_possible_states, 'simultaneous states')
        print(*summary)
        self.steps = self.max_possible_states = 0
class NullLogger:
    """
    Stand-in for TransitionLogger with the same three hooks, none of which
    prints or records anything.
    """

    def start(self, string):
        """Ignores the string that is about to be matched."""

    def log(self, state_indexes):
        """Ignores the current set of (state, index) pairs."""

    def end(self):
        """Ignores the end of the match."""
class State:
    """
    One node of the NDFA: it reads a character and then hands control to each
    of its outputs (the graph may contain cycles, so these are successors
    rather than children in the tree sense).

    Instead of a literal character, char may be End (a terminating state),
    Split (a state that consumes no input) or Any (any character matches).
    """

    def __init__(self, char, outputs):
        self.char, self.outputs = char, outputs

    def _network(self):
        """
        Maps every state reachable from this one to a small integer "name".

        The names are only used for logging and debugging, to build string
        representations that are readable.
        """
        names = {}
        pending = [self]
        while pending:
            current = pending.pop()
            if current in names:
                # Already numbered; the graph can loop back on itself.
                continue
            names[current] = len(names)
            pending.extend(current.outputs)
        return names

    def __str__(self):
        names = self._network()
        rendered = []
        for node, ident in names.items():
            kind = node.char
            if kind is Split:
                header = 'e ---> #{}'.format(ident)
            elif kind is End:
                header = '"" ---> #{}'.format(ident)
            elif kind is Any:
                header = '. ---> #{}'.format(ident)
            else:
                header = '"{}" ---> #{}'.format(kind, ident)
            rendered.append(header)
            # One line per successor, then a blank line to separate states.
            rendered.extend(' #{}'.format(names[target]) for target in node.outputs)
            rendered.append('')
        return '\n'.join(rendered)
class Fragment:
    """
    A partially built NDFA: a box with a starting state (where you go when you
    enter it) and a list of output references.

    Each output reference is a tuple (state, idx) naming the slot
    state.outputs[idx], which points "outside" the box. The regex compiler
    later links those slots to whatever comes after this fragment by calling
    patch(). A C version would use pointer-to-pointer for this; the
    (state, idx) pairs simulate it.
    """

    def __init__(self, start, out_refs):
        self.start, self.out_refs = start, out_refs

    def patch(self, next_state):
        """
        Points every dangling output of this fragment at next_state.
        """
        for ref in self.out_refs:
            owner, slot = ref
            owner.outputs[slot] = next_state
def parse(regex):
    """
    Parses a regular expression and compiles it into an NDFA.

    Supported syntax: literal characters, '.' (any character), a backslash
    (takes the next character literally), grouping with '(' and ')',
    alternation with '|', and the postfix operators '?', '*' and '+', which
    apply to the most recent operand of the current alternative.

    Returns the start State of the NDFA. Every dangling output of the compiled
    regex is linked to a single End state.

    Raises SyntaxError when a postfix operator has no operand, when the regex
    ends with a backslash, when a '(' has no matching ')', or when a ')' has no
    matching '('.
    """
    def _single(char):
        # Builds a one-state fragment that consumes `char` (a literal character
        # or the Any marker) and whose single output is still dangling.
        state = State(char, [None])
        return Fragment(state, [(state, 0)])

    def _pop_operand(operands, operator):
        # Takes the operand a postfix operator applies to; without one the
        # regex is malformed.
        if not operands:
            raise SyntaxError('{} requires an argument'.format(operator))
        return operands.pop()

    def _concatenate(operands):
        # Chains the fragments of one alternative into a single fragment.
        if not operands:
            # An empty alternative (as in "a|" or "()") matches the empty
            # string; a one-way Split models that, since Splits never consume
            # any input.
            start = State(Split, [None])
            return Fragment(start, [(start, 0)])
        frag = operands.pop()
        while operands:
            prefix = operands.pop()
            prefix.patch(frag.start)
            # We want to preserve the final fragment's output references,
            # which is why we have to make a new fragment each time instead of
            # carrying over the prefix
            frag = Fragment(prefix.start, frag.out_refs)
        return frag

    def _parse(stream, nested):
        #
        # This uses a mix of a recursive and stack-based parser - we use
        # recursion to handle pairs of parentheses, while everything else is
        # stack based
        #
        # Note that alternatives are put at the 'top level' - since they bind
        # looser than everything else, we have to process everything else
        # before them, which we do by putting each alternative (i.e. its
        # operand stack) into its own bucket, and stitching them together later
        #
        # `nested` is True when this call is parsing the inside of a group, so
        # a ')' is only legal when it is set.
        operands = []
        alternatives = []
        escaped = False
        while stream:
            char = stream.popleft()
            if escaped:
                operands.append(_single(char))
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == '.':
                operands.append(_single(Any))
            elif char == ')':
                if not nested:
                    # Without this check the rest of the regex would be
                    # silently dropped ("a)b" would compile as just "a").
                    raise SyntaxError(') without matching (')
                # The outer call needs this to verify that there isn't a
                # missing ) from the input. We could get away with inferring
                # missing )'s but no other regex engine does that.
                stream.appendleft(')')
                break
            elif char == '(':
                group = _parse(stream, True)
                # The nested call stops either at a ')' (left in the stream
                # for us to consume) or because the input ran out.
                if not stream:
                    raise SyntaxError('( expects matching )')
                stream.popleft()
                operands.append(group)
            elif char == '|':
                alternatives.append(operands)
                operands = []
            elif char == '?':
                #   /---> a --->
                # o
                #   \---------->
                arg = _pop_operand(operands, '?')
                branch = State(Split, [arg.start, None])
                operands.append(Fragment(branch, arg.out_refs + [(branch, 1)]))
            elif char == '*':
                #   /---> a
                # o <----/
                #   \------------------>
                arg = _pop_operand(operands, '*')
                branch = State(Split, [arg.start, None])
                arg.patch(branch)
                operands.append(Fragment(branch, [(branch, 1)]))
            elif char == '+':
                # /---> a ---> o --->
                # o <-----------/
                arg = _pop_operand(operands, '+')
                branch = State(Split, [arg.start, None])
                arg.patch(branch)
                operands.append(Fragment(arg.start, [(branch, 1)]))
            else:
                operands.append(_single(char))

        if escaped:
            raise SyntaxError('\\ must not be given at end of regex')

        # First, work our way through each fragment within each alternative,
        # and concatenate them
        alternatives.append(operands)
        alt_frags = [_concatenate(alt_operands) for alt_operands in alternatives]

        # With each alternative concatenated, we can put a split in front of
        # them to finish off the regex
        if len(alt_frags) == 1:
            return alt_frags[0]
        split = State(Split, [])
        loose_outs = []
        for frag in alt_frags:
            loose_outs += frag.out_refs
            split.outputs.append(frag.start)
        return Fragment(split, loose_outs)

    frag = _parse(deque(regex), False)
    frag.patch(State(End, []))
    return frag.start
def match(regex, string, logger=NullLogger()):
    """
    Attempts to match the given regular expression against the entirety of the
    given string.

    regex is the start State of an NDFA (as returned by parse()) and string is
    the input to test. logger receives start(string) once, log(states) before
    each step and end() just before returning; the default logger does
    nothing.

    Returns True if some path through the NDFA reaches End having consumed the
    whole string, and False otherwise.
    """
    # The use of a set is important here - there are cases where the NDFA can
    # loop through several states multiple times, without changing its index
    # (a?a?a?a?a?aaaaa). In those cases, we can get large space savings by
    # storing each duplicate state only once.
    logger.start(string)
    states = {(regex, 0)}

    # Every (state, index) pair ever scheduled. What a pair leads to depends
    # only on the pair itself, so processing it a second time can never
    # discover anything new. Skipping repeats is what guarantees termination:
    # a regex with a cycle of Splits (such as a** or (a*)*) would otherwise
    # regenerate the same pairs forever on input that does not match.
    seen = set(states)

    while states:
        new_states = set()
        logger.log(states)

        for state, index in states:
            if state.char is End:
                # Only accept an End if we're actually at the end
                if index == len(string):
                    logger.end()
                    return True
                continue

            if state.char is Split:
                # We can always traverse a Split, since they never consume any
                # input
                next_index = index
            elif index < len(string) and (state.char is Any
                                          or state.char == string[index]):
                next_index = index + 1
            else:
                # Either the input is exhausted or the character differs.
                continue

            for output in state.outputs:
                new_state = (output, next_index)
                if new_state not in seen:
                    seen.add(new_state)
                    new_states.add(new_state)

        states = new_states

    logger.end()
    return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment