Page number spec parser [Draft]

# SPDX-FileCopyrightText: 2024 geisserml <geisserml@gmail.com>
# SPDX-License-Identifier: MPL-2.0

# Sophisticated parser for a page number specification mini-language.
# Technically, this might be a use case for a parser generator like pyparsing or PLY,
# but this is a manual implementation based on common string operations.
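
# Illustrative syntax overview, inferred from the tokens below and the test samples
# at the bottom (not an authoritative grammar):
#   "1-5"     range                  "1-5@2"   range with step
#   "-1"      backward from end      "6#2"     multiplex
#   "1-8/4"   exclusion              "~3-5"    reversed
#   "A=2-3"   relative to the labelled reference mapping "A"
#   "!expr"   evaluate a Python expression
# Parts are separated by commas; parentheses group sub-expressions.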
__all__ = ["parse_pagenums"]
import logging
from collections import namedtuple
from contextlib import contextmanager
from dataclasses import MISSING as VOID
logger = logging.getLogger(__name__)

# -- Tokens --
# Tokens currently must be non-alphanumeric single characters, as we are using a
# simple loop and a set for token partitioning.
# If more advanced syntax were desired, it might be possible to use regular
# expressions and re.finditer() in partition_by_tokens().
OPEN = "("
CLOSE = ")"
SEP = ","
RELATIVE = "="
DASH = "-" # backward or range
STEP = "@"
EXCLUDE = "/"
MULTIPLEX = "#"
REVERSE = "~"
EVAL = "!"

# -- Token groups --
# The performance of a char-set containment check should be roughly equivalent to
# a direct ord()-based lookup table.
# Note that EVAL is handled specially and thus not contained in OPERATORS.
BRACKETS = {OPEN, CLOSE}
OPERATORS = {RELATIVE, DASH, STEP, EXCLUDE, MULTIPLEX, REVERSE}

# -- Variables --
BEGIN = "begin" # may be > 1 in RELATIVE context
END = "end"
ODD = "odd"
EVEN = "even"
ALL = "all"

class Token (str):
    def __repr__(self):
        return "t" + super().__repr__()

def partition_by_tokens(text, tset):
    parts = []
    cursor = 0
    for i, c in enumerate(text):
        if c in tset:
            parts.append(text[cursor:i])
            parts.append(Token(c))
            cursor = i+1
    parts.append(text[cursor:])
    return [p for p in parts if p != ""]
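
# Example (illustrative) of token partitioning:
# partition_by_tokens("1-3,5", {"-", ","}) -> ['1', t'-', '3', t',', '5']
# (Token instances repr with a leading "t"; empty fragments are dropped.)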

class Group (list):
    
    def __repr__(self):
        return f"G({list.__repr__(self)[1:-1]})"
    
    def __str__(self):
        return OPEN + "".join(str(p) for p in self) + CLOSE
    
    def shallow_split(self, symbol):
        partitioned = []
        for item in self:
            if isinstance(item, str):
                partitioned.extend(partition_by_tokens(item, {symbol}))
            else:  # isinstance(item, Group)
                partitioned.append(item)
        logger.debug(f"Tokenized {symbol!r}:\n{' '*4}{partitioned}")
        wrapped = []
        cursor = 0
        sym_indices = (i for i, v in enumerate(partitioned) if isinstance(v, Token))
        for i in sym_indices:
            wrapped.append(partitioned[cursor:i])
            cursor = i+1
        wrapped.append(partitioned[cursor:])
        return wrapped
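
# Example (illustrative) of shallow splitting, for g = Group(["1,2", Group(["3,4"]), ",5"]):
# g.shallow_split(",") -> [['1'], ['2', G('3,4')], ['5']]
# Only this level is split at the separator; the nested Group stays intact.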

class _RawBracketError (ValueError):
    def __init__(self, _, err_stack):
        self.err_stack = err_stack
        super().__init__()  # f"Raw bracket error: {err_stack}"

class BracketError (ValueError):
    def __init__(self, parts, err_stack):
        highlight = ""
        for group in err_stack:
            highlight += " "*len(str(group)[1:-1]) + "^"
        super().__init__(f"Unmatched bracket(s):\n{''.join(parts)}\n{highlight}")

class GroupMaker:
    
    def __init__(self, parts, exc_type=BracketError):
        self.parts = parts
        self.exc_type = exc_type
        self.group = Group()
        self.stack = []
    
    def compile(self):
        for i, p in enumerate(self.parts):
            if isinstance(p, Token):
                self.TOKEN_TABLE[p](self, i)
            else:
                self.group.append(p)
        # check against unmatched open brackets
        if len(self.stack) > 0:
            raise self.exc_type(self.parts, self.stack)
        return self.group
    
    def on_open(self, _):
        # push current group to stack and open a new group
        self.stack.append(self.group)
        self.group = Group()
    
    def on_close(self, i):
        # check against unmatched closing brackets
        if not len(self.stack) > 0:
            # recurse over the remaining part to show all errors at once
            # use two separate exception types to calculate highlighting only once for the caller-facing exception
            err_stack = [self.group]
            try:
                GroupMaker(self.parts[i+1:], exc_type=_RawBracketError).compile()
            except _RawBracketError as e:
                err_stack.extend(e.err_stack)
            raise self.exc_type(self.parts, err_stack)
        # merge current group into nearest group on stack and pop
        self.stack[-1].append(self.group)
        self.group = self.stack.pop()
    
    TOKEN_TABLE = {
        Token(OPEN): on_open,
        Token(CLOSE): on_close,
    }
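
# Example (illustrative) of bracket grouping:
# GroupMaker(partition_by_tokens("1,(2,(3))", BRACKETS)).compile() -> G('1,', G('2,', G('3')))
# Unbalanced input such as "1,2)" raises BracketError with a caret highlight.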

class OpPart (list):
    def __repr__(self):
        return "p" + super().__repr__()

class ResolvedPart (list):
    def __repr__(self):
        return "r" + super().__repr__()

def _tokenize_part(p):
    return partition_by_tokens(p.replace(" ", ""), OPERATORS)

def tokenize(root_group):
    splitted = root_group.shallow_split(SEP)
    logger.debug(f"Shallow split:\n{' '*4}{splitted}")
    meta_parts = []
    for sep_portion in splitted:
        part = []
        for i, p in enumerate(sep_portion):
            if isinstance(p, str):
                if EVAL in p:
                    a, b = p.split(EVAL, maxsplit=1)
                    sep_remainder = sep_portion[i+1:]
                    if not b.replace(" ", "") and sep_remainder and isinstance(sep_remainder[0], Group):
                        part.extend( [*_tokenize_part(a), Token(EVAL)] )
                    else:
                        eval_str = b + "".join(str(x) for x in sep_remainder)
                        part.extend( [*_tokenize_part(a), Token(EVAL), eval_str] )
                    break
                else:
                    part.extend(_tokenize_part(p))
            else:  # isinstance(p, Group)
                part.append(p)
        meta_parts.append(part[0] if len(part) == 1 else OpPart(part))
    # tolerate trailing or multiple commas
    return [p for p in meta_parts if p != []]
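
# Example (illustrative) of operator tokenization, for the root group of "1-3, 5":
# tokenize(Group(["1-3, 5"])) -> [p['1', t'-', '3'], '5']
# (OpPart reprs with a leading "p"; single-element parts are unwrapped.)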

def _apply_excludes(base, excludes):
    not_found = []
    for v in excludes:
        # note, in case of multiple occurrences, this removes the leftmost item
        if v in base:
            base.remove(v)
        else:
            not_found.append(v)
    if len(not_found) > 0:
        raise ValueError(f"Excludes not found: {base} / {not_found}")

def _wrap_to_list(item):
    if not isinstance(item, list):
        return [item]
    return item
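
# Example (illustrative): _apply_excludes([1, 2, 3], [2]) shrinks the base list to [1, 3]
# in place, while _apply_excludes([1, 2], [5]) raises ValueError.
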
OpInfo = namedtuple("OpInfo", ("prio", "func", "lraw", "rraw"), defaults=(False, False))

class OperatorHandler:
    
    def __init__(self, ref_translator, variables, recursive_pnp):
        self.ref_translator = ref_translator
        self.variables = variables
        self.recursive_pnp = recursive_pnp
        self.ctx = None  # initial
    
    def __call__(self, token_data):
        out = []
        for part in token_data:
            parsed = self.PART_TABLE[type(part)](self, part)
            (out.extend if isinstance(parsed, list) else out.append)(parsed)
        return out
    
    def parse_op_part(self, part):
        prios = [self.OP_TABLE[x] if isinstance(x, Token) else None for x in part]
        return self._apply_operators(part, prios)
    
    def _apply_operators(self, part, prios):
        if len(part) < 2:  # primitive
            value, = part  # unpack
            return self.PRIMITIVES_TABLE[type(value)](self, value)
        else:
            i, info = max(enumerate(prios), key=lambda x: 0 if x[1] is None else x[1].prio)
            left = self._apply_or_void(part[:i], prios[:i], info.lraw)
            right = self._apply_or_void(part[i+1:], prios[i+1:], info.rraw)
            # assert None not in (left, right), "Operator method must return non-None value"
            return info.func(self, left, right)
    
    def _apply_or_void(self, part, prios, raw):
        if not part:
            return VOID
        if raw:
            return part
        return self._apply_operators(part, prios)
    
    def parse_str(self, value):
        if value.isdigit():
            return int(value)
        else:
            var = self.variables[self.ctx][value.lower()]
            if isinstance(var, (range, tuple)):
                var = list(var)
            return var
    
    def parse_group(self, group):
        token_data = tokenize(group)
        logger.debug(f"Tokenized Operators:\n{' '*4}{token_data}")
        return self(token_data)
    
    @contextmanager
    def relative_ctx(self, tmp_ctx):
        orig_ctx, self.ctx = self.ctx, tmp_ctx
        try:
            yield
        finally:
            self.ctx = orig_ctx
    
    def on_relative(self, raw_left, raw_right):
        assert self.ctx is None, "Nested relativity is not meaningful"
        leading_ops, label = raw_left[:-1], raw_left[-1]
        with self.relative_ctx(label):
            value = _wrap_to_list( self.parse_op_part(raw_right) )
        output = [self.ref_translator[label][n] for n in value]
        if leading_ops:
            return self.parse_op_part( [*leading_ops, ResolvedPart(output)] )
        else:
            return output
    
    def on_exclude(self, base, excludes):
        base = base.copy()  # make a copy to be on the safe side
        excludes = _wrap_to_list(excludes)
        _apply_excludes(base, excludes)  # modifies base in place
        return base
    
    def on_reverse(self, _, value):
        assert _ is VOID
        return value[::-1]  # or list(reversed(value))
    
    def on_multiplex(self, value, count):
        if isinstance(value, list):
            return value * count
        else:
            return [value] * count
    
    def on_step(self, value, step):
        return value[::step]
    
    def on_dash(self, left, right):
        if left is VOID:
            stop = self.variables[self.ctx][END] + 1
            if isinstance(right, list):
                return [stop - n for n in right]
            else:
                return stop - right
        else:
            if left < right:
                return list( range(left, right+1) )
            else:
                return list( range(left, right-1, -1) )
    
    def on_eval(self, _, raw_part):
        assert _ is VOID
        value, = raw_part  # unpack
        value = str(value)  # may be a Group
        namespace = {**self.variables[self.ctx], "V": self.variables, "R": self.ref_translator, "pnp": self.recursive_pnp}
        out = eval(value, namespace)
        if isinstance(out, (tuple, range)):
            out = list(out)
        return out
    
    PRIMITIVES_TABLE = {
        str: parse_str,
        Group: parse_group,
        ResolvedPart: lambda h, v: v,  # passthrough
    }
    PART_TABLE = PRIMITIVES_TABLE.copy()
    PART_TABLE[OpPart] = parse_op_part
    
    OP_TABLE = {
        Token(RELATIVE): OpInfo(7, on_relative, lraw=True, rraw=True),
        Token(EXCLUDE): OpInfo(6, on_exclude),
        Token(REVERSE): OpInfo(5, on_reverse),
        Token(STEP): OpInfo(4, on_step),
        Token(MULTIPLEX): OpInfo(3, on_multiplex),
        Token(DASH): OpInfo(2, on_dash),
        Token(EVAL): OpInfo(1, on_eval, rraw=True),
    }
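
# Example (illustrative) of operator precedence: in "6-10/8", EXCLUDE (prio 6) outranks
# DASH (prio 2), so the part splits at '/' first; "6-10" resolves to [6, 7, 8, 9, 10],
# and excluding 8 yields [6, 7, 9, 10].
# NOTE: EVAL hands the raw text to Python's eval(), so specs must come from trusted input.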

def validate(page_indices, variables):
    pass  # TODO

def get_variables(base_length, ref_translator):
    variables = {}
    stop = base_length + 1
    variables[None] = {BEGIN: 1, END: base_length, ODD: range(1, stop, 2), EVEN: range(2, stop, 2), ALL: range(1, stop)}
    # NOTE this requires an ordered mapping
    for label, mapping in ref_translator.items():
        odd, even = [], []
        for n in mapping.keys():
            (odd if n % 2 else even).append(n)
        variables[label] = {
            BEGIN: min(mapping.keys()), END: max(mapping.keys()),
            ODD: tuple(odd), EVEN: tuple(even), ALL: tuple(mapping.keys()),
        }
    return variables
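
# Example (illustrative): get_variables(4, {}) ->
# {None: {'begin': 1, 'end': 4, 'odd': range(1, 5, 2), 'even': range(2, 5, 2), 'all': range(1, 5)}}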

def parse_pagenums(string, ref_translator=None, variables=None):
    logger.debug(f"Input:\n{' '*4}{string!r}")
    bracket_parts = partition_by_tokens(string, BRACKETS)
    logger.debug(f"Tokenized Brackets:\n{' '*4}{bracket_parts}")
    root_group = GroupMaker(bracket_parts).compile()
    logger.debug(f"Grouped by Brackets:\n{' '*4}{root_group!r}")
    recursive_pnp = lambda s: parse_pagenums(s, ref_translator, variables)
    page_indices = OperatorHandler(ref_translator, variables, recursive_pnp)([root_group])
    validate(page_indices, variables)
    return page_indices
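
# Example (illustrative) of end-to-end use:
# parse_pagenums("1-3, -1", variables=get_variables(10, {})) -> [1, 2, 3, 10]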

# testing
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

import time

t_doclen = 20
t_ref_translator = {"A": {2:8, 3:9, 4:10, 5:11, 6:12, 7:13, 10:16}, "1": {1:3}}
t_variables = get_variables(t_doclen, t_ref_translator)
logger.debug("Variables:\n" + '\n'.join(f"{k}: {v}" for k, v in t_variables.items()))

def testcase(string):
    out = parse_pagenums(string, t_ref_translator, t_variables)
    print(out, end="\n\n")
    return out

def run_samples():
    # testcase("1,2,(3,4)),(5,(6")
    # testcase("A=B=1") # or "A=(B=1)"
    starttime = time.time()
    # NOTE the test samples aren't particularly meaningful or creative at this time;
    # they're just meant to traverse the code and test parser functionality
    testcase("1, 2, (3, (4, 5)), 6#2, 6-10/8, (~A=2-3)/9, 8-10/A=2, ~3-5")
    testcase("-1, !-1, !8-5, !(3-1)-6, A=!(begin, end), 1-8/![x for x in range(2, 5)]")
    testcase("end, 1, -end, -1, -odd")
    testcase("end-1/(1-2), odd/1, even/2, all/(3,5-7)")
    testcase("A=(begin, end, even, odd, all)")
    testcase("A=(all/(2, 4)), all/A=(2,4), A=all/(2,4), 1=1")
    testcase("1-7@2, 7-1, , 1-5#2@2, ")
    testcase("!pnp('begin, end'), ![v for p in zip(pnp('5-10'), pnp('10-5')) for v in p]")
    duration = time.time() - starttime
    print(f"Duration {duration}s")

if __name__ == "__main__":
    run_samples()