Skip to content

Instantly share code, notes, and snippets.

@luis261
Last active April 28, 2024 10:34
Show Gist options
  • Save luis261/9c663b3ce7e3ec0436cb343ecfd8e394 to your computer and use it in GitHub Desktop.
Save luis261/9c663b3ce7e3ec0436cb343ecfd8e394 to your computer and use it in GitHub Desktop.
metaprogrammable dicts in Python: dynamically define custom, (optionally) recursive dictionary types. Load in/generate only the wanted additional validation/general functionality in a declarative manner; allows fine-tuning via inversion of control
from .common import VHandlingError as ValHandlingError
from .boundary import VHandlingIn as ValHandlingInterface
from .core import EnhancedDictFactory
from .proto_factory import ProtoFactory
# Public names re-exported by the package root; this list controls what a
# star-import of the package brings into scope. Every name imported above
# for re-export is listed so that none of them is silently left out.
__all__ = [
    "ValHandlingInterface",
    "ValHandlingError",
    EnhancedDictFactory.__name__,
    ProtoFactory.__name__,
]
from functools import wraps
""" decorators """
def _autofmt(to_deco):
    """
    Decorate `to_deco` so its `msg` template is filled in before the call.

    The wrapper requires a `msg` keyword argument, formats it with the
    `_arg_fmt` rendering of the positional arguments and then forwards all
    arguments to `to_deco`, passing the formatted text as `msg`.
    """
    @wraps(to_deco)
    def fmt_wrapper(*args, msg=None, **kwargs):
        assert msg is not None
        rendered = msg.format(*_arg_fmt(args))
        return to_deco(*args, msg=rendered, **kwargs)
    return fmt_wrapper
def _parameterized_1st(to_wrap):
    """
    Split off the first parameter of `to_wrap` into a separate, earlier call.

    The result takes the value for that first parameter and hands back a
    callable (carrying the metadata of `to_wrap`) which accepts the
    remaining arguments and finally invokes `to_wrap` with all of them.
    """
    def outer(client_code_cfg_param):
        @wraps(to_wrap)
        def bound_call(*call_args, **call_kwargs):
            return to_wrap(client_code_cfg_param, *call_args, **call_kwargs)
        return bound_call
    return outer
def _add_handling_pass_cfg(*exctypes, err_hdl=None):
    """
    Create a decorator which catches the given exception types.

    When the decorated callable raises one of `exctypes`, the call yields
    `None`, or, if `err_hdl` was given, the result of calling `err_hdl`
    with the very same arguments. Other exceptions propagate untouched.
    """
    def _add_handling_inner(to_wrap):
        @wraps(to_wrap)
        def guarded(*args, **kwargs):
            try:
                outcome = to_wrap(*args, **kwargs)
            except exctypes:  # varargs already form the tuple `except` expects
                if err_hdl is None:
                    return None
                return err_hdl(*args, **kwargs)
            return outcome
        return guarded
    return _add_handling_inner
""" functions """
def _arg_fmt(args):
    """Return a list holding the quoted (`repr` of `str`) form of each argument."""
    return [repr(text) for text in map(str, args)]
def _merge_dicts(a, b):
    """
    Merge two mappings into a new object of the type of `a`.

    Raises `KeyError` if both mappings share at least one key. As before,
    the result is constructed via keyword arguments, so keys have to be
    strings (a `TypeError` is raised otherwise).
    """
    # Merge into a plain dict first: unpacking both mappings straight into
    # the constructor call would raise a `TypeError` on duplicate keywords
    # before the conflict check below could ever report the `KeyError`.
    merged = {**a, **b}
    if len(merged) < len(a) + len(b):
        raise KeyError("Detected conflicting keys!")
    return type(a)(**merged)
def _is_ty_else_raise(t):
    """
    Build a pass-through check on the first element of an argument sequence.

    The returned function hands back its input unchanged if the first
    element is an instance of `t` and raises `TypeError` otherwise.
    """
    def _no_conv_tcheck(args):
        candidate = args[0]
        if isinstance(candidate, t):
            return args
        raise TypeError(
            f"Expected object of type: {t!r}, got: {type(candidate)!r}"
        )
    return _no_conv_tcheck
from enum import Enum
from .common import _VHDirectives
class VHandlingIn(Enum):
    """Accepts code/cfg to tie into lib-internal constructs in a controlled way."""
    # Every name below is bound to a callable taken from `_VHDirectives`.
    # NOTE(review): functions assigned in an `Enum` body are treated as
    # methods rather than members, so these presumably serve as directly
    # callable class attributes (e.g. `VHandlingIn.WARN`) — confirm intent.
    PASS = _VHDirectives._pass  # hand the value through
    WARN = _VHDirectives._warn  # emit a warning, then hand the value through
    CONVERT = _VHDirectives._convert  # takes a conversion function first
    ABORT = _VHDirectives._abort  # emit a warning, then abort the operation
    RAISE = _VHDirectives._raise  # raise the client-facing handling error
from functools import wraps
import warnings
from ._utils import _arg_fmt, _autofmt, _merge_dicts, _parameterized_1st
class AccumError(Exception):
    """Signals an accumulation which is unsound or could not be performed."""
class VHandlingError(Exception):
    """Raised on events for which the client code prescribed raising."""
class VHandlingAbort(Exception):
    """Carries an abort, as prescribed by the client, up the call chain."""
""" accumulator decorators """
def merge_if_dict(acc):
    """
    Make the accumulator `acc` merge dictionaries instead of accumulating them.

    If the existing value is a `dict`, the result of `_merge_dicts` is
    returned; every other existing value is delegated to `acc`.
    """
    @wraps(acc)
    def dict_aware_acc(existing, new):
        if not isinstance(existing, dict):
            return acc(existing, new)
        return _merge_dicts(existing, new)
    return dict_aware_acc
def to_accerr(etype):
    """
    Create an accumulator decorator translating `etype` into `AccumError`.

    Exceptions matching `etype` raised by the decorated accumulator are
    re-raised as `AccumError`, chained to the original exception.
    """
    def accerr_deco(to_wrap):
        @wraps(to_wrap)
        def handle_to_accerr(existing, new):
            try:
                outcome = to_wrap(existing, new)
            except etype as cause:
                raise AccumError from cause
            return outcome
        return handle_to_accerr
    return accerr_deco
def no_accum(existing, new):
    """Accumulator which refuses any accumulation by raising `AccumError`."""
    raise AccumError()
# Accumulator which merges the values only if the existing one is a dict and
# raises `AccumError` for anything else; a `KeyError` raised while merging
# is re-raised as `AccumError` as well.
nested_accum_only = to_accerr(KeyError)(merge_if_dict(no_accum))
def conv_1st_to(t):
    """
    Build a conversion function which casts the first argument to `t`.

    The returned function takes the sequence of positional arguments of a
    handling directive and returns a new tuple in which the first element
    was passed through `t`; the remaining elements are kept as they are.
    Raises `TypeError` if the first element is a `dict`.
    """
    def dict_else_convert(args):
        if isinstance(args[0], dict):
            raise TypeError("Refusing to convert a dict using " + repr(t))
        # `args` arrives as a tuple (captured varargs), which does not
        # support item assignment, so assemble a fresh tuple instead of
        # mutating the input in place.
        return (t(args[0]), *args[1:])
    return dict_else_convert
def dedup_naive(x):  # no preservation of order, does not work for unhashable types
    """
    Drop duplicate elements by round-tripping `x` through a `set`.

    Dictionaries are returned untouched; for anything else a new object of
    the type of `x` is created from the set of its elements.
    """
    if not isinstance(x, dict):
        unique = set(x)
        return type(x)(unique)
    return x
class _VHDirectives():
    """
    Namespace bundling the handling directives offered to client code.

    All directives are static and accessed via the class. `staticmethod`
    is deliberately applied as the outermost decorator: the helper
    decorators call what they wrap, and `staticmethod` objects themselves
    only became callable with Python 3.10.
    """

    @staticmethod
    def _pass(*args, lmbd=None, **kwargs):
        """Return the first positional argument, or `lmbd(*args)` if `lmbd` is given."""
        return args[0] if lmbd is None else lmbd(*args)

    @staticmethod
    @_autofmt
    def _warn(*args, msg=None, **kwargs):
        """Emit a `RuntimeWarning` based on `msg`, then proceed like `_pass`."""
        warnings.warn("Detected " + msg, RuntimeWarning)
        return _VHDirectives._pass(*args, **kwargs)

    @staticmethod
    @_parameterized_1st
    def _convert(conversion_func, *args, msg=None, **kwargs):
        """
        Run the positional arguments through `conversion_func`, then `_pass`.

        `conversion_func` receives the tuple of positional arguments and has
        to return the (converted) arguments as an iterable.
        """
        assert conversion_func is not None
        assert msg is not None
        # `msg` is required for parity with the other directives but is not
        # emitted here, hence it is intentionally left unformatted.
        return _VHDirectives._pass(*conversion_func(args), **kwargs)

    @staticmethod
    @_autofmt
    def _abort(*args, msg=None, **kwargs):
        """Emit a `RuntimeWarning` based on `msg` and raise `VHandlingAbort`."""
        warnings.warn("Prevented " + msg, RuntimeWarning)
        raise VHandlingAbort()

    @staticmethod
    @_autofmt
    def _raise(*args, msg=None, **kwargs):
        """Raise a `VHandlingError` whose message is based on `msg`."""
        raise VHandlingError("Failed on " + msg)
from copy import copy
from functools import partial, wraps
from ._utils import _add_handling_pass_cfg as _add_handling
from .common import AccumError, VHandlingAbort
def _autofill_args_wrap_seti(to_wrap):
    """
    Turn `to_wrap` into a chainable factory method wrapping `__setitem__`.

    Calling the result on a factory checks the enforced configuration order
    (raising `TypeError` on violations), updates the name of the class to be
    generated and replaces the configured `__setitem__` with a wrapper. That
    wrapper calls `to_wrap` with the previously configured `__setitem__`,
    the dict instance, key and value, followed by the arguments given at
    configuration time.
    """
    @wraps(to_wrap)
    def capture_factory_self(factory, *args, **kwargs):
        meth_name = to_wrap.__name__
        position = factory._CFG_COMPOSITION_ORDER.index(meth_name)
        if position <= factory._successive_composition_idx:
            raise TypeError(
                "Can't call \"" + meth_name
                + "\" now, please respect the enforced configuration order by calling it earlier!"
            )
        factory._successive_composition_idx = position

        if meth_name == "nested":
            factory._name_cfg = "Nested" + factory._name_cfg
        else:
            suffix = factory._CFG_METH_TO_NAME_SUFFIX.get(meth_name)
            if suffix is not None:
                factory._name_cfg += suffix
        if meth_name == "on_overrides":
            factory._override_dir_given = True

        # captured now, so the wrapper keeps delegating to the `__setitem__`
        # which was configured before this very call
        previous_seti = factory._nspdict_cfg["__setitem__"]

        def seti_wrapper(
            itself, key, to_insert,
            inner=previous_seti, **_internal_prio
        ):
            # internally passed keywords win over the ones from configuration
            merged_kwargs = {**kwargs, **_internal_prio}
            return to_wrap(inner, itself, key, to_insert, *args, **merged_kwargs)

        factory._nspdict_cfg["__setitem__"] = seti_wrapper
        return factory  # enables meth chaining for all decorated methods
    return capture_factory_self
def _fmt_safe_repr(obj):
    """Return `repr(obj)` with braces doubled, safe to embed in a `str.format` template."""
    return repr(obj).replace("{", "{{").replace("}", "}}")


class EnhancedDictFactory():
    """
    Core factory to dynamically generate code for custom `dict` types.
    All arguments for methods decorated with `_autofill_args_wrap_seti`
    starting with an underscore shall be omitted by calls from client code.

    Configuration methods return the factory to allow chaining. The methods
    named in `_CFG_COMPOSITION_ORDER` each wrap the `__setitem__` configured
    so far and must be called in the listed order.
    """
    # mandated call order of the `__setitem__`-wrapping config methods
    _CFG_COMPOSITION_ORDER = [
        "on_overrides",
        "with_keys_of",
        "nested",
        "on_inputs"
    ]
    # suffix appended to the name of the generated class per config method
    _CFG_METH_TO_NAME_SUFFIX = {
        "on_overrides": "WithCustomOverrides",
        "with_keys_of": "WithTypeCheckedKeys",
        "on_inputs": "WithCustomInputHandling"
    }

    def __init__(self):
        """Set up the base config: a `dict` subclass with the basic `__setitem__`."""
        self.bases_cfg = (dict,)
        self._name_cfg = "EnhancedCustomDict"
        self._successive_composition_idx = -1
        self._override_dir_given = False

        def _bypass_accum(itself, key, to_insert, _handler=None):
            """Insert without accumulating; duplicates handling still applies."""
            # an explicit `None` check keeps an empty (falsy) handler usable
            capable = itself if _handler is None else _handler
            # no need for cooperative multiple inheritance here, would
            # rather directly depend on the exact implementation we want
            # instead of relying on `super` stackframe access compiler magic
            dict.__setitem__(itself, key,
                capable._handle_duplicates(to_insert, key))
            return to_insert

        def _seti_basecase(itself, key, to_insert, _handler=None):
            """
            Perform the innermost insert without bypassing accumulation.
            The `_handler` kwarg is useful when an item should be inserted
            on a "dumb" type which can't handle accumulation/duplicates;
            in that case you can pass in an instance which has the additional
            capabilites as `_handler`, while passing the actual instance to
            insert on as usual via `itself`.
            """
            try:
                itself[key]
            except KeyError:  # nonexistent key
                pass
            else:
                capable = itself if _handler is None else _handler
                to_insert = capable._accum(itself[key], to_insert)
            return _bypass_accum(itself, key, to_insert, _handler=_handler)

        # namespace of the class to generate, grown by the config methods
        self._nspdict_cfg = {
            "__setitem__": _seti_basecase,
            "_dupl_dir": None,
            "_bypass_accum": _bypass_accum,
            "_handle_duplicates": self.__class__._handle_duplicates
        }

    @staticmethod
    def from_proto(proto, cfg_stash=None):
        """Extract a factory managed in a `ProtoFactory` by deploying it."""
        if cfg_stash is not None:
            for cfg_meth_name in cfg_stash:  # optional, adhoc application
                # (preferably handled upfront, directly via the proto instance instead)
                getattr(proto, cfg_meth_name)(cfg_stash[cfg_meth_name])
        return proto.deploy()

    def build(self, accum=None):
        """
        Dynamically create a class inheriting from `dict`.
        The returned `dict` class is augmented according to any
        config applied previously via the other factory methods.
        If you directly supply an accumulator (`accum` kwarg),
        it overrides any previously configured accumulator,
        but only for this particular build call.
        If no accumulator is passed nor was previously configured,
        a hardcoded fallback is used.
        """
        # can't operate inplace to avoid sideffects but don't need/can't use deepcopy
        out_nsp = copy(self._nspdict_cfg)
        if not self._override_dir_given:
            type(self)._wrap_seti(
                out_nsp,
                _add_handling(AccumError,  # general handling of accum failure signals
                    err_hdl=self._nspdict_cfg["_bypass_accum"]  # fallback on bypass
                )
            )
        # aborts should stop the current call
        # but we don't want them popping up in the client code
        type(self)._wrap_seti(out_nsp, _add_handling(VHandlingAbort))
        out_nsp["_accum"] = staticmethod(
            (accum or self._nspdict_cfg.get("_accum", None)
            or (lambda existing, new: existing + new))
        )
        return type(self._name_cfg, self.bases_cfg, out_nsp)

    def __call__(self, *args, **kwargs):
        """Shortcut for more explicit call via `build`."""
        return self.build(*args, **kwargs)

    def with_custom_acc(self, accum):
        """Configure a custom accumulator."""
        self._nspdict_cfg["_accum"] = accum
        return self

    def on_duplicates(
        self, handling_directive, dedup_criterion,
        is_dupl=lambda x: len(set(x)) < len(x)
    ):
        """Specify code to run on duplicates, as specified by the criteria."""
        self._name_cfg += "WithCustomHandlingOnDupl"
        self._nspdict_cfg["_dupl_dir"] = staticmethod(handling_directive)
        self._nspdict_cfg["_dedup_crit"] = staticmethod(dedup_criterion)
        self._nspdict_cfg["_is_dupl"] = staticmethod(is_dupl)
        return self

    @_autofill_args_wrap_seti
    def on_overrides(
        _base_fn, _itself, _key, _to_insert,
        handling_directive, _handler=None
    ):
        """Configure a handling directive that gets invoked on overrides."""
        try:
            return _base_fn(_itself, _key, _to_insert, _handler=_handler)
        except AccumError:
            capable = _itself if _handler is None else _handler
            return handling_directive(
                _to_insert, _itself[_key],
                # the key is escaped since directives format this template
                msg="override of {1} with {0} under: " + _fmt_safe_repr(_key),
                lmbd=lambda i, _: type(capable)._bypass_accum(
                    _itself, _key, i, _handler=_handler
                )
            )

    @_autofill_args_wrap_seti
    def with_keys_of(
        _base_fn, _itself, _key, _to_insert,
        *args, _handler=None
    ):
        """Make the resulting class only accept keys of a certain type."""
        if not isinstance(_key, tuple(args)):
            raise KeyError(
                "Expected any of types: " + repr(tuple(args))
                + ", got key: " + repr(_key)
                + " of type " + repr(type(_key)) + " instead"
            )
        return _base_fn(_itself, _key, _to_insert, _handler=_handler)

    @_autofill_args_wrap_seti
    def nested(_base_fn, _itself, _key, _to_insert, custom_subtype=None):
        """Generate code to enable nested access via tuple notation."""
        if isinstance(_key, tuple):
            if len(_key) > 1:
                ty = custom_subtype or type(_itself)
                sub = _itself
                # walk (and create) one level per leading key component
                for subcat in _key[:-1]:
                    try:
                        sub = _base_fn(sub, subcat, ty(), _handler=_itself)
                    except AccumError:  # if no override directive configured
                        sub = type(_itself)._bypass_accum(
                            sub, subcat, ty(), _handler=_itself
                        )
                return _base_fn(sub, _key[-1], _to_insert, _handler=_itself)
            elif len(_key) == 1:
                _key = _key[0]
        return _base_fn(_itself, _key, _to_insert)

    @_autofill_args_wrap_seti
    def on_inputs(_base_fn, _itself, _key, _to_insert, handling_directive):
        """Configure code wrapped in handling interface to be run on all inputs."""
        return _base_fn(
            _itself, _key,
            handling_directive(
                _to_insert,
                # the key is escaped since directives format this template
                msg="input {0} for: " + _fmt_safe_repr(_key)
            )
        )

    def _handle_duplicates(itself, val, key):
        """Apply the configured duplicates directive to `val` if the criteria match."""
        if (itself._dupl_dir is not None and type(itself)._dedup_crit(val)
                and type(itself)._is_dupl(val)):
            return itself._dupl_dir(
                val,
                # the key is escaped since directives format this template
                msg="duplicate values: {0} for: " + _fmt_safe_repr(key)
            )
        return val

    @staticmethod
    def _wrap_seti(nsp_dict, deco):
        """Replace the `__setitem__` in `nsp_dict` by its decorated version."""
        nsp_dict["__setitem__"] = deco(nsp_dict["__setitem__"])
        return nsp_dict
# Not perfect, should handle most cases
# while outperforming alternatives;
# be sure to inherit appropriately in your custom types.
from collections.abc import Iterable
from functools import reduce, wraps
from ._utils import _is_ty_else_raise
from .boundary import VHandlingIn
from .core import EnhancedDictFactory
from .common import (
AccumError,
conv_1st_to as conv_to,
merge_if_dict,
nested_accum_only,
no_accum,
to_accerr
)
""" private, local utils """
def _pass_instance_to_mod_else_new(to_wrap):
    """
    Let `to_wrap` be called with an optional `instance` keyword argument.

    A truthy `instance` is handed to `to_wrap` as is; otherwise a fresh
    `EnhancedDictFactory` is created and passed instead.
    """
    @wraps(to_wrap)
    def with_instance(instance=None):
        target = instance if instance else EnhancedDictFactory()
        return to_wrap(target)
    return with_instance
""" defaults """
def sane_dedup_criterion(x):
    """Tell whether `x` is an iterable other than a `dict`, `set` or `str`."""
    if isinstance(x, (dict, set, str)):
        return False
    return isinstance(x, Iterable)
def dedup_preserve_order(to_dedup):  # n^2
    """
    Drop repeated elements while keeping the first occurrences in order.

    Relies on equality checks only, hence it also copes with unhashable
    elements. The result is built by calling the type of `to_dedup` on the
    list of unique elements.
    """
    unique = []
    for item in to_dedup:
        if item not in unique:
            unique.append(item)
    return type(to_dedup)(unique)
def sane_iter_handling(acc):
    """
    Restrict the accumulator `acc` to existing values which are iterables.

    An `AccumError` is raised if the existing value is not iterable or is
    a `str`; in all other cases the call is delegated to `acc`.
    """
    @wraps(acc)
    def iterables_only(existing, new):
        accumulable = (isinstance(existing, Iterable)
                       and not isinstance(existing, str))
        if not accumulable:
            raise AccumError
        return acc(existing, new)
    return iterables_only
def gen_accum(inner=lambda existing,new:existing+new):
    """
    Assemble an accumulator around the elementary step `inner`.

    From the inside out: a `TypeError` raised by `inner` becomes an
    `AccumError`, non-iterable (or `str`) existing values are refused, dicts
    are merged instead of accumulated and a `KeyError` raised along the way
    becomes an `AccumError` as well.
    """
    translate_key_errs = to_accerr(KeyError)
    guarded_step = to_accerr(TypeError)(inner)
    iterables_only = sane_iter_handling(guarded_step)
    dict_aware = merge_if_dict(iterables_only)
    return translate_key_errs(dict_aware)
# Default accumulator: `gen_accum` applied to its `+`-based inner step.
sane_accum = gen_accum()
# Directive converting the first argument to a `set` (the conversion function
# raises `TypeError` for dicts).
conv_to_set = VHandlingIn.CONVERT(conv_to(set))
# Given a type `t`, yields a directive which leaves the arguments unconverted
# but raises `TypeError` unless the first one is an instance of `t`.
tcheck_no_conv = lambda t:VHandlingIn.CONVERT(_is_ty_else_raise(t))
# You can use the `with_*` functions below as starting points for your own types.
# When doing so, you'll likely run into `TypeError`s due to invalid call ordering.
# To circumvent that, instead of directly passing an `EnhancedDictFactory`,
# try passing in a `ProtoFactory` with build forwarding turned off as the
# `instance` keyword argument. This way, the with-fn can apply its defaults on
# the proto object, which acts as a proxy. You can then add to (or even override)
# that base setup; the resulting config only gets applied once you call `deploy`
# on the proxy. You will then receive the actual, internal factory,
# configured as specified.
"""
`dict` class creation functions
utilize the instance kwarg for access to
the object which generated the output class
"""
@_pass_instance_to_mod_else_new
def with_min_defaults(factory_instance):
    """Work like a `builtins.dict`, but restrict keys to `str`."""
    configured = factory_instance.with_custom_acc(no_accum)
    configured = configured.with_keys_of(str)
    return configured.build()
@_pass_instance_to_mod_else_new
def with_sane_dedup(factory_instance):
    """
    Configure order-preserving deduplication of inserted values and build.

    Uses `no_accum` as the accumulator and applies `dedup_preserve_order`
    to values for which `sane_dedup_criterion` holds.
    """
    def _dedup_first(args):
        # conversion functions receive the tuple of positional arguments
        return (dedup_preserve_order(args[0]),)

    configured = factory_instance.with_custom_acc(no_accum)
    dedup_directive = VHandlingIn.CONVERT(_dedup_first)
    configured = configured.on_duplicates(dedup_directive, sane_dedup_criterion)
    return configured.build()
@_pass_instance_to_mod_else_new
def with_sane_defaults(factory_instance):
    """
    Build on top of `with_sane_dedup`, adding conservative defaults.

    Adds merging of nested dicts as the only accumulation, warnings on
    overrides, `str`-only keys and nesting with plain `dict` subtypes.
    """
    with_sane_dedup(instance=factory_instance)
    configured = factory_instance.with_custom_acc(nested_accum_only)
    configured = configured.on_overrides(VHandlingIn.WARN)
    configured = configured.with_keys_of(str)
    configured = configured.nested(custom_subtype=dict)
    return configured.build()
@_pass_instance_to_mod_else_new
def with_nice_defaults(factory_instance):
    """
    Build on top of `with_sane_dedup`, adding more permissive defaults.

    Adds `sane_accum` as the accumulator, aborting on overrides, `str`-only
    keys and nesting using the generated type itself.
    """
    with_sane_dedup(instance=factory_instance)
    # the individual config calls do not depend on each other's return value
    for cfg_meth_name, cfg_args in (
        ("with_custom_acc", (sane_accum,)),
        ("on_overrides", (VHandlingIn.ABORT,)),
        ("with_keys_of", (str,)),
        ("nested", ()),
    ):
        getattr(factory_instance, cfg_meth_name)(*cfg_args)
    return factory_instance.build()
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
from .boundary import VHandlingIn
from .common import conv_1st_to, dedup_naive, no_accum, nested_accum_only
from .core import EnhancedDictFactory
# Starting point shared by several types below: a fresh factory whose
# accumulator (`no_accum`) always raises `AccumError`.
_gen_base = lambda:EnhancedDictFactory().with_custom_acc(no_accum)
# Passes every input through `conv_1st_to(set)` before it gets inserted.
SetValuesDict = _gen_base().on_inputs(
    VHandlingIn.CONVERT(
        conv_1st_to(set))
).build()
# Supports tuple keys for nested access, using plain dicts for the sublevels.
BasicNestedDict = EnhancedDictFactory().nested(
    custom_subtype=dict).build(accum=nested_accum_only)
# Uses the ABORT directive whenever an existing key would be overridden.
FrozenAssignmentsDict = _gen_base().on_overrides(VHandlingIn.ABORT).build()
# Runs every inserted value through `dedup_naive` (which leaves dicts as is).
NaiveDedupValuesDict = _gen_base().on_duplicates(
    VHandlingIn.CONVERT(
        # conversion functions receive the tuple of positional arguments as
        # a single parameter and have to return the converted arguments
        lambda args: (dedup_naive(args[0]),)
    ),
    lambda _: True,
    # called with the value to insert, so it has to accept one argument
    is_dupl=lambda _: True
).build()
# Converts inputs to `frozenset` and keeps the intersection of the existing
# and the new value on repeated assignment to a key.
IntersectedSetValuesDict = EnhancedDictFactory().on_overrides(
    VHandlingIn.RAISE
).on_inputs(
    VHandlingIn.CONVERT(conv_1st_to(frozenset))
# the stored values are frozensets, which the unbound `set.intersection`
# rejects as its first argument, hence the `frozenset` counterpart
).build(accum=frozenset.intersection)
from copy import deepcopy
import warnings
from .core import EnhancedDictFactory
class ProtoFactory():
    """
    An alternative for passing raw factories.
    Think of this class as a proxy to route method calls on a factory
    in a time indirective manner which ensures they are ultimately
    applied in the mandated order.
    In cases where you want to configure a factory without adhering
    to the call order enforced by raw factory objects, you can use
    this as a stand-in replacement over a conventional factory.
    This can be especially useful when passing the object through
    a `with_*` function from the `defaults` module, which may be used
    as a point to start of your own definition. By opting for an object
    of this type, you avoid hard committing to the applied defaults
    and retain the ability to perform overrides after the fact,
    before ultimately deploying the type.
    """

    def __init__(self, fwd_build=True, instance=None):
        """
        Create a proxy caching config calls for a factory.

        `fwd_build` decides whether `build` deploys and builds (truthy) or
        does nothing (falsy). `instance` is the factory to manage; a new
        `EnhancedDictFactory` is created if it is omitted.
        """
        self._fwd_build = fwd_build
        self._frozen = False
        self._stash = {}
        # composition over inheritance, specifically here to circumvent requiring __getattribute__
        self._factory = instance or EnhancedDictFactory()

    def __getattr__(self, attr):
        r"""Attribute access interception "lite"."""
        # Only reached if regular lookup fails. Private and dunder names are
        # never treated as config calls: protocol probes (copy, pickle,
        # `hasattr`) must get a proper `AttributeError`, and reading a missing
        # `_frozen` on a half-initialised instance would otherwise recurse
        # endlessly.
        if attr.startswith("_"):
            raise AttributeError(
                repr(type(self).__name__) + " object has no attribute " + repr(attr)
            )
        if self._frozen:
            raise AttributeError("Already configured downstream factory, permanently detached delayed attribute forwarding for this instance!")

        def cfg_applier(*args, **kwargs):
            # a repeated call for the same method replaces the earlier one
            self._stash[attr] = {
                "args": args,
                "kwargs": kwargs
            }
            return self  # sustain chaining feature
        return cfg_applier

    def clone(self):
        """
        Acquire a copy of this factory wrapper.
        This can help avoid redundant reiteration of config
        when defining multiple types with definitions that all
        share a common set of basic configuration.
        You obviously could define your own function instead,
        but having the ability to clone at various points
        throughout the definition process is more flexible.
        """
        return deepcopy(self)

    def build(self, *args, **kwargs):
        """
        Create the custom class, via the internal factory.
        Only if `_fwd_build` is truthy, does the config actually get applied;
        the given arguments are then forwarded to the `build` of the factory.
        Otherwise nothing happens and `None` is returned.
        """
        if self._fwd_build:
            return self.deploy().build(*args, **kwargs)
        return None

    def __call__(self, *args, **kwargs):
        """Call config application via less explicit alias."""
        return self.deploy(*args, **kwargs)

    def deploy(self):
        """Apply cached config calls to the managed factory and return it."""
        if self._frozen:
            warnings.warn("The config which was cached here has already been passed onwards, this instance is depleted! Perhaps you called this repeatedly by mistake?", RuntimeWarning)
            return self._factory
        self._frozen = True
        composition_order = self._factory._CFG_COMPOSITION_ORDER
        ordered_cfg = [None] * len(composition_order)
        for attr, cfg_args in self._stash.items():
            try:
                ordered_cfg[composition_order.index(attr)] = cfg_args
            except ValueError:
                # not subject to the enforced order, apply right away
                self._propagate(attr, cfg_args)
        for idx, cfg in enumerate(ordered_cfg):
            if cfg is None:
                continue
            self._propagate(composition_order[idx], cfg)
        del self._stash
        return self._factory

    def _propagate(self, attr, cfg):
        """Call the factory method named `attr` with the cached arguments."""
        return getattr(self._factory, attr)(
            *cfg["args"], **cfg["kwargs"]
        )

    def __deepcopy__(self, memo):
        """Create a copy with its own stash and a fresh, unconfigured factory."""
        warnings.warn("Deepcopy support for this type is tentative", PendingDeprecationWarning)
        if self._frozen:
            warnings.warn("You are cloning an object which references an already configured factory,"
                + " which won't be copied to the clone", RuntimeWarning)
        pf = self.__class__()
        pf._factory = EnhancedDictFactory()
        pf._fwd_build = self._fwd_build
        pf._frozen = self._frozen
        # the stash is deleted on deployment, fall back on an empty one
        pf._stash = deepcopy(vars(self).get("_stash", {}), memo)
        return pf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment