Skip to content

Instantly share code, notes, and snippets.

@anatoly-kussul
Last active March 1, 2024 16:27
Show Gist options
  • Save anatoly-kussul/f2d7444443399e51e2f83a76f112364d to your computer and use it in GitHub Desktop.
Save anatoly-kussul/f2d7444443399e51e2f83a76f112364d to your computer and use it in GitHub Desktop.
Python sync-async decorator factory
class SyncAsyncDecoratorFactory:
"""
Factory creates decorator which can wrap either a coroutine or function.
To return something from wrapper use self._return
If you need to modify args or kwargs, you can yield them from wrapper
"""
def __new__(cls, *args, **kwargs):
instance = super().__new__(cls)
# This is for using decorator without parameters
if len(args) == 1 and not kwargs and (inspect.iscoroutinefunction(args[0]) or inspect.isfunction(args[0])):
instance.__init__()
return instance(args[0])
return instance
class ReturnValue(Exception):
def __init__(self, return_value):
self.return_value = return_value
@contextmanager
def wrapper(self, *args, **kwargs):
raise NotImplementedError
@classmethod
def _return(cls, value):
raise cls.ReturnValue(value)
def __call__(self, func):
@wraps(func)
def call_sync(*args, **kwargs):
try:
with self.wrapper(*args, **kwargs) as new_args:
if new_args:
args, kwargs = new_args
return self.func(*args, **kwargs)
except self.ReturnValue as r:
return r.return_value
@wraps(func)
async def call_async(*args, **kwargs):
try:
with self.wrapper(*args, **kwargs) as new_args:
if new_args:
args, kwargs = new_args
return await self.func(*args, **kwargs)
except self.ReturnValue as r:
return r.return_value
self.func = func
return call_async if inspect.iscoroutinefunction(func) else call_sync
@anatoly-kussul
Copy link
Author

anatoly-kussul commented Dec 7, 2021

@NicoAdrian

Hey, sorry for late response.
The reason you are not getting what you want is that you can't directly return from context manager.

Here is slightly modified version that does what you want, not the best solution probably, just my first look at that, but you should get an idea:

from functools import wraps
import asyncio
from contextlib import contextmanager
import collections
import time
import inspect

class SyncAsyncDecoratorFactory:
    """
    Factory creates decorator which can wrap either a coroutine or function.
    To return something from wrapper use self._return
    If you need to modify args or kwargs, you can yield them from wrapper
    """
    def __new__(cls, *args, **kwargs):
        instance = super().__new__(cls)
        # This is for using decorator without parameters
        if len(args) == 1 and not kwargs and (inspect.iscoroutinefunction(args[0]) or inspect.isfunction(args[0])):
            instance.__init__()
            return instance(args[0])
        return instance

    class ReturnValue(Exception):
        def __init__(self, return_value):
            self.return_value = return_value

    @contextmanager
    def wrapper(self, *args, **kwargs):
        raise NotImplementedError

    @classmethod
    def _return(cls, value):
        raise cls.ReturnValue(value)

    def __call__(self, func):
        @wraps(func)
        def call_sync(*args, **kwargs):
            try:
                with self.wrapper(*args, **kwargs) as new_args:
                    if new_args:
                        args, kwargs = new_args
                    self._return(self.func(*args, **kwargs))
            except self.ReturnValue as r:
                return r.return_value

        @wraps(func)
        async def call_async(*args, **kwargs):
            try:
                with self.wrapper(*args, **kwargs) as new_args:
                    if new_args:
                        args, kwargs = new_args
                    self._return(await self.func(*args, **kwargs))
            except self.ReturnValue as r:
                return r.return_value

        self.func = func
        return call_async if inspect.iscoroutinefunction(func) else call_sync


class TimedLRU(SyncAsyncDecoratorFactory):
    def __init__(self, max_size=128, max_age=30):
        super().__init__()
        self.max_size = max_size
        self.max_age = max_age
        self._cache = collections.OrderedDict()
        self._sentinel = object()

    @contextmanager
    def wrapper(self, *args, **kwargs):
        k = args + (self._sentinel,) + tuple(sorted(kwargs.items()))
        if k in self._cache:
            print("hit")
            self._cache.move_to_end(k)
            res, ts = self._cache[k]
            if time.time() - ts <= self.max_age:
                self._return(res)
        try:
            yield
        except self.ReturnValue as r:
            self._cache[k] = (r.return_value, time.time())
            if len(self._cache) > self.max_size:
                self._cache.popitem(0)
            raise


@TimedLRU()
def foobar(s):
    print("in decorated function")
    return "foo %s bar" % s


a = foobar("hey")
print(a)
b = foobar("hey")
print(b)```

@NicoAdrian
Copy link

I see, thanks !

@albertmenglongli
Copy link

This class is cool! Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment