Page number spec parser [Draft]
# SPDX-FileCopyrightText: 2024 geisserml <geisserml@gmail.com>
# SPDX-License-Identifier: MPL-2.0

# Sophisticated parser for a page number specification mini-language
# Technically, this might be a use case for a parser generator like pyparsing or PLY,
# but this is a manual implementation based on common string operations.
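#
# For orientation, a few specs this grammar accepts (taken from run_samples() below;
# results hand-derived for a 20-page document, as an illustration, not original docs):
#   "1-7@2"  -> [1, 3, 5, 7]   (range with step)
#   "6#2"    -> [6, 6]         (multiplex)
#   "-1"     -> 20             (backward indexing from the end)
#   "A=2-3"  -> pages 2..3 translated through reference label "A"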

__all__ = ["parse_pagenums"]

import logging
from collections import namedtuple
from contextlib import contextmanager
from dataclasses import MISSING as VOID

logger = logging.getLogger(__name__)

# -- Tokens --
# Tokens currently must be non-alphanumeric single characters, as we are using
# a simple loop and set for token partitioning.
# If an advanced syntax were desired, it might be possible to use regular
# expressions and re.finditer() in partition_by_tokens().
OPEN      = "("
CLOSE     = ")"
SEP       = ","
RELATIVE  = "="
DASH      = "-"  # backward or range
STEP      = "@"
EXCLUDE   = "/"
MULTIPLEX = "#"
REVERSE   = "~"
EVAL      = "!"

# -- Token groups --
# A char set containment check should perform roughly on par with a direct ord() lookup table.
# Note that EVAL is handled specially and thus not contained in OPERATORS.
BRACKETS  = {OPEN, CLOSE}
OPERATORS = {RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE}

# -- Variables --
BEGIN = "begin"  # may be > 1 in RELATIVE context
END   = "end"
ODD   = "odd"
EVEN  = "even"
ALL   = "all"

class Token (str):
    def __repr__(self):
        return "t" + super().__repr__()

def partition_by_tokens(text, tset):
    parts = []
    cursor = 0
    for i, c in enumerate(text):
        if c in tset:
            parts.append(text[cursor:i])
            parts.append(Token(c))
            cursor = i+1
    parts.append(text[cursor:])
    return [p for p in parts if p != ""]
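
# A rough illustration of partition_by_tokens() (hand-derived, not from the original gist):
# partition_by_tokens("(1,2)", BRACKETS) -> [t'(', '1,2', t')']
# where the t prefix marks Token instances (cf. Token.__repr__ above).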

class Group (list):

    def __repr__(self):
        return f"G({list.__repr__(self)[1:-1]})"

    def __str__(self):
        return OPEN + "".join(str(p) for p in self) + CLOSE

    def shallow_split(self, symbol):

        partitioned = []
        for item in self:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, {symbol}))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        logger.debug(f"Tokenized {symbol!r}:\n{' '*4}{partitioned}")

        wrapped = []
        cursor = 0
        sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
        for i in sym_indices:
            wrapped.append(partitioned[cursor:i])
            cursor = i+1
        wrapped.append(partitioned[cursor:])

        return wrapped
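
# For instance (hand-derived): the root group of "1,(2,3),4" is
# G('1,', G('2,3'), ',4'), and shallow_split(SEP) on it yields
# [['1'], [G('2,3')], ['4']]; sub-groups are left intact, hence "shallow".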

class _RawBracketError (ValueError):
    def __init__(self, _, err_stack):
        self.err_stack = err_stack
        super().__init__()  # f"Raw bracket error: {err_stack}"

class BracketError (ValueError):
    def __init__(self, parts, err_stack):
        highlight = ""
        for group in err_stack:
            highlight += " "*len(str(group)[1:-1]) + "^"
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")

class GroupMaker:

    def __init__(self, parts, exc_type=BracketError):
        self.parts = parts
        self.exc_type = exc_type
        self.group = Group()
        self.stack = []

    def compile(self):
        for i, p in enumerate(self.parts):
            if isinstance(p, Token):
                self.TOKEN_TABLE[p](self, i)
            else:
                self.group.append(p)
        # check against unmatched open brackets
        if len(self.stack) > 0:
            raise self.exc_type(self.parts, self.stack)
        return self.group

    def on_open(self, _):
        # push current group to stack and open a new group
        self.stack.append(self.group)
        self.group = Group()

    def on_close(self, i):
        # check against unmatched closing brackets
        if not len(self.stack) > 0:
            # recurse over the remaining part to show all errors at once
            # use two separate exception types to calculate highlighting only once for the caller-facing exception
            err_stack = [self.group]
            try:
                GroupMaker(self.parts[i+1:], exc_type=_RawBracketError).compile()
            except _RawBracketError as e:
                err_stack.extend(e.err_stack)
            raise self.exc_type(self.parts, err_stack)
        # merge current group into nearest group on stack and pop
        self.stack[-1].append(self.group)
        self.group = self.stack.pop()

    TOKEN_TABLE = {
        Token(OPEN): on_open,
        Token(CLOSE): on_close,
    }
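
# e.g. (hand-derived): GroupMaker(partition_by_tokens("1,(2,3)", BRACKETS)).compile()
# -> G('1,', G('2,3'))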

class OpPart (list):
    def __repr__(self):
        return "p" + super().__repr__()

class ResolvedPart (list):
    def __repr__(self):
        return "r" + super().__repr__()

def _tokenize_part(p):
    return partition_by_tokens(p.replace(" ", ""), OPERATORS)

def tokenize(root_group):

    split_parts = root_group.shallow_split(SEP)
    logger.debug(f"Shallow split:\n{' '*4}{split_parts}")

    meta_parts = []
    for sep_portion in split_parts:
        part = []
        for i, p in enumerate(sep_portion):
            if isinstance(p, str):
                if EVAL in p:
                    a, b = p.split(EVAL, maxsplit=1)
                    sep_remainder = sep_portion[i+1:]
                    if not b.replace(" ", "") and sep_remainder and isinstance(sep_remainder[0], Group):
                        # EVAL is directly followed by a group; keep the token and let the
                        # loop append the group itself on the next iteration
                        part.extend( [*_tokenize_part(a), Token(EVAL)] )
                    else:
                        # consume the rest of the portion verbatim as the eval string
                        eval_str = b + "".join(str(x) for x in sep_remainder)
                        part.extend( [*_tokenize_part(a), Token(EVAL), eval_str] )
                        break
                else:
                    part.extend(_tokenize_part(p))
            else:  # isinstance(p, Group)
                part.append(p)
        meta_parts.append(part[0] if len(part) == 1 else OpPart(part))

    # tolerate trailing or multiple commas
    return [p for p in meta_parts if p != []]
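
# For instance (hand-derived): for the root group of "1-3,5", i.e. G('1-3,5'),
# tokenize() yields [p['1', t'-', '3'], '5'], where p marks an OpPart.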

def _apply_excludes(base, excludes):
    not_found = []
    for v in excludes:
        # note, in case of multiple occurrences, this removes the leftmost item
        if v in base:
            base.remove(v)
        else:
            not_found.append(v)
    if len(not_found) > 0:
        raise ValueError(f"Excludes not found: {base} / {not_found}")

def _wrap_to_list(item):
    if not isinstance(item, list):
        return [item]
    return item

OpInfo = namedtuple("OpInfo", ("prio", "func", "lraw", "rraw"), defaults=(False, False))

class OperatorHandler:

    def __init__(self, ref_translator, variables, recursive_pnp):
        self.ref_translator = ref_translator
        self.variables = variables
        self.recursive_pnp = recursive_pnp
        self.ctx = None  # initial

    def __call__(self, token_data):
        out = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            (out.extend if isinstance(parsed, list) else out.append)(parsed)
        return out

    def parse_op_part(self, part):
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else None for x in part]
        return self._apply_operators(part, prios)

    def _apply_operators(self, part, prios):
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            i, info = max(enumerate(prios), key=lambda x: 0 if x[1] is None else x[1].prio)
            left = self._apply_or_void(part[:i], prios[:i], info.lraw)
            right = self._apply_or_void(part[i+1:], prios[i+1:], info.rraw)
            # assert None not in (left, right), "Operator method must return non-None value"
            return info.func(self, left, right)

    def _apply_or_void(self, part, prios, raw):
        if not part:
            return VOID
        if raw:
            return part
        return self._apply_operators(part, prios)

    def parse_str(self, value):
        if value.isdigit():
            return int(value)
        else:
            var = self.variables[self.ctx][value.lower()]
            if isinstance(var, (range, tuple)):
                var = list(var)
            return var

    def parse_group(self, group):
        token_data = tokenize(group)
        logger.debug(f"Tokenized Operators:\n{' '*4}{token_data}")
        return self(token_data)

    @contextmanager
    def relative_ctx(self, tmp_ctx):
        orig_ctx, self.ctx = self.ctx, tmp_ctx
        try:
            yield
        finally:
            self.ctx = orig_ctx

    def on_relative(self, raw_left, raw_right):
        assert self.ctx is None, "Nested relativity is not meaningful"
        leading_ops, label = raw_left[:-1], raw_left[-1]
        with self.relative_ctx(label):
            value = _wrap_to_list( self.parse_op_part(raw_right) )
            output = [self.ref_translator[label][n] for n in value]
        if leading_ops:
            return self.parse_op_part( [*leading_ops, ResolvedPart(output)] )
        else:
            return output

    def on_exclude(self, base, excludes):
        base = base.copy()  # make a copy to be on the safe side
        excludes = _wrap_to_list(excludes)
        _apply_excludes(base, excludes)  # modifies base in place
        return base

    def on_reverse(self, _, value):
        assert _ is VOID
        return value[::-1]  # or list(reversed(value))

    def on_multiplex(self, value, count):
        if isinstance(value, list):
            return value * count
        else:
            return [value] * count

    def on_step(self, value, step):
        return value[::step]

    def on_dash(self, left, right):
        if left is VOID:  # backward indexing from the end
            stop = self.variables[self.ctx][END] + 1
            if isinstance(right, list):
                return [stop - n for n in right]
            else:
                return stop - right
        else:  # range, in either direction
            if left < right:
                return list( range(left, right+1) )
            else:
                return list( range(left, right-1, -1) )

    def on_eval(self, _, raw_part):
        assert _ is VOID
        value, = raw_part  # unpack
        value = str(value)  # may be a Group
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.recursive_pnp}
        out = eval(value, namespace)
        if isinstance(out, (tuple, range)):
            out = list(out)
        return out
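
    # e.g. (hand-derived): "![x for x in range(2, 5)]" in a spec evaluates the list
    # comprehension via on_eval(), yielding [2, 3, 4]; the "pnp" namespace entry
    # allows recursive specs like "!pnp('begin, end')" (see run_samples() below).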

    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group,
        ResolvedPart: lambda h, v: v,  # passthrough
    }
    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part

    OP_TABLE = {
        Token(RELATIVE):  OpInfo(7, on_relative, lraw=True, rraw=True),
        Token(EXCLUDE):   OpInfo(6, on_exclude),
        Token(REVERSE):   OpInfo(5, on_reverse),
        Token(STEP):      OpInfo(4, on_step),
        Token(MULTIPLEX): OpInfo(3, on_multiplex),
        Token(DASH):      OpInfo(2, on_dash),
        Token(EVAL):      OpInfo(1, on_eval, rraw=True),
    }
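
# A hand-worked precedence example (illustrative, not from the original gist):
# the operator with the highest prio is split out first, i.e. applied outermost.
# For "8-10/A=2", RELATIVE (prio 7) wins, so on_relative() first resolves "2"
# through reference label "A" (-> page 8 in the test translator below), then
# re-dispatches the raw leading part as "8-10" EXCLUDE ResolvedPart([8]),
# yielding [9, 10].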

def validate(page_indices, variables):
    pass  # TODO

def get_variables(base_length, ref_translator):

    variables = {}
    stop = base_length + 1
    variables[None] = {BEGIN: 1, END: base_length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}

    # NOTE this requires an ordered mapping
    for label, mapping in ref_translator.items():
        odd, even = [], []
        for n in mapping.keys():
            (odd if n % 2 else even).append(n)
        variables[label] = {
            BEGIN: min(mapping.keys()), END: max(mapping.keys()),
            ODD: tuple(odd), EVEN: tuple(even), ALL: tuple(mapping.keys()),
        }

    return variables
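
# e.g. (hand-derived): get_variables(4, {}) -> {None: {"begin": 1, "end": 4,
# "odd": range(1, 5, 2), "even": range(2, 5, 2), "all": range(1, 5)}}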

def parse_pagenums(string, ref_translator=None, variables=None):
    logger.debug(f"Input:\n{' '*4}{string!r}")
    bracket_parts = partition_by_tokens(string, BRACKETS)
    logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
    root_group = GroupMaker(bracket_parts).compile()
    logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")
    recursive_pnp = lambda s: parse_pagenums(s, ref_translator, variables)
    page_indices = OperatorHandler(ref_translator, variables, recursive_pnp)([root_group])
    validate(page_indices, variables)
    return page_indices
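
# Basic usage (hand-derived example; variables must be pre-computed via get_variables()):
# parse_pagenums("1-3,5", None, get_variables(10, {})) -> [1, 2, 3, 5]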

# testing

logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

import time

t_doclen = 20
t_ref_translator = {"A": {2:8, 3:9, 4:10, 5:11, 6:12, 7:13, 10:16}, "1": {1:3}}
t_variables = get_variables(t_doclen, t_ref_translator)
logger.debug("Variables:\n" + '\n'.join(f"{k}: {v}" for k, v in t_variables.items()))

def testcase(string):
    out = parse_pagenums(string, t_ref_translator, t_variables)
    print(out, end="\n\n")
    return out

def run_samples():

    # testcase("1,2,(3,4)),(5,(6")
    # testcase("A=B=1")  # or "A=(B=1)"

    starttime = time.time()

    # NOTE the test samples aren't particularly meaningful or creative at this time;
    # they're just meant to traverse the code and exercise parser functionality
    testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=2-3)/9, 8-10/A=2, ~3-5")
    testcase("-1, !-1, !8-5, !(3-1)-6, A=!(begin, end), 1-8/![x for x in range(2, 5)]")
    testcase("end, 1, -end, -1, -odd")
    testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
    testcase("A=(begin, end, even, odd, all)")
    testcase("A=(all/(2, 4)), all/A=(2,4), A=all/(2,4), 1=1")
    testcase("1-7@2, 7-1, , 1-5#2@2, ")
    testcase("!pnp('begin, end'), ![v for p in zip(pnp('5-10'), pnp('10-5')) for v in p]")

    duration = time.time() - starttime
    print(f"Duration {duration}s")

if __name__ == "__main__":
    run_samples()