Skip to content

Instantly share code, notes, and snippets.

@arizvisa
Last active December 26, 2022 21:43
Show Gist options
  • Save arizvisa/ea757b9d2b3926e73262948756ac7b2e to your computer and use it in GitHub Desktop.
constraints with some fixes and some better scoring for performance testing
class multicase(object):
    """
    A lot of magic is in this class which allows one to define multiple cases
    for a single function.

    Each decorated case is stored in a dispatch "tree" and "table" (kept on
    the generated wrapper under `cache_name`) together with per-case
    documentation (kept under `documentation_name`).
    """
    # The following constants mirror CPython's code-object flag bits
    # (co_flags). Some names differ from the stdlib's CO_* constants
    # (e.g. CO_VARGEN appears to correspond to CO_GENERATOR, CO_ITERABLE
    # to CO_ITERABLE_COROUTINE) -- TODO confirm against the interpreter
    # versions this is meant to support.
    CO_OPTIMIZED = 0x00001
    CO_NEWLOCALS = 0x00002
    CO_VARARGS = 0x00004
    CO_VARKEYWORDS = 0x00008
    CO_NESTED = 0x00010
    CO_VARGEN = 0x00020
    CO_NOFREE = 0x00040
    CO_COROUTINE = 0x00080
    CO_ITERABLE = 0x00100
    CO_GENERATOR_ALLOWED = 0x01000
    CO_FUTURE_DIVISION = 0x02000
    CO_FUTURE_ABSOLUTE_IMPORT = 0x04000
    CO_FUTURE_WITH_STATEMENT = 0x08000
    CO_FUTURE_PRINT_FUNCTION = 0x10000
    CO_FUTURE_UNICODE_LITERALS = 0x20000
    CO_FUTURE_BARRY_AS_BDFL = 0x40000
    CO_FUTURE_GENERATOR_STOP = 0x80000
    # Attribute names used to stash the dispatch cache (tree and table)
    # and the per-case documentation on a generated wrapper function.
    cache_name = '__multicase_cache__'
    documentation_name = '__multicase_documentation__'
def __new__(cls, *other, **t_args):
    '''Decorate a case of a function with the specified types.

    `other` may optionally contain the previously decorated callable that
    this case extends, and `t_args` maps each parameter name to the type
    (or unordered collection of types) that constrains it. Returns a
    decorator (`result`) that registers the wrapped function as one case.
    '''
    # Output some useful information to help the user if there's nothing that satisfies their parameters.
    def missing(packed_parameters, tree, table, documentation={}, ignored_parameter_count=0):
        '''Output the candidates for the callable that the user is attempting to use.'''
        args, kwds = packed_parameters
        # Formatters for rendering callable names, help() hints, and prototypes.
        Fcallable, Fhelp, Fprototype = "`{:s}`".format, "`help({:s})`".format, "{:s}".format
        # Basic configuration
        arrow, indent = ' -> ', 4
        # Some useful utilities for speaking english at our users.
        Fconjunction_or = lambda items: ', '.join(items[:-1]) + ", or {:s}".format(*items[-1:]) if len(items) > 1 else items[0]
        Fconjunction_and = lambda items: ', '.join(items[:-1]) + ", and {:s}".format(*items[-1:]) if len(items) > 1 else items[0]
        sorted_functions = [F for F in cls.sorted_documentation(documentation)]
        # Collect all parameter names, keywords, and documentation for describing what happened.
        description_arguments = [item.__class__.__name__ for item in args[ignored_parameter_count:]]
        description_keywords = ["{:s}={!s}".format(name, kwds[name].__class__.__name__) for name in kwds]
        iterable = ((F, documentation[F]) for F in sorted_functions)
        description_functions = {F : (prototype, F.__module__ if hasattr(F, '__module__') else None, pycompat.code.name(pycompat.function.code(F))) for F, (prototype, _, _) in iterable}
        # Build the error message that is displayed as part of the exception.
        available_names = sorted({'.'.join([module, name]) if module else name for _, (_, module, name) in description_functions.items()})
        conjunctioned = Fconjunction_and([Fcallable(item) for item in available_names])
        description = Fconjunction_or(["{:s}({:s})".format(name, ', '.join(itertools.chain(description_arguments, description_keywords))) for name in available_names])
        message = u"{:s}: The given parameter type{:s} not match any of the available cases for the definition of {:s}.".format(description, ' does' if sum(map(len, [description_arguments, description_keywords])) == 1 else 's do', conjunctioned)
        # Build the list of candidates that the user will need to choose from.
        listing_message = "The functions that are available for {:s} are:".format(Fconjunction_and([Fhelp(item) for item in available_names]))
        iterable = ((F, description_functions[F]) for F in sorted_functions)
        # NOTE(review): "item if module else item" repeats the same condition
        # twice -- the trailing "if module else item" is redundant and was
        # likely a copy/paste slip; the expression still evaluates correctly.
        prototypes = [(Fprototype('.'.join([module, item]) if module else item if module else item), documentation[F]) for F, (item, module, _) in iterable]
        # Calculate some lengths and use them to format our output in some meaningful way.
        maximum = max(len(prototype) for prototype, _ in prototypes) if prototypes else 0
        components = [(prototype, "{:s}{:s}".format(arrow, lines[0]) if len(lines) else '') for prototype, (_, lines, _) in prototypes]
        iterable = ("{: <{:d}s}{:s}".format(prototype, maximum, description) for prototype, description in components)
        listing = ["{: <{:d}s}{:s}".format('', indent, item) for item in iterable]
        # Now we have a message and a listing that we can just join together with a newline.
        raise internal.exceptions.UnknownPrototypeError('\n'.join(itertools.chain([message, '', listing_message], listing)))
    # These are just utility closures that can be attached to the entrypoint closure.
    def fetch_score(cache, *arguments, **keywords):
        '''Return a list of all the callables, their (transformed) parameters, and their score for the given `arguments` and `keywords`.'''
        tree, table = cache
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        scores = {F : cls.critique_and_transform(F, packed_parameters, tree, table) for F, _ in candidates}
        iterable = cls.preordered(packed_parameters, candidates, tree, table)
        ordered = ((F, scores[F]) for F, _ in iterable)
        return [(F, score, args, kwargs) for F, (args, kwargs, score) in ordered]
    def fetch_callable(cache, *arguments, **keywords):
        '''Return the first callable that matches the constraints for the given `arguments` and `keywords`.'''
        tree, table = cache
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        iterable = cls.preordered(packed_parameters, candidates, tree, table)
        # Only the best-ordered candidate is needed; StopIteration propagates
        # to the caller if nothing matched.
        packed = next(iterable)
        F, _ = packed
        return F
    def fetch_prototype(documentation, callable):
        '''Return the documentation prototype for the given `callable`.'''
        prototype, _, _ = documentation[callable]
        return prototype
    # This is the entry-point closure that gets used to update the actual wrapper with a new candidate.
    def result(wrapped):
        '''Return a function that will call the given `wrapped` function if the parameters meet the required type constraints.'''
        flattened_constraints = {argname : tuple(item for item in cls.flatten(types if isinstance(types, internal.types.unordered) else [types])) for argname, types in t_args.items()}
        # First we need to extract the function from whatever type it is
        # so that we can read any properties we need from it. We also extract
        # its "constructor" so that we can re-create it after we've processed it.
        try:
            cons, func = cls.reconstructor(wrapped), cls.ex_function(wrapped)
            if not callable(func):
                raise internal.exceptions.InvalidTypeOrValueError
        except internal.exceptions.InvalidTypeOrValueError:
            logging.warning("{:s}(...): Refusing to create a case for a non-callable object ({!s}).".format('.'.join([__name__, cls.__name__]), wrapped))
            return wrapped
        # Next we need to extract all of the argument information from it. We also need to
        # determine whether it's a special type of some sort so that we know that its first
        # argument is irrelevant for our needs. With regards to this, we can't do anything
        # for methods since we see them before they get attached. However, we can explicitly
        # check if it's using the magic name "__new__" which uses an implicit parameter.
        args, defaults, (star, starstar) = cls.ex_args(func)
        s_args = 1 if isinstance(wrapped, (internal.types.classmethod, internal.types.method)) or func.__name__ in {'__new__'} else 0
        # If the user decorated us whilst explicitly providing the previous
        # function that this case is a part of, then make sure that we use it.
        if len(other):
            ok, prev = True, other[0]
        # If we weren't given a function, then we need to be tricky and search
        # through our parent frame's locals. Hopefully it's using the same name.
        elif pycompat.function.name(func) in sys._getframe().f_back.f_locals:
            ok, prev = True, sys._getframe().f_back.f_locals[pycompat.function.name(func)]
        # Otherwise, we've hit first blood and this is the very first definition
        # of the function. This requires us to do some construction later.
        else:
            ok = False
        # So if we found an already-existing wrapper, then we need to steal its cache.
        res = ok and prev and cls.ex_function(prev)
        if ok and hasattr(res, cls.cache_name):
            owner, cache, documentation = res, getattr(res, cls.cache_name), getattr(res, cls.documentation_name)
        # Otherwise, we simply need to create a new cache entirely.
        else:
            owner, cache, documentation = func, ({}, {}), {}
        res = cls.new_wrapper(func, cache, Fmissing=functools.partial(missing, documentation=documentation, ignored_parameter_count=s_args))
        res.__module__ = getattr(wrapped, '__module__', getattr(func, '__module__', '__main__'))
        # Update our new wrapper with our cache (tree and table) and our
        # documentation state so that it not only works but it reads too.
        setattr(res, cls.cache_name, cache)
        setattr(res, cls.documentation_name, documentation)
        # Attach a few hardcoded utilities to the closure that we return. We add
        # some empty namespaces as the first argument if it's a classmethod or __new__.
        setattr(res, 'score', functools.partial(fetch_score, cache, *s_args * [object]))
        setattr(res, 'fetch', functools.partial(fetch_callable, cache, *s_args * [object]))
        setattr(res, 'describe', functools.partial(fetch_prototype, documentation))
        # Now we need to add the function we extracted to our tree of candidate functions.
        # A copy is made because cls.add consumes (pops) the constraints it uses.
        constraints = {name : types for name, types in flattened_constraints.items()}
        tree, table = cache
        F = cls.add(func, constraints, tree, table)
        assert(F is func)
        # Verify that we constrained all of the available types. If any are left, then
        # we need to complain about it since we just added the case to our tree.
        if constraints:
            co = pycompat.function.code(func)
            description = '.'.join(getattr(func, attribute) for attribute in ['__module__', '__name__'] if hasattr(func, attribute))
            location = "{:s}:{:d}".format(pycompat.code.filename(co), pycompat.code.linenumber(co))
            error_constraints = {name : "{:s}={!s}".format(name, types.__name__ if isinstance(types, internal.types.type) or types in {internal.types.callable} else '|'.join(sorted(t_.__name__ for t_ in types)) if hasattr(types, '__iter__') else "{!r}".format(types)) for name, types in flattened_constraints.items()}
            logging.warning(u"@{:s}(...) : Unable to constrain {:d} parameter{:s} ({:s}) for prototype \"{:s}({:s})\" at {:s}.".format('.'.join([__name__, cls.__name__]), len(constraints), '' if len(constraints) == 1 else 's', ', '.join(constraints), description, ', '.join(error_constraints.get(name, name) for name in args[s_args:]), location))
        # Now we can use the information we collected about the function to
        # generate a documentation entry. Once we do this, then we completely
        # regenerate the documentation so that it's always up to date.
        documentation[func] = cls.render_documentation(func, flattened_constraints, args[:s_args])
        res.__doc__ = cls.document(documentation)
        # ..and then we can restore the original wrapper in all of its former glory.
        return cons(res)
    # Validate the types of all of our arguments and raise an exception if it used
    # an unsupported type.
    for name, types in t_args.items():
        if not isinstance(types, (internal.types.type, internal.types.tuple)) and types not in {internal.types.callable}:
            error_keywords = ("{:s}={!s}".format(name, types.__name__ if isinstance(types, internal.types.type) or types in {internal.types.callable} else '|'.join(t_.__name__ for t_ in types) if hasattr(types, '__iter__') else "{!r}".format(types)) for name, types in t_args.items())
            raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : The value ({!s}) specified for parameter \"{:s}\" is not a supported type.".format('.'.join([__name__, cls.__name__]), ', '.join(error_keywords), types, string.escape(name, '"')))
        continue
    # Validate the types of our arguments that we were asked to decorate with, this
    # way we can ensure that our previously decorated functions are actually of the
    # correct type. We do this strictly to assist with debugging.
    try:
        [cls.ex_function(item) for item in other]
    except Exception:
        error_keywords = ("{:s}={!s}".format(name, types.__name__ if isinstance(types, internal.types.type) or types in {internal.types.callable} else '|'.join(item.__name__ for item in types) if hasattr(types, '__iter__') else "{!r}".format(types)) for name, types in t_args.items())
        raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : The specified callable{:s} {!r} {:s} not of a valid type.".format('.'.join([__name__, cls.__name__]), ', '.join(error_keywords), '' if len(other) == 1 else 's', other, 'is' if len(other) == 1 else 'are'))
    # If we were given an unexpected number of arguments to decorate with, then
    # raise an exception. This is strictly done to assist with debugging.
    if len(other) > 1:
        # NOTE(review): this genexp binds "name, type" but reads "types" --
        # almost certainly a typo for "types"; as written the expression reads
        # whatever "types" leaked from the validation loop above.
        error_keywords = ("{:s}={!s}".format(name, types.__name__ if isinstance(types, internal.types.type) or types in {internal.types.callable} else '|'.join(item.__name__ for item in types) if hasattr(types, '__iter__') else "{!r}".format(types)) for name, type in t_args.items())
        raise internal.exceptions.InvalidParameterError(u"@{:s}({:s}) : More than one callable ({:s}) was specified to add a case to. Refusing to add cases to more than one callable.".format('.'.join([__name__, cls.__name__]), ', '.join(error_keywords), ', '.join("\"{:s}\"".format(string.escape(pycompat.code.name(c) if isinstance(c, internal.types.code) else c.__name__, '"')) for c in other)))
    return result
@classmethod
def document(cls, descriptions):
    '''Generate the documentation for a multicased function using the given `descriptions`.

    Each entry in `descriptions` maps a callable to a `(prototype, lines, _)`
    triple. Prototypes are left-padded to a common column, the first line of
    each description follows an arrow, and continuation lines are indented
    underneath it. Returns the joined documentation as a single string.
    '''
    arrow, indent = ' -> ', 4
    # Measure every prototype so they can all share one column width.
    widths = [len(prototype) for _, (prototype, _, _) in descriptions.items()]
    column = max(widths) if widths else 0
    rendered = []
    for F in cls.sorted_documentation(descriptions):
        prototype, lines, _ = descriptions[F]
        # A description without any lines is rendered as the bare prototype.
        if not len(lines):
            rendered.append(prototype)
            continue
        # Pad the prototype, attach the arrow, and join the first line to it.
        heading = "{: <{:d}s}{:s}".format(prototype, column, arrow)
        rendered.append(heading + lines[0])
        # Right-align each continuation line underneath the first one.
        rendered.extend("{: >{:d}s}".format(line, indent + len(heading) + len(line)) for line in lines[1:])
    return '\n'.join(rendered)
@classmethod
def filter_candidates(cls, candidates, packed_parameters, tree, table):
    '''Critique the arguments in `packed_parameters` against the branch that is given in `candidates`.

    `candidates` is a list of `(callable, index)` pairs, `tree` maps an
    index to `{callable : (name, index, next)}` nodes, and `table` maps
    `(callable, index)` to the per-parameter critique/transform entries.
    Returns the `(callable, index)` pairs that survived every positional
    argument.
    '''
    args, _ = packed_parameters
    parameters = (arg for arg in args)
    # First we need a closure that's driven simply by the parameter value. This gets
    # priority and will just descend through the tree until nothing is left.
    def critique_unnamed(parameter, branch):
        results = []
        for F, node in branch:
            parameter_name, index, _ = node
            discard_and_required, critique_and_transform, _ = table[F, index]
            parameter_critique, parameter_transform, _ = critique_and_transform
            # Now we take the parameter value, transform it, and then critique it.
            # A transform that raises simply disqualifies this candidate.
            try: value = parameter_transform(parameter) if parameter_transform else parameter
            except Exception: continue
            # A missing critique (None) means any value is acceptable.
            if parameter_critique(value) if parameter_critique else True:
                results.append((F, node))
            continue
        return results
    # This function is only needed for processing arguments, because if there aren't any
    # then all the nodes at a 0-height in our tree need to be checked for termination.
    assert(args)
    # Iterate through all of our parameters until we're finished or out of parameters.
    iterable = ((F, tree[index][F]) for F, index in candidates)
    branch = [(F, (name, index, next)) for F, (name, index, next) in iterable if index >= 0]
    for index, parameter in enumerate(parameters):
        candidates = critique_unnamed(parameter, branch)
        # Descend to each survivor's next node, but only if it still exists in the table.
        branch = [(F, tree[next][F]) for F, (_, _, next) in candidates if next >= 0 and (F, next) in table]
    # NOTE(review): "index == next" marks a self-loop (varargs) node; those and
    # any node with a valid successor are kept as viable candidates.
    return [(F, index) for F, (_, index, next) in candidates if index == next or next >= 0]
@classmethod
def filter_args(cls, packed_parameters, tree, table):
    '''Critique the arguments from `packed_parameters` using the provided `tree` and `table`.

    Selects initial candidates by positional-argument count (plus any
    varargs-capable callables) and delegates to `filter_candidates`. With
    no positional arguments at all, every root node of `tree` is returned.
    '''
    args, _ = packed_parameters
    # If there are some parameters, then start out by only considering functions
    # which support the number of args we were given as candidates.
    if args:
        count = len(args)
        results = {F for F in tree.get(count - 1, [])}
        # Next, we go through all of the available functions to grab only the ones
        # which can take varargs and are smaller than our arg count.
        iterable = ((F, pycompat.function.code(F)) for F in tree.get(0, []))
        unknowns = {F for F, c in iterable if pycompat.code.flags(c) & cls.CO_VARARGS and pycompat.code.argcount(c) <= count}
        # Now we turn them into a branch and then we can process the arguments.
        candidates = [(F, 0) for F in results | unknowns]
        return cls.filter_candidates(candidates, packed_parameters, tree, table)
    # If there are no parameters, then return everything. This really should only be done
    # by multicase.match, but we're doing this here for the sake of the unit-tests.
    branch = tree[0].items()
    return [(F, index) for F, (_, index, next) in branch if index == next or index >= 0]
@classmethod
def filter_keywords(cls, candidates, packed_parameters, tree, table):
    '''Critique the keywords from the `packed_parameters` against the branch given in `candidates`.

    `candidates` is a list of `(callable, index)` pairs, `tree` maps an index
    to `{callable : (name, index, next)}` nodes, and `table` maps
    `(callable, index)` to `((discard, required), (critique, transform,
    constraints), wildargs)` entries. One filtering round is performed per
    keyword; candidates that loop on themselves (varargs) are promoted to the
    wildargs index (-1). Returns the surviving `(callable, index)` pairs.
    '''
    args, kwds = packed_parameters
    def critique_names(kwds, branch):
        results, keys = [], {name for name in kwds}
        for F, node in branch:
            parameter_name, index, _ = node
            discard_and_required, critique_and_transform, wildargs = table[F, index]
            # Extract our sets that are required for the callable to be
            # considered a candidate and filter our keywords depending
            # on whether we're processing named parameters or wild ones.
            discard, required = discard_and_required
            available = keys - discard if wildargs else keys & required
            # If we still have any parameters available and their names
            # don't matter (wild), then critique and what we just consumed.
            if available and wildargs:
                parameter_critique, parameter_transform, _ = critique_and_transform
                parameters = (kwds[name] for name in available)
                transformed = map(parameter_transform, parameters) if parameter_transform else parameters
                critique = parameter_critique if parameter_critique else lambda parameter: True
                results.append((F, node)) if all(critique(parameter) for parameter in transformed) else None
            # If our parameter name is available, then we can critique it.
            elif available and parameter_name in available:
                parameter_critique, parameter_transform, _ = critique_and_transform
                # A transform that raises simply disqualifies this candidate.
                try: parameter = parameter_transform(kwds[parameter_name]) if parameter_transform else kwds[parameter_name]
                except Exception: continue
                critique = parameter_critique if parameter_critique else lambda parameter: True
                # FIX: use the guarded `critique` fallback here (mirroring the
                # wildargs branch above). Calling `parameter_critique` directly
                # raised TypeError whenever a parameter had no critique (None).
                results.append((F, node)) if critique(parameter) else None
            # Otherwise this parameter doesn't exist which makes it not a candidate.
            else:
                continue
            continue
        return results
    # Process as many keywords as we have left...
    branch = [(F, tree[index][F]) for F, index in candidates if (F, index) in table]
    for count in range(len(kwds)):
        critiqued = critique_names(kwds, branch)
        # If processing this keyword resulted in a loop (varargs), then
        # promote it to a wildargs so we can check the rest of the kwargs.
        candidates = [(F, -1 if index == next else index) for F, (_, index, next) in critiqued]
        branch = [(F, tree[-1 if index == next else next][F]) for F, (_, index, next) in critiqued if (F, -1 if index == next else next) in table]
    return candidates
@classmethod
def critique_and_transform(cls, F, packed_parameters, tree, table):
    '''Critique and transform the `packed_parameters` for the function `F` returning the resolved parameters and a bias for the number of parameters we needed to transform.

    Returns a `(results, keywords, counter)` triple where `results` holds the
    transformed positional values, `keywords` holds whatever keyword values
    were not consumed by named parameters, and `counter` is the scoring bias
    (lower is better: +0 for an exact constrained type, +1 for a constrained
    mismatch, +2 for a transformed value, +3 for an unconstrained parameter).
    '''
    args, kwds = packed_parameters
    results, keywords = [], {name : value for name, value in kwds.items()}
    counter = 0
    # First process each of the arguments and add them to our results.
    _, index, _ = tree[0][F]
    for arg in args:
        _, critique_and_transform, _ = table[F, index]
        parameter_critique, parameter_transform, parameter_constraints = critique_and_transform
        parameter = parameter_transform(arg) if parameter_transform else arg
        assert(parameter_critique(parameter) if parameter_critique else True)
        # Earlier scoring experiments, kept for reference:
        #counter = counter if arg is parameter else counter + 1
        #counter = counter if arg == parameter else counter + 1
        #counter = counter if id(arg) == id(parameter) and parameter_critique else counter + 1 if parameter_critique else counter + 2
        if parameter_critique:
            counter = counter + 2 if id(arg) != id(parameter) else counter if arg.__class__ in parameter_constraints else counter + 1
        else:
            counter = counter + 3
        results.append(parameter)
        _, _, index = tree[index][F]
    # First check if we have any other parameters we need to process
    # because if we don't, then we can just quit while we're ahead.
    if not keywords and (F, index) not in table:
        return results, keywords, counter
    assert((F, index) in table)
    # Now since we have keywords left to process, we need to ensure
    # that we still have parameters to complete or we need to promote
    # ourselves to -1 so that we can critique the keywords leftover.
    _, index, next = tree[index][F]
    # NOTE(review): both else-arms of this conditional yield `index`, so the
    # "(F, next) in table" test is currently a no-op -- possibly intentional,
    # possibly a leftover; verify against the promotion logic it describes.
    index = -1 if index == next else index if (F, next) in table else index
    # Now we can process the rest of our function arguments using whatever
    # keywords that are available until our index becomes less than 0.
    while index >= 0 and (F, index) in table:
        name, index, next = tree[index][F]
        _, critique_and_transform, wild = table[F, index]
        parameter_critique, parameter_transform, parameter_constraints = critique_and_transform
        if index >= 0 and name and not wild:
            arg = keywords.pop(name)
            parameter = parameter_transform(arg) if parameter_transform else arg
            assert(parameter_critique(parameter) if parameter_critique else True)
            # Earlier scoring experiments, kept for reference:
            #counter = counter if arg is parameter else counter + 1
            #counter = counter if arg == parameter else counter + 1
            #counter = counter if id(arg) == id(parameter) and parameter_critique else counter + 1 if parameter_critique else counter + 2
            if parameter_critique:
                counter = counter + 2 if id(arg) != id(parameter) else counter if arg.__class__ in parameter_constraints else counter + 1
            else:
                counter = counter + 3
            results.append(parameter)
        index = next
    # Despite this assertion not being exhaustive, if we ended up with some
    # keywords leftover then our function and index should be in the table.
    assert((F, index) in table if keywords else True)
    # That should be all of our named arguments, so whatever is left should
    # be the keyword arguments that belong to the wildargs candidates.
    return results, keywords, counter
@classmethod
def ordered(cls, candidates, tree, table):
    '''Yield the given `candidates` in the correct order using `tree` and `table`.

    Candidates are prioritized by descending index; fully-terminated
    candidates (those with no remaining `table` entry) are yielded before
    the ones that still have parameters (varargs/wildargs) left to resolve.
    '''
    # Highest index first; reversed(sorted(...)) also reverses the relative
    # order of candidates that share an index.
    prioritized = [candidate for candidate in reversed(sorted(candidates, key=operator.itemgetter(1)))]
    # Successfully terminated candidates take precedence.
    for candidate in prioritized:
        if candidate not in table:
            yield candidate
    # Everything else still has work left (varargs/wildargs) and follows.
    for candidate in prioritized:
        if candidate in table:
            yield candidate
    return
@classmethod
def preordered(cls, packed_parameters, candidates, tree, table):
    '''Yield the callable and transformed `packed_parameters` from `candidates` using `tree` and `table` in the correct order.

    Candidates are grouped by index (highest first). Within a group, the
    fully-terminated candidates are yielded before the varargs/wild ones,
    and ties are broken by the bias computed by `critique_and_transform`
    (lower bias first). Each yielded item is `(F, (args, kwds))` with the
    already-transformed parameters.
    '''
    results = {}
    # Bucket each candidate callable by its index.
    [ results.setdefault(index, []).append(F) for F, index in candidates ]
    order = [index for index in reversed(sorted(results))]
    for index in order:
        # Candidates with no table entry at this index have terminated.
        items = [ F for F in results[index] if (F, index) not in table ]
        # If we have more than one match, then we need to pre-sort this
        # by whatever their bias is so that we choose the right one.
        if len(items) > 1:
            biased, iterable = {}, (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items)
            # Earlier single-winner-per-bias version, kept for reference:
            #biased = {bias : (F, (args, kwds)) for F, (args, kwds, bias) in zip(items, iterable) }
            [ biased.setdefault(bias, []).append((F, (args, kwds))) for F, (args, kwds, bias) in zip(items, iterable) ]
            ordered = itertools.chain(*(biased[key] for key in sorted(biased)))
        # Otherwise, we don't need to sort and can take the first one.
        else:
            iterable = (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items)
            ordered = ((F, (args, kwds)) for F, (args, kwds, _) in zip(items, iterable))
        # Now we have the biased order and the parameters to use, so yield
        # them to the caller so that it can actually be executed.
        for F, packed in ordered:
            yield F, packed
        # Now we iterate through the results that actually do exist
        # because they're either variable-length parameters or wild.
        items = [ F for F in results[index] if (F, index) in table ]
        # Similarly, if we have more than one match here, then we need
        # to critique_and_transform the parameters and sort by bias.
        if len(items) > 1:
            biased, iterable = {}, (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items)
            # Earlier single-winner-per-bias version, kept for reference:
            #biased = {bias : (F, (args, kargs)) for F, (args, kargs, bias) in zip(items, iterable) }
            [ biased.setdefault(bias, []).append((F, (args, kwds))) for F, (args, kwds, bias) in zip(items, iterable) ]
            ordered = itertools.chain(*(biased[key] for key in sorted(biased)))
        # There's only one match, so that's exactly what we'll return.
        else:
            iterable = (cls.critique_and_transform(F, packed_parameters, tree, table) for F in items)
            ordered = ((F, (args, kwds)) for F, (args, kwds, _) in zip(items, iterable))
        # Yield what we found and continue to the next match.
        for F, packed in ordered:
            yield F, packed
        continue
    return
# For the following methods, we could expose another decorator that allows
# one to specifically update the critique_and_transform field inside the
# parameter table, but for simplicity (and backwards compatibility) we only
# use a single decorator and explicitly transform the type with these.
@classmethod
def parameter_critique(cls, *types):
    '''Return a callable that critiques its parameter for any of the given `types`.

    With no `types`, returns None (no critique necessary). Otherwise the
    returned predicate accepts instances of the given types, anything
    convertible to an integer when an integer type was requested, and any
    callable when `callable` itself was requested.
    '''
    requested = {item for item in types}
    # Nothing was requested, so every parameter should pass critique.
    if not requested:
        return None
    # Keep only actual types so that we can hand them to isinstance(); the
    # `requested` set is still consulted for the non-type conditions below.
    filtered = tuple(item for item in requested if isinstance(item, type))
    # More than one non-type item means something other than `callable`
    # slipped in, which we refuse to critique with.
    if len(requested) - len(filtered) > 1:
        invalid = requested - {item for item in itertools.chain(filtered, [callable])}
        parameters = [item.__name__ if isinstance(item, type) else item.__name__ if item in {callable} else "{!r}".format(item) for item in types]
        raise internal.exceptions.InvalidParameterError(u"{:s}.parameter_critique({:s}) : Refusing to critique a parameter using {:s} other than a type ({:s}).".format('.'.join([__name__, cls.__name__]), ', '.join(parameters), 'an object' if len(invalid) == 1 else 'objects', ', '.join(map("{!r}".format, invalid))))
    types = filtered
    # The default condition simply requires the parameter to be a concrete type.
    def Finstance(item, types=types):
        return isinstance(item, types)
    conditions = [Finstance]
    # Extra conditions for values that can be transformed to the correct type.
    if {item for item in internal.types.integer} & requested:
        conditions.append(lambda item: hasattr(item, '__int__'))
    # XXX: string coercion via __str__ was deliberately left out; every object has it.
    if callable in requested:
        conditions.append(callable)
    # A single condition can be returned as-is; otherwise any condition may pass.
    if len(conditions) == 1:
        return Finstance
    return lambda item, conditions=conditions: any(condition(item) for condition in conditions)
@classmethod
def parameter_transform(cls, *types):
    '''Return a callable that transforms its parameter to any of the given `types`.

    With no `types`, returns None (no transform required). Otherwise the
    returned callable leaves values that already satisfy the requested types
    untouched, converts `__int__`-capable values when an integer type was
    requested, and falls back to returning the value unchanged.
    '''
    requested = {item for item in types}
    # Nothing was requested, so no transformation is necessary.
    if not requested:
        return None
    # Keep only actual types so that we can hand them to isinstance(); the
    # `requested` set is still consulted for the non-type cases below.
    filtered = tuple(item for item in requested if isinstance(item, type))
    # More than one non-type item means something other than `callable`
    # slipped in, which we refuse to transform with.
    if len(requested) - len(filtered) > 1:
        invalid = requested - {item for item in itertools.chain(filtered, [callable])}
        parameters = [item.__name__ if isinstance(item, type) else item.__name__ if item in {callable} else "{!r}".format(item) for item in types]
        raise internal.exceptions.InvalidParameterError(u"{:s}.parameter_transform({:s}) : Refusing to transform a parameter using {:s} other than a type ({:s}).".format('.'.join([__name__, cls.__name__]), ', '.join(parameters), 'an object' if len(invalid) == 1 else 'objects', ', '.join(map("{!r}".format, invalid))))
    types = filtered
    # Each entry pairs a transformation with the condition that selects it;
    # the first matching condition wins and anything else is left alone.
    def Fidentity(item):
        return item
    transformers = []
    # Values that already satisfy the request are kept as-is.
    if callable in requested and len(requested) == 1:
        transformers.append((Fidentity, callable))
    elif callable not in requested:
        transformers.append((Fidentity, lambda item, types=types: isinstance(item, types)))
    # Both types and a callable were requested, so two identity cases are needed.
    else:
        transformers.append((Fidentity, lambda item, types=types: isinstance(item, types)))
        transformers.append((Fidentity, callable))
    # Additional transformations for values that can be converted.
    if {item for item in internal.types.integer} & requested:
        transformers.append((int, lambda item: hasattr(item, '__int__')))
    # XXX: string coercion via __str__ was deliberately left out; every object has it.
    # With just the identity case there's nothing to do; otherwise dispatch on
    # the first matching condition, defaulting to the untouched value.
    if len(transformers) == 1:
        return Fidentity
    return lambda item, transformers=transformers: next((F(item) for F, condition in transformers if condition(item)), item)
# XXX: Our main decorator that is responsible for updating the decorated function.
@classmethod
def add(cls, callable, constraints, tree, table):
    '''Add the `callable` with the specified type `constraints` to both the `tree` and `table`.

    Each positional parameter of `callable` gets an entry in `table` keyed by
    the `(callable, index)` pair and a node in `tree` keyed by the parameter
    index. Variable-length (*args) and wild (**kwargs) parameters are encoded
    as loops: entries whose "next" index points back at themselves or at -1.
    '''
    args, kwargs, packed = cls.ex_args(callable)
    varargs, wildargs = packed
    Fflattened_constraints = lambda types: {item for item in cls.flatten(types if isinstance(types, internal.types.unordered) else [types])}

    # Extract the parameter names and the types that the callable was
    # decorated with so that we can generate the functions used to
    # transform and critique the value that determines its validity.
    critique_and_transform = []
    for name in args:
        t = constraints.pop(name, ())
        Fcritique, Ftransform = cls.parameter_critique(*t), cls.parameter_transform(*t)
        critique_and_transform.append((Fcritique, Ftransform, Fflattened_constraints(t)))

    # Generate two sets that are used to determine what parameter names
    # are required for this wrapped function to still be considered.
    discard_and_required = []
    for index, name in enumerate(args):
        discard = {item for item in args[:index]}
        required = {item for item in args[index:]}
        discard_and_required.append((discard, required))

    # Zip up our parameters with both our critique_and_transform and
    # discard_and_required lists so we can build a tree for each parameter.
    items = [packed for packed in zip(enumerate(args), discard_and_required, critique_and_transform)]
    for index_name, discard_and_required, critique_and_transform in items:
        index, name = index_name

        # FIX: the original asserted `index_name not in table`, but `table` is
        # keyed by (callable, index) pairs, so that check was vacuously true.
        # Verify the actual slot we are about to occupy is still free.
        assert((callable, index) not in table)
        table[callable, index] = discard_and_required, critique_and_transform, wildargs if wildargs == name else ''
        tree.setdefault(index, {})[callable] = name, index, 1 + index

    # We should be done, but in case there's var args or wild args (keywords), then
    # we'll need to create some cycles within our tree and table. None of these entries
    # hold anything of value, but they need to hold something.. So we create some defaults.
    discard_and_required = {name for name in args}, {name for name in []}
    critique_and_explode = operator.truth, lambda item: False, ()
    critique_and_continue = operator.truth, lambda item: True, ()

    # If there are no parameters whatsoever, then we need a special case
    # which gets used at the first pass of our parameter checks. Essentially
    # we treat this as a vararg, but without the loop. This way if it does
    # turn out to be a vararg or wildarg, it will get fixed to add the loop.
    if not args:
        discard_and_impossible = {name for name in args}, {None}

        # If we don't have any parameters, then our first parameter should
        # immediately fail. If we're variable-length'd or wild, then the
        # conditionals that follow this will overwrite this with a loop.
        table[callable, len(args)] = discard_and_impossible, critique_and_explode, ''
        tree.setdefault(len(args), {})[callable] = '', 0, -1

    # If both are selected, then we need to do some connections here.
    if varargs and wildargs:
        t = constraints.pop(wildargs, ())
        Fcritique, Ftransform = cls.parameter_critique(*t), cls.parameter_transform(*t)
        critique_and_transform = Fcritique, Ftransform, Fflattened_constraints(t)

        # Since this callable is variable-length'd, we create a loop in our tree
        # and table so that we can consume any number of parameters.
        table[callable, len(args)] = discard_and_required, critique_and_transform, wildargs
        tree.setdefault(len(args), {})[callable] = varargs, len(args), len(args)

        # Since the callable is also wild, we need to create a loop outside the
        # count of parameters (-1). This way we can promote a loop to this path.
        tree.setdefault(-1, {})[callable] = wildargs, -1, -1
        table[callable, -1] = discard_and_required, critique_and_transform, wildargs

    # If there's variable-length parameters, then we simply need to create a loop.
    elif varargs:
        t = constraints.pop(varargs, ())
        Fcritique, Ftransform = cls.parameter_critique(*t), cls.parameter_transform(*t)
        critique_and_transform = Fcritique, Ftransform, Fflattened_constraints(t)

        # We can't really match against the parameter name with variable-length
        # parameters, so we add it as a loop for an empty string (unnamed).
        tree.setdefault(len(args), {})[callable] = varargs, len(args), len(args)
        table[callable, len(args)] = discard_and_required, critique_and_transform, ''

    # Pop out the wild (keyword) parameter type from our decorator parameters.
    elif wildargs:
        t = constraints.pop(wildargs, ())
        Fcritique, Ftransform = cls.parameter_critique(*t), cls.parameter_transform(*t)
        critique_and_transform = Fcritique, Ftransform, Fflattened_constraints(t)

        # We need to go through our type parameters and update our table
        # so that it includes any wild keyword parameters.
        tree.setdefault(-1, {})[callable] = wildargs, -1, -1
        table[callable, -1] = discard_and_required, critique_and_transform, wildargs

        # Create our sentinel entry in the tree so that when we run out
        # of args, we transfer to the keyword parameters (-1) to continue.
        tree.setdefault(len(args), {})[callable] = '', len(args), -1
        table[callable, len(args)] = discard_and_required, critique_and_continue, wildargs
    return callable
@classmethod
def flatten(cls, iterable):
    '''Take the provided `iterable` (or tree) and then yield each individual element resulting in it being "flattened".'''
    seen = set()
    for element in iterable:
        # Unordered containers are expanded recursively; anything else is a
        # leaf. Either way, each distinct item is emitted exactly once, in
        # first-occurrence order.
        if isinstance(element, internal.types.unordered):
            leaves = cls.flatten(element)
        else:
            leaves = [element]
        for item in leaves:
            if item not in seen:
                seen.add(item)
                yield item
            continue
        continue
    return
@classmethod
def render_documentation(cls, function, constraints, ignored):
    '''Render the documentation for a `function` using the given `constraints` while skipping over any `ignored` parameters.

    Returns a triple of the rendered prototype string, the stripped docstring
    lines (empty when the docstring is empty or whitespace-only), and a tuple
    of each parameter's constraints in declaration order.
    '''
    args, defaults, (star, starstar) = cls.ex_args(function)
    parameters = [name for name in itertools.chain(args, [star, starstar]) if name]
    prototype = cls.prototype(function, constraints, ignored)

    # Split the docstring into stripped lines, discarding it entirely
    # when there is no meaningful content.
    documentation = function.__doc__ or ''
    lines = [item.strip() for item in documentation.split('\n')] if documentation.strip() else []

    # Collect each parameter's constraints in declaration order. The original
    # spelled this as `(lambda *args: args)(*constraint_order)`, which is just
    # an obfuscated tuple() conversion.
    constraint_order = tuple(constraints.get(arg, ()) for arg in parameters)
    return prototype, lines, constraint_order
# Create a dictionary to bias the order of our documentation so that
# custom or more complicated types tend to come first. Containers
# (ordered/unordered) weigh 1 and the common scalar types (integers and
# strings) weigh 2; any type not listed here defaults to 0 when looked up.
# Since sorted_documentation() uses this as an ascending sort key,
# unlisted (custom) types sort ahead of containers, which sort ahead of
# plain integers/strings.
documentation_bias = {constraint : 1 for constraint in itertools.chain(internal.types.ordered, internal.types.unordered)}
documentation_bias.update({constraint : 2 for constraint in itertools.chain(internal.types.integer, internal.types.string)})
@classmethod
def sorted_documentation(cls, descriptions):
    '''Return the provided `descriptions` in the order that was used to generate their documentation.'''
    # Count the newlines in each callable's stripped docstring.
    newlines = {}
    for F in descriptions:
        docstring = pycompat.function.documentation(F) or ''
        newlines[F] = docstring.strip().count('\n')

    # Pull out the per-parameter constraints so that we can measure the
    # number of parameters and compute a bias from the constraint types.
    counts, bias = {}, {}
    for F, (_, _, constraints) in descriptions.items():
        counts[F] = len(constraints)
        bias[F] = sum(max((cls.documentation_bias.get(choice, 0) for choice in choices) if choices else [0]) for choices in constraints)

    # Order by line count first, then parameter count, then constraint bias.
    # sorted() is stable, so callables with equal keys keep their input order.
    return sorted(descriptions, key=lambda F: (newlines[F], counts[F], bias[F]))
@classmethod
def prototype(cls, function, constraints={}, ignored={item for item in []}):
    '''Generate a prototype for the given `function` and `constraints` while skipping over the `ignored` argument names.'''
    # NOTE(review): both defaults are mutable, but they are only ever read in
    # this method, so the shared-mutable-default pitfall does not apply here.
    args, defaults, (star, starstar) = cls.ex_args(function)
    def Femit_arguments(names, constraints, ignored):
        '''Yield a tuple for each individual parameter composed of the name and its constraints.'''
        # Iterate through all of our argument names. If any of them are within
        # our ignored items, however, then we can simply skip over them.
        for item in names:
            if item in ignored:
                continue

            # If the current argument name is not within our constraints, then
            # we only have to yield the argument name and move on.
            if item not in constraints:
                yield item, None
                continue

            # Figure out which constraint to use for each item, and yield how
            # it should be represented back to the caller.
            constraint = constraints[item]
            if isinstance(constraint, internal.types.type) or constraint in {internal.types.callable}:
                yield item, constraint.__name__
            elif hasattr(constraint, '__iter__'):
                # Multiple candidate types are rendered pipe-separated ("int|str").
                yield item, '|'.join(type.__name__ for type in cls.flatten(constraint))
            else:
                # Fall back to the string representation for anything else.
                yield item, "{!s}".format(constraint)
            continue
        return

    # Log any multicased functions that accidentally define type constraints for parameters
    # which don't actually exist. This is specifically done in order to aid debugging.
    unavailable = {constraint_name for constraint_name in constraints.keys()} - {argument_name for argument_name in args}
    if unavailable:
        co = pycompat.function.code(function)
        co_fullname, co_filename, co_lineno = '.'.join([function.__module__, function.__name__]), os.path.relpath(co.co_filename, idaapi.get_user_idadir()), co.co_firstlineno
        proto_s = "{:s}({:s}{:s}{:s})".format(co_fullname, ', '.join(args) if args else '', ", *{:s}".format(star) if star and args else "*{:s}".format(star) if star else '', ", **{:s}".format(starstar) if starstar and (star or args) else "**{:s}".format(starstar) if starstar else '')
        path_s = "{:s}:{:d}".format(co_filename, co_lineno)
        logging.warning("{:s}({:s}): Unable to constrain the type in {:s} for parameter{:s} ({:s}) at {:s}.".format('.'.join([__name__, cls.__name__]), co_fullname, proto_s, '' if len(unavailable) == 1 else 's', ', '.join(unavailable), path_s))

    # Return the prototype for the current function with the provided parameter constraints.
    iterable = (item if parameter is None else "{:s}={:s}".format(item, parameter) for item, parameter in Femit_arguments(args, constraints, ignored))
    items = iterable, ["*{:s}".format(star)] if star else [], ["**{:s}".format(starstar)] if starstar else []
    return "{:s}({:s})".format(pycompat.function.name(function), ', '.join(itertools.chain(*items)))
@classmethod
def match(cls, packed_parameters, tree, table):
    '''Use the provided `packed_parameters` to find a matching callable in both `tree` and `table`.'''
    args, kwargs = packed_parameters
    candidates = cls.filter_args(packed_parameters, tree, table)

    # If there's no other filters, then we filter our candidates
    # by culling out everything that can still take parameters.
    if not kwargs:
        assert(all(index >= 0 for F, index in candidates))
        iterable = ((F, tree[index][F]) for F, index in candidates)
        # NOTE: `next` here is a tree field (the following parameter index),
        # shadowing the builtin within these comprehensions.
        branch = [(F, (name, index, next)) for F, (name, index, next) in iterable if next <= len(args)]

        # Now we have a branch containing everything that we care about. So, all we really
        # need to do here is collect our results if we have any that are available.
        results = [(F, next) for F, (name, index, next) in branch if (F, next) not in table]

        # In case we didn't get any results, then we move onto the next branch so that we
        # can grab the next varargs or wildargs candidates that are still available.
        nextbranch = [(F, tree[next][F]) for F, (name, index, next) in branch if (F, next) in table]
        vars = [(F, next) for F, (name, index, next) in nextbranch if index == next and index <= len(args)]
        wild = [(F, next) for F, (name, index, next) in nextbranch if index != next and next < 0]

        # That was everything, so we just need to return them in the correct order.
        return results + vars + wild

    # If we had some args that we ended up processing, then we need to shift them if there's
    # still some parameters or promote them to keywords to do the final filtering pass.
    elif args:
        iterable = [(F, tree[index][F]) for F, index in candidates if (F, index) in table]
        candidates = [(F, -1 if index == next else next) for F, (_, index, next) in iterable]
        #candidates = [(F, -1 if index == next else index) for F, (_, index, next) in iterable]

        # Now each candidate should be at the right place in their tree and we can filter keywords.
        results = cls.filter_keywords(candidates, packed_parameters, tree, table)

        # Last thing to do is to take our results, filter their matches, sort them by their
        # and then we can return them to the caller to actually use them.
        iterable = ((F, tree[index][F]) for F, index in results)
        return [(F, next) for F, (_, index, next) in iterable if next < 0 or (F, next) not in table]

    # NOTE(review): when keyword arguments are supplied with no positional
    # arguments, neither branch above runs and the implicit result is None --
    # confirm that callers (e.g. the wrapper in new_wrapper) never hit this.
@classmethod
def new_wrapper(cls, func, cache, Fmissing=None, Fdebug_candidates=None):
    '''Create a new wrapper that will determine the correct function to call.

    The wrapper searches the (tree, table) `cache` for the first callable
    whose constraints match the parameters it was invoked with, and then
    dispatches into it. `Fmissing` is invoked when no candidate matches, and
    `Fdebug_candidates`, if given, receives every match for debugging.
    '''
    tree, table = cache

    ## Define the wrapper for the function that we're decorating. This way whenever the
    ## decorated function gets called, we can search for one that matches the correct
    ## constraints and dispatch into it with the correctly transformed parameters.
    def F(*arguments, **keywords):
        packed_parameters = arguments, keywords
        candidates = cls.match(packed_parameters, tree, table)
        iterable = cls.preordered(packed_parameters, candidates, tree, table)

        # Extract our first match if we were able to find one. If not, then pass what
        # we tried to match to the missing-hook to complain about it.
        result = next(iterable, None)
        if result is None:
            if Fmissing is not None:
                return Fmissing(packed_parameters, tree, table)
            raise RuntimeError(packed_parameters, tree, table)

        # If our debug-hook is defined, then pass our matches to it so that it can be
        # dumped to the screen or stashed somewhere to assist debugging.
        if Fdebug_candidates is not None:
            res = Fdebug_candidates(itertools.chain([result], iterable), packed_parameters, tree, table)
            assert(res is None)

        # We got a callable, so we just need to call it with our parameters.
        F, (args, kwds) = result
        return F(*args, **kwds)

    # First, we need to swap out the original code object with the one from the closure
    # that we defined. In order to preserve information within the backtrace, we just
    # make a copy of all of the relevant code properties.
    f, c = F, pycompat.function.code(F)
    cargs = c.co_argcount, c.co_nlocals, c.co_stacksize, c.co_flags, \
            c.co_code, c.co_consts, c.co_names, c.co_varnames, \
            c.co_filename, '.'.join([func.__module__, pycompat.function.name(func)]), \
            c.co_firstlineno, c.co_lnotab, c.co_freevars, c.co_cellvars
    newcode = pycompat.code.new(cargs, pycompat.code.unpack_extra(c))

    # Now we can use the new code object that we created in order to create a function,
    # assign the previous name and documentation into it, and return it.
    result = pycompat.function.new(newcode, pycompat.function.globals(f), pycompat.function.name(f), pycompat.function.defaults(f), pycompat.function.closure(f))
    # FIX: the original set_name call carried a stray trailing comma, turning
    # the statement into a pointless one-element tuple expression; removed.
    pycompat.function.set_name(result, pycompat.function.name(func))
    pycompat.function.set_documentation(result, pycompat.function.documentation(func))
    return result
@classmethod
def ex_function(cls, object):
    '''Extract the actual function type from a callable.

    Accepts a plain function, a bound/unbound method, a raw code object, or
    a static/class-method descriptor, and returns the underlying function.
    Raises `InvalidTypeOrValueError` for anything else.
    '''
    if isinstance(object, internal.types.function):
        return object
    elif isinstance(object, internal.types.method):
        return pycompat.method.function(object)
    elif isinstance(object, internal.types.code):
        # FIX: this branch previously referenced an undefined name `c`, which
        # guaranteed a NameError; the code object being resolved is `object`.
        # Locate the (unique) function referring to this code object.
        res, = (item for item in gc.get_referrers(object) if pycompat.function.name(item) == pycompat.code.name(object) and isinstance(item, internal.types.function))
        return res
    elif isinstance(object, internal.types.descriptor):
        return object.__func__
    raise internal.exceptions.InvalidTypeOrValueError(object)
@classmethod
def reconstructor(cls, item):
    '''Return a closure that returns the original callable type for a function.'''
    # Dispatch on the kind of callable that `item` is, and hand back a closure
    # which rebuilds the same kind of callable around a plain function.
    if isinstance(item, internal.types.function):
        return lambda func: func
    if isinstance(item, internal.types.method):
        return lambda func: pycompat.method.new(func, pycompat.method.self(item), pycompat.method.type(item))
    if isinstance(item, internal.types.descriptor):
        return lambda func: type(item)(func)
    if isinstance(item, internal.types.instance):
        return lambda func: internal.types.InstanceType(type(item), dict(func.__dict__))
    if isinstance(item, internal.types.class_t):
        return lambda func: type(item)(item.__name__, item.__bases__, dict(func.__dict__))
    raise internal.exceptions.InvalidTypeOrValueError(type(item))
@classmethod
def ex_args(cls, f):
    '''Extract the arguments from a function.'''
    code = pycompat.function.code(f)
    names = (name for name in pycompat.code.varnames(code))

    # The first `argcount` variable names are the positional parameters.
    positional = tuple(itertools.islice(names, pycompat.code.argcount(code)))

    # Map the trailing parameter names to their default values.
    default_values = pycompat.function.defaults(f) or []
    defaults = dict(zip(reversed(positional), reversed(default_values)))

    # The *args and **kwargs names follow the positionals in varnames, and
    # only exist when the corresponding code flag is set.
    flags = pycompat.code.flags(code)
    try: star = next(names) if flags & cls.CO_VARARGS else ""
    except StopIteration: star = ""
    try: starstar = next(names) if flags & cls.CO_VARKEYWORDS else ""
    except StopIteration: starstar = ""
    return positional, defaults, (star, starstar)
@arizvisa
Copy link
Author

There's probably a potential bug here with functions that use keywords, specifically when transitioning from the positional parameters to the keywords. This transition is marked by `critique_and_continue` and might result in another parameter being consumed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment