Skip to content

Instantly share code, notes, and snippets.

@spinfish
Last active September 10, 2021 01:51
Show Gist options
  • Save spinfish/34ce07b6c0fe007a84c514e47262e409 to your computer and use it in GitHub Desktop.
Save spinfish/34ce07b6c0fe007a84c514e47262e409 to your computer and use it in GitHub Desktop.
The docstrings are only there so I look like a cool and professional Pythonista or whatever; look at me go with the module-level dunders too, woo. God, I am so cringe, but yeah, anyway, this is a decorator I made whilst feeling very bored 😎
"""
I'm so bored right now, please help.
Anyway, this module provides a pure-Python context manager decorator,
intended to be slightly more efficient than the contextlib ones (not
benchmarked here); the one decorator supports both sync and async
generator functions.
"""
# Public API: a star-import of this module exposes only the decorator factory.
__all__ = ("contextmanager",)
# Module metadata dunders.
__license__ = "MIT"
__version__ = "1.0.1"
__author__ = "mackenzie"
from asyncio import iscoroutinefunction
from functools import wraps
from inspect import isasyncgenfunction
class _ContextManager:
    """
    Generator-backed context manager usable with ``with`` and ``async with``.

    Modelled on :func:`contextlib.contextmanager`, but defined with
    ``__slots__`` and exposing both the sync and the async protocol on a
    single class.  Unlike the :mod:`contextlib` version, an exception raised
    inside the managed block is *not* thrown into the generator: on exit the
    generator is simply resumed, so the code after its ``yield`` runs either
    way.

    Subclassing is forbidden; see ``__init_subclass__``.

    Attributes
    ----------
    is_coroutine: :class:`bool`
        Whether the function passed is asynchronous (a coroutine function or
        an async generator function).
    """

    __slots__ = ("__ignore", "__result_of_gen_func", "__is_coroutine")

    def __init__(self, func, *args, ignore=False, **kwargs):
        """
        Parameters
        ----------
        func: :class:`function`
            A generator function (sync or async) that yields a single value,
            which is bound to ``as``.
        args: :class:`tuple`
            Positional arguments to pass to the generator function.
        kwargs: :class:`dict`
            Keyword arguments to pass to the generator function.
        ignore: :class:`bool`
            Whether to suppress errors in ``__exit__`` or ``__aexit__``.
            Defaults to ``False``.

        Raises
        ------
        TypeError
            If ``func`` is not a plain function object.
        """
        if not (func.__class__.__name__ == "function" and callable(func)):
            raise TypeError(
                "func must be a function, received type {!r} instead".format(
                    func.__class__
                )
            )
        self.__ignore = ignore
        # ``iscoroutinefunction`` alone is False for async generator
        # functions, which are the async callables this class can drive.
        self.__is_coroutine = (
            iscoroutinefunction(func) or isasyncgenfunction(func)
        )
        # Calling the generator function only creates the generator; its body
        # does not start running until ``__enter__`` / ``__aenter__``.
        self.__result_of_gen_func = func(*args, **kwargs)

    def __init_subclass__(cls):
        """Reject any attempt to subclass by raising :class:`TypeError`."""
        raise TypeError("inheriting from class {!r} is forbidden".format(cls))

    @property
    def is_coroutine(self):
        """Whether the wrapped function is asynchronous."""
        return self.__is_coroutine

    @staticmethod
    def raise_runtime(state):
        """Raise ``RuntimeError("generator did not <state>")``."""
        raise RuntimeError("generator did not {}".format(state))

    def __enter__(self):
        """Advance the generator to its ``yield`` and return the value."""
        try:
            return next(self.__result_of_gen_func)
        except StopIteration:
            self.raise_runtime("yield")

    def __exit__(self, cls, value, traceback):
        """
        Resume the generator so the code after its ``yield`` runs.

        Returns the ``ignore`` flag, so a truthy flag suppresses the
        exception (if any) raised in the managed block.  An error raised by
        the generator itself is suppressed only when ``ignore`` is truthy;
        otherwise it propagates.  Raises :class:`RuntimeError` if the
        generator yields a second time.
        """
        try:
            next(self.__result_of_gen_func)
        except StopIteration:
            # Normal completion of the generator.
            return self.__ignore
        except Exception:
            # The generator's own cleanup failed: do not hide that unless
            # the caller explicitly asked for errors to be ignored.
            if self.__ignore:
                return self.__ignore
            raise
        self.raise_runtime("stop")

    async def __aenter__(self):
        """Advance the async generator to its ``yield`` and return the value."""
        try:
            return await self.__result_of_gen_func.__anext__()
        except StopAsyncIteration:
            self.raise_runtime("yield")

    async def __aexit__(self, cls, value, traceback):
        """
        Async counterpart of ``__exit__`` with the same suppression rules.
        """
        try:
            await self.__result_of_gen_func.__anext__()
        except StopAsyncIteration:
            # Normal completion of the async generator.
            return self.__ignore
        except Exception:
            if self.__ignore:
                return self.__ignore
            raise
        self.raise_runtime("stop")
def contextmanager(ignore=False):
    """
    Context manager decorator for both sync and async generator functions.

    May be applied with arguments (``@contextmanager(ignore=True)``), with
    empty parentheses (``@contextmanager()``), or bare (``@contextmanager``).

    Parameters
    ----------
    ignore: :class:`bool`
        Forwarded to the context manager; when true, exceptions are
        suppressed on exit. Defaults to ``False``. When the decorator is
        applied bare, the decorated function arrives in this slot and
        ``ignore`` falls back to ``False``.

    Returns
    -------
    A decorator, or, for bare usage, the already decorated function. Each
    call of the decorated function builds a fresh context manager around
    ``func(*args, **kwargs)``.

    Example
    -------
    .. code-block:: python3

        @contextmanager(ignore=True)
        def managed_resource(*args, **kwargs):
            resource = retrieve_resource(*args, **kwargs)
            try:
                yield resource
            finally:
                resource.close()

        with managed_resource(timeout=120) as resource:
            # Because ignore is True any exceptions will be swallowed
            process_resource(resource)
    """
    # Bare ``@contextmanager``: the function was passed where ``ignore`` is
    # expected, so decorate it directly using the default flag.
    if callable(ignore):
        return contextmanager()(ignore)

    def inner(func):
        @wraps(func)
        def _nested(*args, **kwargs):
            return _ContextManager(func, *args, ignore=ignore, **kwargs)

        return _nested

    return inner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment