Created
September 23, 2016 00:28
-
-
Save adamnew123456/5be63308d3f11e7c94dba363a22d12d5 to your computer and use it in GitHub Desktop.
Simple Regex Matcher via Thompson NDFA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
An NDFA-based regular expression evaluator, built in the style of the Thompson NFA. | |
""" | |
from collections import deque, namedtuple | |
# These are both singletons indicating: | |
# | |
# - A split in the NDFA. These never read any characters, so the matcher has to | |
# evaluate every side of the split in order to continue. | |
# - The end of the NDFA. | |
# - Any character matches (the dot metacharacter) | |
Any = object() | |
Split = object() | |
End = object() | |
class TransitionLogger: | |
""" | |
Logs a traversal through the NDFA during any given match. Useful for debugging purposes. | |
Network is retrieved via State._network(), and maps each state to a | |
human-readable unique identifier. | |
""" | |
def __init__(self, network): | |
self.network = network | |
self.steps = 0 | |
self.max_possible_states = 0 | |
def start(self, string): | |
""" | |
Initializes the logger with the string that is currently being processed. | |
""" | |
self.string = string | |
def log(self, state_indexes): | |
""" | |
Gives a human-readable summary of the current state, by showing the | |
current alternative states, and how much of the input they've consumed. | |
""" | |
self.steps += 1 | |
self.max_possible_states = max(self.max_possible_states, len(state_indexes)) | |
print("*" * 10, len(state_indexes)) | |
for state, idx in state_indexes: | |
rest = self.string[idx:] | |
state_id = self.network[state] | |
if state.char is End: | |
print('@End[#{}]: "{}"'.format(state_id, rest)) | |
elif state.char is Split: | |
print('@Split[#{}]: "{}"'.format(state_id, rest)) | |
elif state.char is Any: | |
print('@Any[#{}]: "{}"'.format(state_id, rest[1:])) | |
else: | |
print('"{}" ---> #{}: "{}"'.format(state.char, state_id, rest[1:])) | |
def end(self): | |
""" | |
Dumps statistics about the run. | |
""" | |
print('Completed in', self.steps, 'steps using at most', | |
self.max_possible_states, 'simultaneous states') | |
self.steps = 0 | |
self.max_possible_states = 0 | |
class NullLogger: | |
""" | |
A kind of TransitionLogger that doesn't log anything. | |
""" | |
def start(self, string): | |
pass | |
def log(self, state_indexes): | |
pass | |
def end(self): | |
pass | |
class State: | |
""" | |
A state is a place on the NDFA that indicates reading a character and passing | |
control onto its 'children' (although we're dealing with graphs, not trees, so | |
they're not children in the strict sense). | |
A character can also be End, indicating a terminating state, or Split, indicating | |
that this state doesn't consume any input. | |
""" | |
def __init__(self, char, outputs): | |
self.char = char | |
self.outputs = outputs | |
def _network(self): | |
""" | |
This builds up a map for each state in the NDFA to its "name". This is | |
used for logging and debugging purposes, to help create a string | |
representation that is readable. | |
""" | |
network = {} | |
to_explore = [self] | |
while to_explore: | |
state = to_explore.pop() | |
if state not in network: | |
network[state] = len(network) | |
to_explore += state.outputs | |
return network | |
def __str__(self): | |
network = self._network() | |
lines = [] | |
for state, key in network.items(): | |
if state.char is Split: | |
lines.append('e ---> #{}'.format(key)) | |
elif state.char is End: | |
lines.append('"" ---> #{}'.format(key)) | |
elif state.char is Any: | |
lines.append('. ---> #{}'.format(key)) | |
else: | |
lines.append('"{}" ---> #{}'.format(state.char, key)) | |
for output in state.outputs: | |
lines.append(' #{}'.format(network[output])) | |
lines.append('') | |
return '\n'.join(lines) | |
class Fragment: | |
""" | |
This is a partially built NDFA, which is essentially a box with a starting | |
state (where you go when you enter it) and several output references. | |
Output references are a list of tuples (state, idx) that refer to the states | |
which point "outside" our fragment's box; the regex compiler has to come and | |
link those states to the fragment after this one (via patch()). The C | |
version would use two-star pointers, but since we don't have those, we have | |
to simulate them. | |
""" | |
def __init__(self, start, out_refs): | |
self.start = start | |
self.out_refs = out_refs | |
def patch(self, next_state): | |
""" | |
This updates all the outputs of this fragment to point to the given | |
state. | |
""" | |
for state, out_idx in self.out_refs: | |
state.outputs[out_idx] = next_state | |
def parse(regex): | |
""" | |
Parses out a regular expression into the standard regex forms. | |
""" | |
def _parse(stream): | |
# | |
# This uses a mix of a recursive and stack-based parser - we use recursion | |
# to handle pairs of parenthesis, while everything else is stack based | |
# | |
# Note that alternatives are put at the 'top level' - since they bind | |
# looser than everything else, we have to process everything else before | |
# them, which we do by putting each alternative (i.e. its operand stack) | |
# into its own bucket, and stiching them together later | |
operands = [] | |
alternatives = [] | |
escaped = False | |
while stream: | |
char = stream.popleft() | |
if escaped: | |
state = State(char, [None]) | |
frag = Fragment(state, [(state, 0)]) | |
operands.append(frag) | |
escaped = False | |
elif char == '\\': | |
escaped = True | |
elif char == '.': | |
state = State(Any, [None]) | |
frag = Fragment(state, [(state, 0)]) | |
operands.append(frag) | |
elif char == ')': | |
# The outer call needs this to verify that there isn't a missing | |
# ) from the input. We could get away with inferring missing )'s | |
# but no other regex engine does that. | |
stream.appendleft(')') | |
break | |
elif char == '(': | |
frag = _parse(stream) | |
try: | |
close = stream.popleft() | |
except IndexError: | |
raise SyntaxError('( expects matching )') | |
operands.append(frag) | |
elif char == '|': | |
alternatives.append(operands) | |
operands = [] | |
elif char == '?': | |
# /---> a ---> | |
# o | |
# \----------> | |
try: | |
arg = operands.pop() | |
except IndexError: | |
raise SyntaxError('? requires an argument') | |
branch = State(Split, [arg.start, None]) | |
frag = Fragment(branch, arg.out_refs + [(branch, 1)]) | |
operands.append(frag) | |
elif char == '*': | |
# /---> a | |
# o <----/ | |
# \------------------> | |
try: | |
arg = operands.pop() | |
except IndexError: | |
raise SyntaxError('* requires an argument') | |
branch = State(Split, [arg.start, None]) | |
arg.patch(branch) | |
frag = Fragment(branch, [(branch, 1)]) | |
operands.append(frag) | |
elif char == '+': | |
# /---> a ---> o ---> | |
# o <-----------/ | |
try: | |
arg = operands.pop() | |
except IndexError: | |
raise SyntaxError('+ requires an argument') | |
branch = State(Split, [arg.start, None]) | |
arg.patch(branch) | |
frag = Fragment(arg.start, [(branch, 1)]) | |
operands.append(frag) | |
else: | |
state = State(char, [None]) | |
frag = Fragment(state, [(state, 0)]) | |
operands.append(frag) | |
if escaped: | |
raise SyntaxError('\\ must not be given at end of regex') | |
# First, work our way through each fragment within each alternative, and | |
# concatenate them | |
alternatives.append(operands) | |
alt_frags = [] | |
for alt_operands in alternatives: | |
if not alt_operands: | |
start = State(Split, [None]) | |
frag = Fragment(start, [(start, 0)]) | |
else: | |
frag = alt_operands.pop() | |
while alt_operands: | |
prefix = alt_operands.pop() | |
prefix.patch(frag.start) | |
# We want to preserve the final fragment's output | |
# references, which is why we have to make a new fragment | |
# each time instead of carrying over the prefix | |
frag = Fragment(prefix.start, frag.out_refs) | |
alt_frags.append(frag) | |
# With each alternative concatenated, we can put a split in front of | |
# them to finish off the regex | |
if len(alt_frags) == 1: | |
return alt_frags[0] | |
else: | |
split = State(Split, []) | |
loose_outs = [] | |
for frag in alt_frags: | |
loose_outs += frag.out_refs | |
split.outputs.append(frag.start) | |
split_frag = Fragment(split, loose_outs) | |
return split_frag | |
stream = deque(regex) | |
frag = _parse(stream) | |
end = State(End, []) | |
frag.patch(end) | |
return frag.start | |
def match(regex, string, logger=NullLogger()): | |
""" | |
Attempts to match the given regular expression against the entirty of the | |
given string. | |
""" | |
# The use of a set is important here - there are cases where the NDFA can | |
# loop through several states multiple times, without changing its index | |
# (a?a?a?a?a?aaaaa). In those cases, we can get large space savings by | |
# storing each duplicate state only once. | |
logger.start(string) | |
states = {(regex, 0)} | |
while states: | |
new_states = set() | |
logger.log(states) | |
for state, index in states: | |
if state.char is End: | |
# Only accept an End if we're actually at the end | |
if index == len(string): | |
logger.end() | |
return True | |
else: | |
continue | |
elif state.char is Split: | |
# We can always traverse a Split, since they never consume any | |
# input | |
for output in state.outputs: | |
new_state = (output, index) | |
new_states.add(new_state) | |
else: | |
try: | |
char = string[index] | |
except IndexError: | |
continue | |
if state.char is Any or state.char == char: | |
for output in state.outputs: | |
new_state = (output, index + 1) | |
new_states.add(new_state) | |
states = new_states | |
logger.end() | |
return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment