Last active
June 25, 2024 21:23
-
-
Save mara004/0ef12eaa154502f25de2aee94e82f84b to your computer and use it in GitHub Desktop.
Page number spec parser [Draft]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2024 geisserml <geisserml@gmail.com> | |
# SPDX-License-Identifier: MPL-2.0 | |
# Sophisticated parser for a page number specification mini-language | |
# Technically, this might be a use case for a parser generator like pyparsing or PLY, but this is a manual implementation based on common string operations. | |
__all__ = ["parse_pagenums"] | |
import enum | |
import logging | |
import builtins | |
import functools | |
from collections import namedtuple | |
from contextlib import contextmanager | |
logger = logging.getLogger(__name__) | |
# Sentinel to indicate absence of a value when None is ambiguous | |
# Derived from attrs.NOTHING: https://github.com/python-attrs/attrs/blob/b393d79aafbf4ee58740f19dd4ad28ed29f3205c/src/attr/_make.py#L56-L79 | |
class _Void (enum.Enum): | |
VOID = enum.auto() | |
def __repr__(self): | |
return "VOID" | |
def __bool__(self): | |
return False | |
VOID = _Void.VOID | |
# -- Tokens --
# Tokens currently must be non-alphanumeric, single characters, as we are using a simple loop and set for token partitioning.
# If an advanced syntax were desired, it might be possible to use regular expressions and re.finditer() in partition_by_tokens().
OPEN = "("       # opens a nested group
CLOSE = ")"      # closes a nested group
SEP = ","        # separates independent parts of the spec
REL = "="        # relative reference: <label>=<spec>, resolved via ref_translator
DASH = "-"       # inclusive range (binary), or count-from-end (unary)
STEP = "@"       # slice step applied to the left-hand list
EXCLUDE = "/"    # removes the right-hand value(s) from the left-hand list
MULTIPLEX = "#"  # repeats the left-hand value(s) N times
REVERSE = "~"    # unary; reverses the right-hand list
EVAL = "!"       # evaluates the rest of the part as a Python expression (opt-in)
# -- Token groups --
# The time complexity of a char set should be virtually equivalent to a direct ord() lookup table.
# Note that EVAL is handled specially and thus not contained in OPERATORS.
BRACKETS = {OPEN, CLOSE}
OPERATORS = {REL, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE}
# -- Variables --
# begin is always 1, so not much point adding a variable
END = "end"      # last page of the current document
ODD = "odd"      # odd page numbers 1..end
EVEN = "even"    # even page numbers 2..end
ALL = "all"      # the full page range 1..end
class Token (str):
    """A str subclass marking a single character as a syntactic token
    (as opposed to literal payload text)."""

    def __repr__(self):
        # 't' prefix distinguishes tokens from plain strings in debug logs
        return "t%s" % str.__repr__(self)
def partition_by_tokens(text, tset):
    """Split *text* at every character contained in *tset*.

    Returns a list alternating plain-str segments and single-char Token
    objects; empty segments are dropped.
    """
    pieces = []
    start = 0
    for pos, char in enumerate(text):
        if char not in tset:
            continue
        segment = text[start:pos]
        if segment:
            pieces.append(segment)
        pieces.append(Token(char))
        start = pos + 1
    tail = text[start:]
    if tail:
        pieces.append(tail)
    return pieces
class Group (list):
    """A bracketed sub-expression: a list of plain strings and nested Groups."""

    def __repr__(self):
        inner = list.__repr__(self)[1:-1]
        return f"G({inner})"

    def __str__(self):
        # reassemble the textual form, including the surrounding brackets
        body = "".join(str(part) for part in self)
        return OPEN + body + CLOSE

    def shallow_split(self, symbol):
        """Split this group's string items on *symbol*, leaving nested Groups intact.

        Returns a list of lists, one per separator-delimited portion.
        """
        # first, tokenize the separator within the plain-string items only
        partitioned = []
        for item in self:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, {symbol}))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        logger.debug(f"Tokenized {symbol!r}:\n{' '*4}{partitioned}")
        # then, collect the runs lying between the separator tokens
        wrapped = []
        cursor = 0
        for index, value in enumerate(partitioned):
            if isinstance(value, Token):
                wrapped.append(partitioned[cursor:index])
                cursor = index + 1
        wrapped.append(partitioned[cursor:])
        return wrapped
class _RawBracketError (ValueError): | |
def __init__(self, _, err_stack): | |
self.err_stack = err_stack | |
super().__init__() # f"Raw bracket error: {err_stack}" | |
class BracketError (ValueError):
    """Raised on unbalanced brackets; the message points a caret at each offender."""

    def __init__(self, parts, err_stack):
        # each stacked group contributes padding as wide as its bracket-stripped
        # text, followed by one caret
        highlight = "".join(" " * len(str(group)[1:-1]) + "^" for group in err_stack)
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")
class GroupMaker:
    """Folds a flat list of strings and bracket Tokens into a nested Group tree.

    Non-token parts are appended to the current group; OPEN pushes the current
    group onto a stack and starts a new one, CLOSE merges the current group
    into its parent and pops. Unbalanced brackets raise *exc_type* carrying
    the stack of offending groups.
    """
    def __init__(self, parts, exc_type=BracketError):
        self.parts = parts
        # BracketError for callers; _RawBracketError internally (see on_close)
        self.exc_type = exc_type
        self.group = Group()  # the group currently being filled
        self.stack = []       # enclosing, not-yet-closed groups (innermost last)
    def compile(self):
        """Consume self.parts and return the root Group; raises on unbalanced brackets."""
        for i, p in enumerate(self.parts):
            if isinstance(p, Token):
                # dispatch OPEN/CLOSE; table holds unbound methods, hence (self, i)
                self.TOKEN_TABLE[p](self, i)
            else:
                self.group.append(p)
        # check against unmatched open brackets
        if len(self.stack) > 0:
            raise self.exc_type(self.parts, self.stack)
        return self.group
    def on_open(self, _):
        # push current group to stack and open a new group
        self.stack.append(self.group)
        self.group = Group()
    def on_close(self, i):
        # check against unmatched closing brackets
        if not len(self.stack) > 0:
            # recurse over the remaining part to show all errors at once
            # use two separate exception types to calculate highlighting only once for the caller-facing exception
            err_stack = [self.group]
            try:
                GroupMaker(self.parts[i+1:], exc_type=_RawBracketError).compile()
            except _RawBracketError as e:
                err_stack.extend(e.err_stack)
            raise self.exc_type(self.parts, err_stack)
        # merge current group into nearest group on stack and pop
        self.stack[-1].append(self.group)
        self.group = self.stack.pop()
    # maps the two bracket tokens to their (unbound) handler methods
    TOKEN_TABLE = {
        Token(OPEN): on_open,
        Token(CLOSE): on_close,
    }
class OpPart (list):
    """A tokenized part that still contains operator Tokens to be applied."""

    def __repr__(self):
        # 'p' prefix marks operator parts in debug output
        return "p" + list.__repr__(self)
class ResolvedPart (list):
    """A part whose value is already computed; primitives dispatch passes it through."""

    def __repr__(self):
        # 'r' prefix marks resolved parts in debug output
        return "r" + list.__repr__(self)
def _tokenize_part(p):
    """Strip spaces from *p* and split it on the operator characters."""
    compact = p.replace(" ", "")
    return partition_by_tokens(compact, OPERATORS)
def tokenize(root_group):
    """Split *root_group* on commas, then tokenize operators within each portion.

    Returns a list whose entries are single primitives (str or Group) or
    OpPart lists mixing primitives and operator Tokens.
    """
    splitted = root_group.shallow_split(SEP)
    logger.debug(f"Shallow splitted:\n{' '*4}{splitted}")
    meta_parts = []
    for sep_portion in splitted:
        part = []
        for i, p in enumerate(sep_portion):
            if isinstance(p, str):
                if EVAL in p:
                    # everything right of the EVAL token — including any later
                    # items of this comma-portion — is joined back into one raw
                    # string and passed through verbatim
                    a, b = p.split(EVAL, maxsplit=1)
                    part.extend( [*_tokenize_part(a), Token(EVAL), "".join(str(x) for x in (b, *sep_portion[i+1:]))] )
                    break
                part.extend(_tokenize_part(p))
            else:  # isinstance(p, Group)
                part.append(p)
        # a lone primitive is kept bare; multiple entries become an OpPart
        if len(part) == 1:
            meta_parts.append(part[0])
        else:
            meta_parts.append( OpPart(part) )
    # tolerate trailing or multiple commas
    return [p for p in meta_parts if p != []]
def _apply_excludes(base, excludes): | |
not_found = [] | |
for v in excludes: | |
# note, in case of multiple occurrences, this removes the leftmost item | |
base.remove(v) if v in base else not_found.append(v) | |
if len(not_found) > 0: | |
raise ValueError(f"Unfound excludes: {not_found}") | |
def _wrap_to_list(item): | |
if not isinstance(item, list): | |
return [item] | |
return item | |
OpInfo = namedtuple("OpInfo", ("prio", "func", "lraw", "rraw"), defaults=(False, False)) | |
class OperatorHandler:
    """Evaluates tokenized parts into flat lists of page numbers.

    Dispatch is table-driven: OP_TABLE maps operator Tokens to OpInfo records
    (priority + handler), while PRIMITIVES_TABLE / PART_TABLE map part types
    to parser methods. Handlers are stored as plain functions and always
    called with the instance passed explicitly.
    """
    def __init__(self, variables, ref_translator, recursive_pnp, eval_allowed):
        self.variables = variables            # per-context variable namespaces, keyed by label (None = base doc)
        self.ref_translator = ref_translator  # label -> list of page numbers, for REL ('=') references
        self.recursive_pnp = recursive_pnp    # re-entrant parser, exposed to eval as pnp()
        self.ctx = None  # initial context: the base document
        eval_impl = OperatorHandler.eval_enabled if eval_allowed else OperatorHandler.eval_disabled
        # Fix: build an instance-level table instead of mutating the shared
        # class attribute; previously the eval policy of the last-constructed
        # instance leaked into every other instance.
        self.OP_TABLE = {**self.OP_TABLE, Token(EVAL): OpInfo(1, eval_impl, rraw=True)}
    def __call__(self, token_data):
        """Parse a list of tokenized parts; returns a flat list of results."""
        output = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            if isinstance(parsed, list):
                output.extend(parsed)
            else:
                output.append(parsed)
        return output
    def parse_op_part(self, part):
        # look up the OpInfo for each operator token; None marks operands
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else None for x in part]
        return self._apply_operators(part, prios)
    def _apply_operators(self, part, prios):
        """Recursively apply the highest-priority operator contained in *part*."""
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            # split at the highest-priority operator and recurse on both sides
            i, info = max(enumerate(prios), key=lambda x: 0 if x[1] is None else x[1].prio)
            left = self._apply_or_void(part[:i], prios[:i], info.lraw)
            right = self._apply_or_void(part[i+1:], prios[i+1:], info.rraw)
            return info.func(self, left, right)
    def _apply_or_void(self, part, prios, raw):
        # VOID marks an absent operand (e.g. unary use of '-' or '~')
        if not part:
            return VOID
        if raw:
            return part
        return self._apply_operators(part, prios)
    def parse_str(self, value):
        """Parse a primitive string: an integer page number, or a named variable
        looked up in the current context (ranges are materialized to lists)."""
        if value.isdigit():
            return int(value)
        else:
            var = self.variables[self.ctx][value.lower()]
            if isinstance(var, range):
                var = list(var)
            return var
    def parse_group(self, group):
        """Parse a bracketed sub-expression by tokenizing and re-dispatching."""
        token_data = tokenize(group)
        logger.debug(f"Tokenized Operators:\n{' '*4}{token_data}")
        return self(token_data)
    @contextmanager
    def rel_context(self, tmp_ctx):
        """Temporarily switch the variable context to *tmp_ctx*."""
        orig_ctx = self.ctx
        self.ctx = tmp_ctx
        try:
            yield
        finally:
            self.ctx = orig_ctx
    def on_rel(self, raw_left, raw_right):
        """REL ('='): resolve the right side relative to the labelled reference doc."""
        assert self.ctx is None, "Nested relativity is not meaningful"
        leading_ops, label = raw_left[:-1], raw_left[-1]
        with self.rel_context(label):
            value = _wrap_to_list( self.parse_op_part(raw_right) )
            # translate 1-based positions within the reference into page numbers
            output = [self.ref_translator[label][n-1] for n in value]
        if leading_ops:
            # re-apply any operators that preceded the label (e.g. '~A=1-2')
            rc_part = [*leading_ops, ResolvedPart(output)]
            return self.parse_op_part(rc_part)
        else:
            return output
    def on_exclude(self, base, excludes):
        """EXCLUDE ('/'): remove the right-hand value(s) from the left-hand list."""
        excludes = _wrap_to_list(excludes)
        _apply_excludes(base, excludes)  # modifies base in place
        return base
    def on_reverse(self, _, value):
        """REVERSE ('~'): unary; reverse the right-hand list."""
        assert _ is VOID
        return value[::-1]  # or list(reversed(value))
    def on_multiplex(self, value, count):
        """MULTIPLEX ('#'): repeat the left-hand value(s) *count* times."""
        if isinstance(value, list):
            return value * count
        else:
            return [value] * count
    def on_step(self, value, step):
        """STEP ('@'): take every *step*-th entry of the left-hand list."""
        return value[::step]
    def on_dash(self, left, right):
        """DASH ('-'): inclusive range, or count-from-end when unary (left absent)."""
        if left is VOID:
            # unary minus: count backwards from the end of the current document
            stop = self.variables[self.ctx][END] + 1
            if isinstance(right, list):
                return [stop - n for n in right]
            else:
                return stop - right
        else:
            if left < right:
                return list( range(left, right+1) )
            else:
                # descending ranges are supported as well
                return list( range(left, right-1, -1) )
    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _eval_builtins():
        """Return a copy of the builtins namespace minus the most dangerous entries."""
        blacklist = ("open", "input", "eval", "exec", "compile", "globals", "locals", "getattr", "exit", "quit")
        data = vars(builtins).copy()
        for k in blacklist:
            # Fix: pop() instead of del — "exit"/"quit" are installed by the
            # site module and may legitimately be absent (e.g. `python -S`)
            data.pop(k, None)
        return data
    def eval_enabled(self, _, raw_part):
        # Beware: The eval node should not be exposed to untrusted input (e.g. on a web server). It is expected to be called only by a non-malicious, local end user.
        # Note that input checking and builtins restriction do not provide any actual safety - there are ways to circumvent this. It merely prevents the most blatant shenanigans.
        assert _ is VOID
        value, = raw_part  # unpack
        if "__" in value:
            raise ValueError("Double underscore not allowed in eval input.")
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.recursive_pnp, "__builtins__": self._eval_builtins()}
        out = eval(value, namespace)
        if isinstance(out, (tuple, range)):
            out = list(out)
        return out
    def eval_disabled(self, _, raw_part):
        """EVAL ('!') was not enabled by the caller; always raises ValueError."""
        raise ValueError("Eval node is disabled.")
    # type-based dispatch for primitive values
    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group,
        ResolvedPart: lambda h, v: v,  # passthrough
    }
    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part
    OP_TABLE = {
        Token(REL): OpInfo(7, on_rel, lraw=True, rraw=True),
        Token(EXCLUDE): OpInfo(6, on_exclude),
        Token(REVERSE): OpInfo(5, on_reverse),
        Token(STEP): OpInfo(4, on_step),
        Token(MULTIPLEX): OpInfo(3, on_multiplex),
        Token(DASH): OpInfo(2, on_dash),
        # EVAL is inserted per-instance in __init__ (its policy depends on eval_allowed)
    }
def validate(page_indices, variables):
    """Sanity-check parsed page numbers against the document bounds.

    Currently a stub (TODO); intentionally does nothing and returns None.
    """
def get_variables(base_length, ref_translator):
    """Build the per-context variable namespaces (end/odd/even/all).

    The None key holds the base document's variables (populated only when
    *base_length* is given); each reference label additionally gets its own
    namespace sized to its page list.
    """
    variables = {None: {}}
    # Fix: identity comparison with None per PEP 8 (was `!= None`)
    docs = [(None, base_length)] if base_length is not None else []
    docs += [(k, len(v)) for k, v in ref_translator.items()]
    for k, length in docs:
        stop = length + 1
        variables[k] = {END: length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
    return variables
def parse_pagenums(string, doclen=None, ref_translator=None, eval_allowed=False):
    """Parse a page number spec *string* into a flat list of page numbers.

    Parameters:
        string: the spec, written in the mini-language defined by the tokens above
        doclen: length of the base document; feeds the end/odd/even/all variables
        ref_translator: maps reference labels to lists of page numbers (for '=')
        eval_allowed: whether the EVAL ('!') operator may run Python code
    """
    if ref_translator is None:
        ref_translator = {}
    variables = get_variables(doclen, ref_translator)
    logger.debug(f"Input:\n{' '*4}{string!r}")
    # stage 1: split on brackets and fold into a Group tree
    bracket_parts = partition_by_tokens(string, BRACKETS)
    logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
    root_group = GroupMaker(bracket_parts).compile()
    logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")
    # stage 2: tokenize operators and evaluate; the parser itself is handed in
    # so the eval node can recurse with identical settings
    recursive_pnp = lambda txt: parse_pagenums(txt, doclen, ref_translator, eval_allowed)
    page_indices = OperatorHandler(variables, ref_translator, recursive_pnp, eval_allowed)([root_group])
    validate(page_indices, variables)
    return page_indices
# testing
# NOTE(review): this section runs at import time (logging setup, fixtures);
# consider moving it behind the __main__ guard for library use
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
import time
doclen = 20  # sample base document length
ref_translator = {"A": [8, 9, 10, 11, 12], "1": [3]}  # sample reference docs
def testcase(string, eval_allowed=True):
    # run one spec through the parser against the sample fixtures and print it
    out = parse_pagenums(string, doclen, ref_translator, eval_allowed)
    print(out, end="\n\n")
    return out
def run_samples():
    """Run the demo specs through the parser and print the total duration."""
    # error / edge cases, kept for manual experimentation:
    # testcase("1,2,(3,4)),(5,(6")
    # testcase("A=B=1") # or "A=(B=1)"
    # testcase("!-1", eval_allowed=False)
    samples = (
        "1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=1-2)/9, 8-10/A=2, ~3-5",
        "-1, !-1, !8-5, A=!(1, end), 1-8/![x for x in range(2, 5)]",
        "end, 1, -end, -1, -odd",
        "end-1/(1-2), odd/1, even/2, all/(3,5-7)",
        "A=(all/(1, 3)), all/A=(1,3), A=all/(1,3), 1=1",
        "1-7@2, 7-1, , 6#4@2, 1-5#2@2, ",
    )
    starttime = time.time()
    for spec in samples:
        testcase(spec)
    duration = time.time() - starttime
    print(f"Duration {duration}s")

if __name__ == "__main__":
    run_samples()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment