Skip to content

Instantly share code, notes, and snippets.

@fgoinai
Created July 24, 2018 15:41
Show Gist options
  • Save fgoinai/37f26a11a161aaab84b6e68d356fc678 to your computer and use it in GitHub Desktop.
Save fgoinai/37f26a11a161aaab84b6e68d356fc678 to your computer and use it in GitHub Desktop.
class compose:
    """Chain several callables into one callable.

    By default the functions run right-to-left, so ``compose(f, g)(x)``
    evaluates ``f(g(x))``.  With ``from_left=True`` they run in the order
    given, so ``compose(f, g, from_left=True)(x)`` evaluates ``g(f(x))``.

    The first function to run receives the call's positional and keyword
    arguments unchanged; every later function receives the previous result
    as its only argument.

    ``c | g`` builds a new ``compose`` with ``g`` prepended to the function
    list and ``g | c`` builds one with ``g`` appended.  Under the default
    right-to-left order this reads like a pipe: ``c | g`` runs ``g`` last
    and ``g | c`` runs ``g`` first.  Neither operator modifies ``c``.
    """

    def __init__(self, *fs: Callable, from_left=False):
        self.__from_left = from_left
        self.__f_list = list(fs)

    def __call__(self, *args, **kwargs):
        """Run the chain and return the result of the last function.

        Raises:
            TypeError: if the chain contains no functions.
        """
        if not self.__f_list:
            raise TypeError('compose() has no functions to call')
        ordered = self.__f_list if self.__from_left else self.__f_list[::-1]
        first, *rest = ordered
        # Only the first function sees the original call arguments.
        return reduce(lambda result, f: f(result), rest, first(*args, **kwargs))

    def __or__(self, other: Callable) -> 'compose':
        """Return a new chain with ``other`` prepended (``self | other``)."""
        # other must be callable, you know that
        # Build a fresh instance: mutating ``self`` would silently change
        # every other reference to this chain.
        return compose(other, *self.__f_list, from_left=self.__from_left)

    def __ror__(self, other: Callable) -> 'compose':
        """Return a new chain with ``other`` appended (``other | self``)."""
        return compose(*self.__f_list, other, from_left=self.__from_left)
class curry:
    """Wrap ``f`` so that its arguments can be supplied over several calls.

    Each call binds more arguments.  As soon as every parameter of ``f`` has
    a value (either bound explicitly or taken from its default) ``f`` is
    invoked with all values passed by keyword and its result is returned.
    Otherwise a new ``curry`` holding the accumulated bindings is returned;
    the instance that was called is left unchanged and can be reused.

    Positional arguments fill the not-yet-bound parameters in signature
    order (parameters that have defaults included).  Keyword arguments bind,
    or re-bind, the named parameter.

    Limitation: ``f`` is assumed to have no ``*args`` / ``**kwargs``
    parameters.
    """

    def __init__(self, f: Union[Callable, 'curry']):
        # assume f does not contain varargs
        if isinstance(f, curry):
            self.f = f.f
            self.sig = f.sig
            self.default_args_queue = f.default_args_queue
            # Own copy, so binding through one curry never leaks into another.
            self.args_queue = dict(f.args_queue)
        else:
            self.f = f
            self.sig = signature(f)
            # Parameter name -> default value, in signature order.
            # Identity check: ``!=`` would invoke the default's own __ne__.
            self.default_args_queue = OrderedDict(
                (name, param.default)
                for name, param in self.sig.parameters.items()
                if param.default is not Parameter.empty
            )
            # Parameter name -> explicitly bound value.
            self.args_queue = {}

    def __call__(self, *args, **kwargs):
        """Bind more arguments; call ``f`` once every parameter has a value.

        Returns:
            The result of ``f`` when all parameters are covered, otherwise a
            new ``curry`` carrying the bindings made so far.

        Raises:
            TypeError: if more positional arguments are given than there are
                unbound parameters.
            ValueError: if a keyword does not name a parameter of ``f``.
        """
        parameters = self.sig.parameters
        # Shallow copy only: the bound values themselves must stay the very
        # objects the caller passed in (a deep copy would hand ``f`` clones).
        bound = dict(self.args_queue)

        unbound = [name for name in parameters if name not in bound]
        if len(args) > len(unbound):
            raise TypeError('Params len exceed!')
        bound.update(zip(unbound, args))

        for key, value in kwargs.items():
            if key not in parameters:
                raise ValueError('{} is not a valid param key'.format(key))
            bound[key] = value

        # Explicit bindings take precedence over declared defaults.
        params = {**self.default_args_queue, **bound}
        if len(params) == len(parameters):
            return self.f(**params)

        partial = curry(self)
        partial.args_queue = bound
        return partial
class TrampolineException(Exception):
    """Carries the arguments of a re-entrant call back to the driving wrapper.

    Raised by a ``trampoline`` wrapper when it detects that it is being
    re-entered, and caught by the wrapper invocation that is already running
    further up the stack.
    """

    def __init__(self, args, kwargs, owner=None):
        # NOTE: assigning ``args`` replaces the standard ``BaseException.args``
        # on purpose; the driving wrapper reads the positional arguments of
        # the next call from it.
        self.args = args
        self.kwargs = kwargs
        # The wrapper that raised this bounce, or None when raised elsewhere.
        self.owner = owner


def trampoline(layers=2):
    """Build a decorator that turns re-entrant calls into a loop.

    The first call of the decorated function runs a loop.  When the function
    calls itself again and the wrapper finds one of its own running
    invocations within ``layers`` frames up the stack, it does not recurse:
    it raises ``TrampolineException`` carrying the new arguments, and the
    running invocation catches it and calls the function again with them.
    The stack therefore stays flat.

    ``layers`` is the number of frames between two nested wrapper
    invocations, so it depends on what sits between the wrapper and the
    function body:

    1. Directly decorated function -> ``layers`` should be 2.

        @trampoline()
        def count_down(i: int):
            return i if i == 0 else count_down(i - 1)

        Execution stack: wrapper -> count_down

    2. Extra wrapping layers add frames, so ``layers`` must grow with them.

        @trampoline(3)
        @curry
        def bar(x: int, y: int):
            return x + y

        Execution stack: wrapper -> curry -> bar

    You have to know how thick the stack between two wrapper invocations is
    and set ``layers`` accordingly.

    Caveats:
        * Every re-entrant call found within ``layers`` frames is bounced,
          whether or not it is a tail call; work still pending in the
          abandoned frames (e.g. ``n * fact(n - 1)``) is discarded.
        * Relies on ``sys._getframe`` for stack inspection.
    """
    def core(f):
        # Frames of *this* wrapper that are currently running the bounce loop.
        # All wrappers produced by ``core`` share one code object, so frames
        # have to be tracked per wrapper to avoid mistaking a call to another
        # trampolined function for a re-entrant call of this one.
        driving = set()

        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            frame = sys._getframe()

            # Look at most ``layers`` frames up for a running invocation of
            # this same wrapper.
            ancestor = frame.f_back
            for _ in range(layers):
                if ancestor is None:
                    break
                if ancestor in driving:
                    raise TrampolineException(args, kwargs, wrapper)
                ancestor = ancestor.f_back

            driving.add(frame)
            try:
                while True:
                    try:
                        return f(*args, **kwargs)
                    except TrampolineException as bounce:
                        # A bounce aimed at another trampolined function
                        # must keep unwinding until it reaches its owner.
                        if bounce.owner is not None and bounce.owner is not wrapper:
                            raise
                        args = bounce.args
                        kwargs = bounce.kwargs
            finally:
                driving.discard(frame)
                # Drop the self-reference so the frame is not kept in a cycle.
                del frame
        return wrapper
    return core
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment