Page number spec parser [Draft]
# SPDX-FileCopyrightText: 2024 geisserml <geisserml@gmail.com>
# SPDX-License-Identifier: MPL-2.0

# Sophisticated parser for a page number specification mini-language.
# Technically, this might be a use case for a parser generator like pyparsing or PLY,
# but this is a manual implementation based on common string operations.

__all__ = ["parse_pagenums"]

import enum
import logging
import builtins
import functools
from collections import namedtuple
from contextlib import contextmanager

logger = logging.getLogger(__name__)

# Sentinel to indicate absence of a value where None would be ambiguous.
# Derived from attrs.NOTHING:
# https://github.com/python-attrs/attrs/blob/b393d79aafbf4ee58740f19dd4ad28ed29f3205c/src/attr/_make.py#L56-L79

class _Void (enum.Enum):
    VOID = enum.auto()
    def __repr__(self):
        return "VOID"
    def __bool__(self):
        return False

VOID = _Void.VOID
# -- Tokens --
# Tokens currently must be single, non-alphanumeric characters, as we use a simple
# loop and set lookup for token partitioning. If a more advanced syntax were desired,
# one could switch to regular expressions and re.finditer() in partition_by_tokens().

OPEN = "("
CLOSE = ")"
SEP = ","
REL = "="
DASH = "-"
STEP = "@"
EXCLUDE = "/"
MULTIPLEX = "#"
REVERSE = "~"
EVAL = "!"

# -- Token groups --
# Membership testing on a small char set should be virtually as fast as a direct
# ord()-based lookup table.
# Note that EVAL is handled specially and thus not contained in OPERATORS.

BRACKETS = {OPEN, CLOSE}
OPERATORS = {REL, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE}

# -- Variables --
# begin is always 1, so there is not much point in adding a variable for it

END = "end"
ODD = "odd"
EVEN = "even"
ALL = "all"
class Token (str):
    def __repr__(self):
        return "t" + super().__repr__()

def partition_by_tokens(text, tset):
    # split text into a flat list of plain strings and single-char Tokens
    parts = []
    cursor = 0
    for i, c in enumerate(text):
        if c in tset:
            parts.append(text[cursor:i])
            parts.append(Token(c))
            cursor = i+1
    parts.append(text[cursor:])
    return [p for p in parts if p != ""]
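
# Illustration (hand-traced expectations; t'...' is the Token repr):
#   partition_by_tokens("1-3,5", {SEP})     ->  ['1-3', t',', '5']
#   partition_by_tokens("(1,2)", BRACKETS)  ->  [t'(', '1,2', t')']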
class Group (list):

    def __repr__(self):
        return f"G({list.__repr__(self)[1:-1]})"

    def __str__(self):
        return OPEN + "".join(str(p) for p in self) + CLOSE

    def shallow_split(self, symbol):
        # split by the given symbol on this group's level only, leaving nested groups intact
        partitioned = []
        for item in self:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, {symbol}))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        logger.debug(f"Tokenized {symbol!r}:\n{' '*4}{partitioned}")
        wrapped = []
        cursor = 0
        sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
        for i in sym_indices:
            wrapped.append(partitioned[cursor:i])
            cursor = i+1
        wrapped.append(partitioned[cursor:])
        return wrapped
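
# Illustration (hand-traced expectation): splitting G('1-3,', G('4,5'), ',6') by SEP
# cuts only at this group's level, leaving the nested group intact:
#   -> [['1-3'], [G('4,5')], ['6']]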
class _RawBracketError (ValueError):
    def __init__(self, _, err_stack):
        self.err_stack = err_stack
        super().__init__()  # f"Raw bracket error: {err_stack}"

class BracketError (ValueError):
    def __init__(self, parts, err_stack):
        highlight = ""
        for group in err_stack:
            highlight += " "*len(str(group)[1:-1]) + "^"
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")

class GroupMaker:

    def __init__(self, parts, exc_type=BracketError):
        self.parts = parts
        self.exc_type = exc_type
        self.group = Group()
        self.stack = []

    def compile(self):
        for i, p in enumerate(self.parts):
            if isinstance(p, Token):
                self.TOKEN_TABLE[p](self, i)
            else:
                self.group.append(p)
        # check against unmatched open brackets
        if len(self.stack) > 0:
            raise self.exc_type(self.parts, self.stack)
        return self.group

    def on_open(self, _):
        # push current group to stack and open a new group
        self.stack.append(self.group)
        self.group = Group()

    def on_close(self, i):
        # check against unmatched closing brackets
        if not len(self.stack) > 0:
            # recurse over the remaining part to show all errors at once;
            # use two separate exception types to calculate highlighting only once, for the caller-facing exception
            err_stack = [self.group]
            try:
                GroupMaker(self.parts[i+1:], exc_type=_RawBracketError).compile()
            except _RawBracketError as e:
                err_stack.extend(e.err_stack)
            raise self.exc_type(self.parts, err_stack)
        # merge current group into nearest group on stack and pop
        self.stack[-1].append(self.group)
        self.group = self.stack.pop()

    TOKEN_TABLE = {
        Token(OPEN): on_open,
        Token(CLOSE): on_close,
    }
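
# Illustration (hand-traced expectations):
#   GroupMaker(partition_by_tokens("1,(2,3)", BRACKETS)).compile()  ->  G('1,', G('2,3'))
#   GroupMaker(partition_by_tokens("1,2)", BRACKETS)).compile()     ->  raises BracketError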
class OpPart (list):
    def __repr__(self):
        return "p" + super().__repr__()

class ResolvedPart (list):
    def __repr__(self):
        return "r" + super().__repr__()

def _tokenize_part(p):
    return partition_by_tokens(p.replace(" ", ""), OPERATORS)

def tokenize(root_group):

    splitted = root_group.shallow_split(SEP)
    logger.debug(f"Shallow split:\n{' '*4}{splitted}")

    meta_parts = []
    for sep_portion in splitted:
        part = []
        for i, p in enumerate(sep_portion):
            if isinstance(p, str):
                if EVAL in p:
                    # pass everything after the EVAL token through verbatim
                    a, b = p.split(EVAL, maxsplit=1)
                    part.extend( [*_tokenize_part(a), Token(EVAL), "".join(str(x) for x in (b, *sep_portion[i+1:]))] )
                    break
                part.extend(_tokenize_part(p))
            else:  # isinstance(p, Group)
                part.append(p)
        if len(part) == 1:
            meta_parts.append(part[0])
        else:
            meta_parts.append( OpPart(part) )

    # tolerate trailing or multiple commas
    return [p for p in meta_parts if p != []]
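
# Illustration (hand-traced expectation) for the root group of "1-3,(5,6)#2":
#   tokenize(G('1-3,', G('5,6'), '#2'))
#   ->  [p['1', t'-', '3'], p[G('5,6'), t'#', '2']]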
def _apply_excludes(base, excludes):
    not_found = []
    for v in excludes:
        # note: in case of multiple occurrences, this removes the leftmost item
        if v in base:
            base.remove(v)
        else:
            not_found.append(v)
    if len(not_found) > 0:
        raise ValueError(f"Unfound excludes: {not_found}")

def _wrap_to_list(item):
    if not isinstance(item, list):
        return [item]
    return item

OpInfo = namedtuple("OpInfo", ("prio", "func", "lraw", "rraw"), defaults=(False, False))
class OperatorHandler:

    def __init__(self, variables, ref_translator, recursive_pnp, eval_allowed):
        self.variables = variables
        self.ref_translator = ref_translator
        self.recursive_pnp = recursive_pnp
        self.ctx = None  # initial
        eval_impl = OperatorHandler.eval_enabled if eval_allowed else OperatorHandler.eval_disabled
        # copy the class-level table so that instances with different eval settings don't interfere
        self.OP_TABLE = {**self.OP_TABLE, Token(EVAL): OpInfo(1, eval_impl, rraw=True)}

    def __call__(self, token_data):
        output = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            if isinstance(parsed, list):
                output.extend(parsed)
            else:
                output.append(parsed)
        return output

    def parse_op_part(self, part):
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else None for x in part]
        return self._apply_operators(part, prios)

    def _apply_operators(self, part, prios):
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            i, info = max(enumerate(prios), key=lambda x: 0 if x[1] is None else x[1].prio)
            left = self._apply_or_void(part[:i], prios[:i], info.lraw)
            right = self._apply_or_void(part[i+1:], prios[i+1:], info.rraw)
            return info.func(self, left, right)

    def _apply_or_void(self, part, prios, raw):
        if not part:
            return VOID
        if raw:
            return part
        return self._apply_operators(part, prios)
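
    # Illustration of the splitting logic above (hand-traced expectation):
    # a part is split at the operator with the highest priority, which thereby
    # becomes the outermost operation. For p['1', t'-', '7', t'@', '2']:
    #   STEP (prio 4) outranks DASH (prio 2), so the split happens at '@':
    #     left  = "1-7" -> [1, 2, 3, 4, 5, 6, 7]
    #     right = "2"   -> 2
    #   on_step(left, right) then yields [1, 3, 5, 7].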
    def parse_str(self, value):
        if value.isdigit():
            return int(value)
        else:
            var = self.variables[self.ctx][value.lower()]
            if isinstance(var, range):
                var = list(var)
            return var

    def parse_group(self, group):
        token_data = tokenize(group)
        logger.debug(f"Tokenized Operators:\n{' '*4}{token_data}")
        return self(token_data)

    @contextmanager
    def rel_context(self, tmp_ctx):
        orig_ctx = self.ctx
        self.ctx = tmp_ctx
        try:
            yield
        finally:
            self.ctx = orig_ctx
    def on_rel(self, raw_left, raw_right):
        assert self.ctx is None, "Nested relativity is not meaningful"
        leading_ops, label = raw_left[:-1], raw_left[-1]
        with self.rel_context(label):
            value = _wrap_to_list( self.parse_op_part(raw_right) )
        output = [self.ref_translator[label][n-1] for n in value]
        if leading_ops:
            rc_part = [*leading_ops, ResolvedPart(output)]
            return self.parse_op_part(rc_part)
        else:
            return output

    def on_exclude(self, base, excludes):
        excludes = _wrap_to_list(excludes)
        _apply_excludes(base, excludes)  # modifies base in place
        return base

    def on_reverse(self, _, value):
        assert _ is VOID
        return value[::-1]  # or list(reversed(value))

    def on_multiplex(self, value, count):
        if isinstance(value, list):
            return value * count
        else:
            return [value] * count

    def on_step(self, value, step):
        return value[::step]

    def on_dash(self, left, right):
        if left is VOID:
            # unary dash: count backwards from the end of the document
            stop = self.variables[self.ctx][END] + 1
            if isinstance(right, list):
                return [stop - n for n in right]
            else:
                return stop - right
        else:
            # binary dash: inclusive range, ascending or descending
            if left < right:
                return list( range(left, right+1) )
            else:
                return list( range(left, right-1, -1) )
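
    # Illustration of on_dash (hand-traced expectations, assuming doclen = 20):
    #   "3-5"  -> [3, 4, 5]          (ascending inclusive range)
    #   "5-3"  -> [5, 4, 3]          (descending inclusive range)
    #   "-1"   -> 20                 (first page counted from the end)
    #   "-odd" -> [20, 18, ..., 2]   (odd pages counted from the end)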
    @staticmethod
    @functools.lru_cache(maxsize=1)
    def _eval_builtins():
        blacklist = ("open", "input", "eval", "exec", "compile", "globals", "locals", "getattr", "exit", "quit")
        data = vars(builtins).copy()
        for k in blacklist:
            del data[k]
        return data

    def eval_enabled(self, _, raw_part):
        # Beware: The eval node should not be exposed to untrusted input (e.g. on a web server).
        # It is expected to be called only by a non-malicious, local end user.
        # Note that input checking and builtins restriction do not provide any actual safety -
        # there are ways to circumvent them. They merely prevent the most blatant shenanigans.
        assert _ is VOID
        value, = raw_part  # unpack
        if "__" in value:
            raise ValueError("Double underscore not allowed in eval input.")
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.recursive_pnp, "__builtins__": self._eval_builtins()}
        out = eval(value, namespace)
        if isinstance(out, (tuple, range)):
            out = list(out)
        return out

    def eval_disabled(self, _, raw_part):
        raise ValueError("Eval node is disabled.")

    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group,
        ResolvedPart: lambda h, v: v,  # passthrough
    }

    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part

    OP_TABLE = {
        Token(REL): OpInfo(7, on_rel, lraw=True, rraw=True),
        Token(EXCLUDE): OpInfo(6, on_exclude),
        Token(REVERSE): OpInfo(5, on_reverse),
        Token(STEP): OpInfo(4, on_step),
        Token(MULTIPLEX): OpInfo(3, on_multiplex),
        Token(DASH): OpInfo(2, on_dash),
        # EVAL (prio 1) is added in __init__(), depending on eval_allowed
    }
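
    # Illustration of the priorities (hand-traced expectation,
    # assuming ref_translator = {"A": [8, 9, 10, 11, 12]}):
    #   "8-10/A=2": REL has the highest prio, so it is split first, with raw
    #   operands on both sides. on_rel resolves A=2 -> document page 9, then
    #   re-parses the leading part "8-10/" against the result:
    #   [8, 9, 10] minus [9] -> [8, 10].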
def validate(page_indices, variables):
    pass  # TODO

def get_variables(base_length, ref_translator):
    variables = {None: {}}
    docs = [(None, base_length)] if base_length is not None else []
    docs += [(k, len(v)) for k, v in ref_translator.items()]
    for k, length in docs:
        stop = length+1
        variables[k] = {END: length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
    return variables
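
# Illustration (hand-traced expectation):
#   get_variables(4, {})
#   ->  {None: {"end": 4, "odd": range(1, 5, 2), "even": range(2, 5, 2), "all": range(1, 5)}}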
def parse_pagenums(string, doclen=None, ref_translator=None, eval_allowed=False):

    if ref_translator is None:
        ref_translator = {}
    variables = get_variables(doclen, ref_translator)

    logger.debug(f"Input:\n{' '*4}{string!r}")
    bracket_parts = partition_by_tokens(string, BRACKETS)
    logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
    root_group = GroupMaker(bracket_parts).compile()
    logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")

    recursive_pnp = lambda txt: parse_pagenums(txt, doclen, ref_translator, eval_allowed)
    page_indices = OperatorHandler(variables, ref_translator, recursive_pnp, eval_allowed)([root_group])
    validate(page_indices, variables)

    return page_indices
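
# Illustration of the public API (hand-traced expectation):
#   parse_pagenums("1-3, 5, -1", doclen=10)  ->  [1, 2, 3, 5, 10]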
# -- Testing --

logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

import time

doclen = 20
ref_translator = {"A": [8, 9, 10, 11, 12], "1": [3]}

def testcase(string, eval_allowed=True):
    out = parse_pagenums(string, doclen, ref_translator, eval_allowed)
    print(out, end="\n\n")
    return out

def run_samples():

    # testcase("1,2,(3,4)),(5,(6")
    # testcase("A=B=1")  # or "A=(B=1)"
    # testcase("!-1", eval_allowed=False)

    starttime = time.time()
    testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=1-2)/9, 8-10/A=2, ~3-5")
    testcase("-1, !-1, !8-5, A=!(1, end), 1-8/![x for x in range(2, 5)]")
    testcase("end, 1, -end, -1, -odd")
    testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
    testcase("A=(all/(1, 3)), all/A=(1,3), A=all/(1,3), 1=1")
    testcase("1-7@2, 7-1, , 6#4@2, 1-5#2@2, ")
    duration = time.time() - starttime
    print(f"Duration {duration}s")

if __name__ == "__main__":
    run_samples()