Last active
January 6, 2019 05:52
-
-
Save arizvisa/21d3d8a56a0c943f9a0943d077aa61ee to your computer and use it in GitHub Desktop.
(Python2) properly wrapping a callable whilst retaining all information about the wrappee
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import gc
import itertools
import operator
import types

import six
class wrap(object):
    """
    A lot of magic is in this class which allows one to do a proper wrap
    around a single callable.

    Calling ``wrap(callable, wrapper)`` does not produce an instance of this
    class. Instead it returns a new object of the same kind as `callable`
    (function, method, ``staticmethod``, ``classmethod``) that keeps the
    argument names, defaults, name and documentation of `callable`, and whose
    body simply calls ``wrapper(callable, ...)`` with the received arguments.

    NOTE: the opcode names and the ``types.CodeType`` argument list used by
    `assemble`, and the ``im_self``/``InstanceType``/``ClassType`` branches of
    `constructor`, are the Python 2 ones. The remaining helpers only rely on
    attributes (``__code__``, ``__func__``, ...) that Python 2.6+ also provides.
    """

    # Bit flags found in the `co_flags` field of a code object.
    CO_OPTIMIZED                = 0x00001
    CO_NEWLOCALS                = 0x00002
    CO_VARARGS                  = 0x00004
    CO_VARKEYWORDS              = 0x00008
    CO_NESTED                   = 0x00010
    CO_VARGEN                   = 0x00020
    CO_NOFREE                   = 0x00040
    CO_COROUTINE                = 0x00080
    CO_ITERABLE                 = 0x00100
    CO_GENERATOR_ALLOWED        = 0x01000
    CO_FUTURE_DIVISION          = 0x02000
    CO_FUTURE_ABSOLUTE_IMPORT   = 0x04000
    CO_FUTURE_WITH_STATEMENT    = 0x08000
    CO_FUTURE_PRINT_FUNCTION    = 0x10000
    CO_FUTURE_UNICODE_LITERALS  = 0x20000
    CO_FUTURE_BARRY_AS_BDFL     = 0x40000
    CO_FUTURE_GENERATOR_STOP    = 0x80000

    # `opcode` provides the name -> opcode number table used by `co_assemble`.
    import opcode

    # `consts` is kept only as a backward-compatible class attribute; the
    # methods below use the CO_* constants defined on this class instead, so
    # a missing `compiler` package must not prevent the class from loading.
    try:
        import compiler.consts as consts
    except ImportError:
        consts = None

    @classmethod
    def co_assemble(cls, operation, operand=None):
        '''Assembles the specified `operation` and `operand` into a code string.

        The opcode is looked up by name and always occupies a single byte.
        If `operand` is given, it follows as two bytes with the low byte first.
        Only the low 16 bits of `operand` are encoded.
        '''
        op = cls.opcode.opmap[operation]
        if operand is None:
            return bytes(bytearray((op,)))

        # masks and shifts keep both pieces integral no matter how `/` divides
        low, high = operand & 0x00ff, (operand >> 8) & 0x00ff
        return bytes(bytearray((op, low, high)))

    @classmethod
    def co_varargsQ(cls, co):
        '''Returns whether the provided code type, `co`, takes variable arguments.'''
        return bool(co.co_flags & cls.CO_VARARGS)

    @classmethod
    def co_varkeywordsQ(cls, co):
        '''Returns whether the provided code type, `co`, takes variable keyword arguments.'''
        return bool(co.co_flags & cls.CO_VARKEYWORDS)

    @classmethod
    def cell(cls, *args):
        '''Convert `args` into a ``cell`` tuple.

        Each value is captured by a throw-away closure so that the cell
        object the interpreter creates for it can be borrowed.
        '''
        def enclose(item):
            return (lambda: item).__closure__[0]
        return tuple(enclose(arg) for arg in args)

    @classmethod
    def assemble(cls, function, wrapper, bound=False):
        """Assemble a function that will execute `wrapper` with `function` as its first parameter.

        The returned ``types.FunctionType`` has the same positional arguments,
        defaults, wildcard arguments, name and documentation as `function`.
        Its hand-assembled body loads `wrapper` and `function` out of closure
        cells, pushes every received argument, calls `wrapper`, and returns
        the result.

        If `bound` is ``True``, then assume that the first parameter for
        `function` represents the instance it's bound to. That parameter is
        still accepted by the result, but it is not forwarded to `wrapper`.

        NOTE: the instructions and the code type built here are Python 2 specific.
        """
        F, C = cls.extract(function), cls.extract(wrapper)
        Fc = F.__code__

        ## build the namespaces that we'll use
        Tc = cls.co_varargsQ(Fc), cls.co_varkeywordsQ(Fc)

        # the values stored in the closure cells: prefer the objects we were
        # given, and fall back to the extracted functions if they aren't callable
        Sargs = ('F', 'wrapper')
        Svals = tuple(f if callable(f) else fo for f, fo in [(function, F), (wrapper, C)])

        # rip out the arguments from our target `F`
        Fargs = Fc.co_varnames[:Fc.co_argcount]
        Fwildargs = Fc.co_varnames[Fc.co_argcount : Fc.co_argcount + sum(Tc)]

        # combine them into tuples for looking up variables
        co_names, co_varnames = Sargs, Fargs + Fwildargs

        # free variables (that get passed to `C`)
        co_freevars = Sargs

        # constants for code type (which consist of just the self-doc)
        co_consts = (F.__doc__,)

        ## figure out some things for assembling the bytecode
        # the call instruction depends on which wildcard arguments exist
        call_ = {
            (False, False) : 'CALL_FUNCTION',
            (True, False)  : 'CALL_FUNCTION_VAR',
            (False, True)  : 'CALL_FUNCTION_KW',
            (True, True)   : 'CALL_FUNCTION_VAR_KW',
        }
        call = call_[Tc]

        # the result needs the same wildcard flags as the function it mimics
        has_varargs, has_varkeywords = Tc
        co_flags = cls.CO_NESTED | cls.CO_OPTIMIZED | cls.CO_NEWLOCALS
        if has_varargs:
            co_flags |= cls.CO_VARARGS
        if has_varkeywords:
            co_flags |= cls.CO_VARKEYWORDS

        ## assemble the code type that gets turned into a function
        code_ = []
        asm = code_.append

        # first we'll dereference our cellvar for `wrapper`
        asm(cls.co_assemble('LOAD_DEREF', co_freevars.index('wrapper')))

        # include the original `F` as the first arg
        asm(cls.co_assemble('LOAD_DEREF', co_freevars.index('F')))

        # now we can include all of the original arguments (cropped by +1 if
        # bound), followed by any wildcard arguments
        forwarded = Fargs[int(bound):] + Fwildargs
        for n in forwarded:
            asm(cls.co_assemble('LOAD_FAST', co_varnames.index(n)))

        # call `wrapper` with the correct call type (+1 for `F`, -1 if bound)
        asm(cls.co_assemble(call, len(Fargs) + 1 - int(bound)))

        # and then return its value
        asm(cls.co_assemble('RETURN_VALUE'))

        # combine it into a single code string
        co_code = bytes().join(code_)

        # everything is pushed before the call happens, so the deepest the
        # stack gets is the two cell values plus every forwarded argument
        # (the wildcard arguments included)
        co_stacksize = 2 + len(forwarded)

        ## next we'll construct the code type based on what we have
        func_code = types.CodeType(
            len(Fargs), len(co_varnames),
            co_stacksize, co_flags, co_code,
            co_consts, co_names, co_varnames,
            Fc.co_filename, Fc.co_name, Fc.co_firstlineno,
            bytes(), co_freevars
        )

        ## and then turn it back into a function
        res = types.FunctionType(func_code, F.__globals__, F.__name__, F.__defaults__, cls.cell(*Svals))
        res.__name__, res.__doc__ = F.__name__, F.__doc__
        return res

    def __new__(cls, callable, wrapper):
        '''Return a function similar to `callable` that calls `wrapper` with `callable` as the first argument.'''
        cons, f = cls.constructor(callable), cls.extract(callable)

        # create a wrapper for the function that'll execute `callable` with the function as its first argument, and the rest with any args
        res = cls.assemble(callable, wrapper, bound=isinstance(callable, (classmethod, types.MethodType)))

        # take the module from `callable`, then from the function inside it
        res.__module__ = getattr(callable, '__module__', getattr(f, '__module__', '__main__'))

        # now we re-construct it and then return it
        return cons(res)

    @classmethod
    def extract(cls, object):
        '''Extract a ``types.FunctionType`` from a callable.

        Accepts a function, a method, a code object, or a ``staticmethod`` /
        ``classmethod`` object. Raises ``TypeError`` for anything else, and
        ``ValueError`` if a code object does not belong to exactly one
        function that shares its name.
        '''
        # `object` is already a function
        if isinstance(object, types.FunctionType):
            return object

        # if it's a method, then extract the function from its property
        elif isinstance(object, types.MethodType):
            return object.__func__

        # if it's a code type, then walk through all of its referrers finding one that matches it
        elif isinstance(object, types.CodeType):
            # check the type first, as most referrers won't even have a name
            res, = (n for n in gc.get_referrers(object) if isinstance(n, types.FunctionType) and n.__name__ == object.co_name)
            return res

        # if it's a property decorator, then they hide the function in an attribute
        elif isinstance(object, (staticmethod, classmethod)):
            return object.__func__

        # okay, no go. we have no idea what this is.
        raise TypeError("unable to extract a function from {!r}".format(object))

    @classmethod
    def arguments(cls, f):
        '''Extract the arguments from a function `f`.

        Returns a tuple of ``(args, defaults, (starargs, kwdargs))`` where
        `args` is a tuple of the positional argument names, `defaults` is a
        dictionary mapping an argument name to its default value, and
        `starargs` / `kwdargs` are the names of the wildcard arguments (or an
        empty string when the function doesn't have one).
        '''
        c = f.__code__
        names = iter(c.co_varnames)
        args = tuple(itertools.islice(names, c.co_argcount))

        # defaults always belong to the trailing arguments, so pair them from the end
        res = dict(zip(reversed(args), reversed(f.__defaults__ or ())))

        # keyword-only names (where the interpreter has them) sit between the
        # positional and the wildcard names, so step over them
        for _ in range(getattr(c, 'co_kwonlyargcount', 0)):
            next(names, None)

        starargs = next(names, '') if c.co_flags & cls.CO_VARARGS else ''
        kwdargs = next(names, '') if c.co_flags & cls.CO_VARKEYWORDS else ''
        return args, res, (starargs, kwdargs)

    @classmethod
    def constructor(cls, callable):
        '''Return a closure that constructs the original `callable` type from a function.

        Raises ``TypeError`` if the type of `callable` is not supported.
        '''
        # `callable` is a function type, so just return a closure that returns it
        if isinstance(callable, types.FunctionType):
            return lambda func: func

        # if it's a method type, then we just need to extract the related properties to construct it
        elif isinstance(callable, types.MethodType):
            return lambda method, self=callable.im_self, cls=callable.im_class: types.MethodType(method, self, cls)

        # if it's a property decorator, we just need to pass the function as an argument to the decorator
        elif isinstance(callable, (staticmethod, classmethod)):
            return lambda method, mt=callable.__class__: mt(method)

        # if it's a method instance, then we just need to instantiate it so that it's bound
        elif isinstance(callable, types.InstanceType):
            return lambda method, mt=callable.__class__: types.InstanceType(mt, dict(method.__dict__))

        # otherwise if it's a class or a type, then we just need to create the object with its bases
        elif isinstance(callable, (types.TypeType, types.ClassType)):
            return lambda method, t=callable.__class__, name=callable.__name__, bases=callable.__bases__: t(name, bases, dict(method.__dict__))

        # if we get here, then we have no idea what kind of type `callable` is
        raise TypeError("unable to construct an object of type {!r}".format(callable.__class__))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment