Created
June 2, 2020 09:13
-
-
Save probablytom/3b1be9b654058e77e12a6fe564f46401 to your computer and use it in GitHub Desktop.
PyDySoFu version 3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from inspect import isfunction, isclass, ismethod | |
import re | |
import builtins | |
from functools import wraps, partial, reduce | |
from copy import deepcopy | |
import ast | |
import copy | |
import inspect | |
class AspectHooks: | |
pre_rules = list() | |
around_rules = list() | |
post_rules = list() | |
error_handling_rules = list() | |
def __enter__(self, *args, **kwargs): | |
self.old_import = __import__ | |
import builtins | |
builtins.__import__ = self.__import__ | |
def final_around(self, target, *args, **kwargs): | |
return target(*args, **kwargs) | |
def __import__(self, *args, **kwargs): | |
mod = self.old_import(*args, **kwargs) | |
def build_wrapper(target): | |
@wraps(target) | |
def wrapper(*args, **kwargs): | |
pre, around, post, error_handlers = self.get_rules(target.__name__) | |
try: | |
# Run pre advice | |
[advice(target, *args, **kwargs) for advice in pre] | |
def nest_around_call(nested_around, next_around): | |
return partial(next_around, nested_around) | |
# NOTE. The signature of around functions is around(next_around_function, target, *args, **kwargs), | |
# ... and they MUST make the call next_around_function(target*args, **kwargs) | |
# for around_index in range(len(around-1)): | |
# around[around_index] = partial(around[around_index], around_index[around_index+1]) | |
# around[-1] = partial(around[-1], self.final_around) | |
nested_around = reduce(nest_around_call, | |
around[::-1], | |
self.final_around) | |
ret = nested_around(target, *args, **kwargs) | |
for advice in post: | |
post_return = advice(target, ret, *args, **kwargs) | |
ret = ret if post_return is None else post_return | |
return ret | |
except Exception as exception: | |
prevent_raising = False | |
for handler in error_handlers: | |
prevent_raising = prevent_raising or handler(target, exception, *args, **kwargs) | |
if not prevent_raising: | |
raise exception | |
return wrapper | |
def apply_hooks(target_object): | |
for item_name in filter(lambda p: isinstance(p, str) and len(p) > 1 and p[:2] != "__", dir(target_object)): | |
item = getattr(target_object, item_name) | |
if isfunction(item) or ismethod(item): | |
setattr(target_object, item_name, build_wrapper(item)) | |
elif isclass(item): | |
apply_hooks(item) | |
apply_hooks(mod) | |
return mod | |
@classmethod | |
def add_prelude(cls, rule, advice, urgency=0): | |
AspectHooks.pre_rules.append((re.compile(rule), advice, urgency)) | |
@classmethod | |
def add_around(cls, rule, advice, urgency=0): | |
AspectHooks.around_rules.append((re.compile(rule), advice, urgency)) | |
@classmethod | |
def add_encore(cls, rule, advice, urgency=0): | |
AspectHooks.post_rules.append((re.compile(rule), advice, urgency)) | |
@classmethod | |
def add_error_handler(cls, rule, advice, urgency=0): | |
AspectHooks.error_handling_rules.append((re.compile(rule), advice, urgency)) | |
def get_rules(self, methodname): | |
pre, around, post, error_handlers = list(), list(), list(), list() | |
for pattern, advice, urgency in AspectHooks.pre_rules: | |
if pattern.match(methodname) is not None: | |
pre.append((advice, urgency)) | |
for pattern, advice, urgency in AspectHooks.around_rules: | |
if pattern.match(methodname) is not None: | |
around.append((advice, urgency)) | |
for pattern, advice, urgency in AspectHooks.post_rules: | |
if pattern.match(methodname) is not None: | |
post.append((advice, urgency)) | |
for pattern, advice, urgency in AspectHooks.error_handling_rules: | |
if pattern.match(methodname) is not None: | |
error_handlers.append((advice, urgency)) | |
urgencykey = lambda ad_urg_tuple: -ad_urg_tuple[1] # Negative so we get the highest urgency first | |
pre, around, post, error_handlers = sorted(pre, key=urgencykey), sorted(around, key=urgencykey), sorted(post, key=urgencykey), sorted(error_handlers, key=urgencykey) | |
advice_functions = list(map(lambda x: x[0], pre)), list(map(lambda x: x[0], around)), list(map(lambda x: x[0], post)), list(map(lambda x: x[0], error_handlers)) | |
return advice_functions | |
def __exit__(self, *args, **kwargs): | |
builtins.__import__ = self.old_import | |
if __name__ == "__main__": | |
from math import pow | |
with AspectHooks(): | |
import testmod | |
invocations = 0 | |
def encore(*args, **kwargs): | |
global invocations | |
invocations += 1 | |
def loggingaround(next_around, target, *args, **kwargs): | |
print("running " + str(target)) | |
ret = next_around(target, *args, **kwargs) | |
print(str(target) + " returned " + str(ret)) | |
return ret | |
def switch_to_power_of_two(next_around, target, *args, **kwargs): | |
return next_around(partial(pow, 2), *args, **kwargs) | |
AspectHooks.add_encore(".*", encore) | |
AspectHooks.add_around(".*", loggingaround) | |
assert testmod.dosomething() is None | |
assert testmod.Calc().triple(4) == 12 | |
assert testmod.double(5) == 10 | |
AspectHooks.add_around("double", switch_to_power_of_two) | |
assert testmod.double(5) == 32 | |
assert invocations == 4 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment