Created
April 17, 2023 22:25
-
-
Save Andykl/20aab3a3eab63321dacb1f91d36e8750 to your computer and use it in GitHub Desktop.
Python context manager protocol extension, __with__ approach
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
def default___with__(obj, depth): | |
# Exception in __enter__ should be propagated. | |
enter_result = obj.__enter__() | |
try: | |
yield enter_result | |
except BaseException as e: | |
if not obj.__exit__(type(e), e, e.__traceback__): | |
raise | |
else: | |
obj.__exit__(None, None, None) | |
class CMProtocolV2: | |
_with_stack = [] | |
def __init__(self, manager_expr: str, bind_name: str | None = None): | |
self.manager_expr = manager_expr | |
self.bind_name = bind_name | |
self._with_generator = None | |
def __enter__(self): | |
import sys | |
caller_frame = sys._getframe(1) | |
# Exception in evaluation or __enter__ interrupts the context manager. | |
manager = eval(self.manager_expr, caller_frame.f_globals, caller_frame.f_locals) | |
depth = sum(i is manager for i in CMProtocolV2._with_stack) | |
try: | |
with_gen = type(manager).__with__ | |
except AttributeError: | |
# Check if the object supports the old protocol. | |
try: | |
manager.__enter__ | |
manager.__exit__ | |
except AttributeError: | |
raise TypeError("not a context manager") from None | |
with_gen = default___with__ | |
gen = with_gen(manager, depth) | |
enter_result = gen.send(None) | |
if self.bind_name is not None: | |
caller_frame.f_locals[self.bind_name] = enter_result | |
self._with_generator = gen | |
CMProtocolV2._with_stack.append(manager) | |
def __exit__(self, exc_type, exc_val, exc_tb): | |
import sys | |
caller_frame = sys._getframe(1) | |
gen = self._with_generator | |
assert gen is not None | |
CMProtocolV2._with_stack.pop() | |
try: | |
try: | |
if exc_val is None: | |
leave_result = gen.send(None) | |
else: | |
leave_result = gen.throw(exc_val) | |
# If the generator is exhausted, it doesn't want to rebind the target, | |
# but also means that the exception was handled. | |
except StopIteration: | |
return True | |
except BaseException as e: | |
if e is exc_val: | |
# Exception is the same, so protocol wants us to re-raise it. | |
# No binding of result_name. | |
return False | |
else: | |
# raise the new exception context manager created. | |
raise e from exc_val | |
finally: | |
# We need to close the generator to warn the user if they | |
# wrote more code after the second yield. | |
try: | |
gen.close() | |
except RuntimeError: | |
raise RuntimeError("You really should not try to write the code to catch GeneratorExit.") | |
# It should be exhausted now. | |
self._with_generator = None | |
if self.bind_name is not None: | |
caller_frame.f_locals[self.bind_name] = leave_result | |
# Suppress the exception in old protocol, __leave__ has handled it. | |
return True | |
class Manager: | |
def __init__(self, enter_result, leave_result): | |
self.enter_result = enter_result | |
self.leave_result = leave_result | |
def __with__(self, depth): | |
print(f'enter with depth {depth}') | |
try: | |
yield self.enter_result | |
except BaseException as e: | |
print(f'exception {e!r}') | |
else: | |
print('no exception') | |
finally: | |
print('finally') | |
print(f'leave with depth {depth}') | |
yield self.leave_result | |
manager_obj = Manager("enter", "exit") | |
with CMProtocolV2('manager_obj', 'value'): | |
print(f'value after enter: {value}') | |
with CMProtocolV2('manager_obj', 'value_2'): | |
print(f'value_2 after enter 2: {value_2}') | |
print('body') | |
print(f'value after exit 2: {value}') | |
print(f'value_2 after exit 2: {value_2}') | |
# raise ValueError | |
# raise StopIteration | |
# raise RuntimeError | |
raise GeneratorExit | |
print(f'value after exit: {value}') | |
print(f'value_2 after exit: {value_2}') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment