Skip to content

Instantly share code, notes, and snippets.

@Andykl
Created April 17, 2023 22:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Andykl/20aab3a3eab63321dacb1f91d36e8750 to your computer and use it in GitHub Desktop.
Save Andykl/20aab3a3eab63321dacb1f91d36e8750 to your computer and use it in GitHub Desktop.
Python context manager protocol extension, __with__ approach
from __future__ import annotations
def default___with__(obj, depth):
    """Adapt a classic ``__enter__``/``__exit__`` manager to a generator.

    The generator yields exactly once, producing the value returned by
    ``obj.__enter__()``.  When it is resumed normally, ``obj.__exit__`` is
    called with ``(None, None, None)`` and the generator finishes.  When an
    exception is thrown into it, ``obj.__exit__`` receives that exception;
    a truthy result swallows it (the generator finishes), a falsy result
    re-raises the same exception.

    Args:
        obj: Object providing ``__enter__`` and ``__exit__``.
        depth: Accepted so the signature matches ``__with__(self, depth)``;
            it is not used here.

    Yields:
        The result of ``obj.__enter__()``.

    Raises:
        AttributeError: If ``obj`` has no ``__enter__`` or ``__exit__``.
    """
    # Bind __exit__ *before* entering, as the ``with`` statement does.  If it
    # were looked up only after the body ran, a manager without __exit__
    # would be entered and then left with no way to clean up.
    exit_method = obj.__exit__
    # Exception in __enter__ should be propagated.
    enter_result = obj.__enter__()
    try:
        yield enter_result
    except BaseException as e:
        # A falsy __exit__ result means "do not suppress": re-raise as is.
        if not exit_method(type(e), e, e.__traceback__):
            raise
    else:
        exit_method(None, None, None)
class CMProtocolV2:
    """Emulate a generator-based ``__with__`` protocol using a normal ``with``.

    Usage: ``with CMProtocolV2('expr', 'name'): ...``.  ``expr`` is evaluated
    in the caller's frame to obtain the manager.  If the manager's type
    defines ``__with__(self, depth)``, that generator drives the block;
    otherwise a manager with ``__enter__``/``__exit__`` is adapted through
    ``default___with__``.

    Protocol, as implemented below:

    * The first value yielded by the generator is bound to ``bind_name`` in
      the caller's frame on entry.
    * On exit the generator is resumed (or the body's exception is thrown
      into it).  If it yields a second value, that value is bound to
      ``bind_name`` and the exception is suppressed.  If it simply finishes,
      the exception is suppressed without rebinding.  If it re-raises the
      body's exception, that exception propagates.  Any other exception
      replaces it, chained with ``from``.

    NOTE: binding is done by writing into ``frame.f_locals``.  That is the
    real namespace at module level; whether the write is visible inside a
    function depends on the Python version -- confirm before relying on it.

    Args:
        manager_expr: Source text of the expression producing the manager.
        bind_name: Name to bind the enter/leave results to, or ``None`` to
            bind nothing.
    """

    # Managers currently entered through this class, innermost last.  It is
    # shared by all instances so the nesting depth of one manager object can
    # be counted by identity.
    _with_stack = []

    def __init__(self, manager_expr: str, bind_name: str | None = None):
        self.manager_expr = manager_expr
        self.bind_name = bind_name
        self._with_generator = None

    def __enter__(self):
        """Evaluate the manager, start its generator and bind the first yield.

        Raises:
            TypeError: If the object supports neither protocol.
            RuntimeError: If the ``__with__`` generator finishes without
                yielding.
        """
        import sys

        caller_frame = sys._getframe(1)
        # SECURITY NOTE: manager_expr is eval'ed; it must never come from
        # untrusted input.
        # Exception in evaluation or __enter__ interrupts the context manager.
        manager = eval(self.manager_expr, caller_frame.f_globals, caller_frame.f_locals)
        # How many times this very object is already active (identity, not ==).
        depth = sum(i is manager for i in CMProtocolV2._with_stack)
        try:
            with_gen = type(manager).__with__
        except AttributeError:
            # Check if the object supports the old protocol.
            try:
                manager.__enter__
                manager.__exit__
            except AttributeError:
                raise TypeError("not a context manager") from None
            with_gen = default___with__
        gen = with_gen(manager, depth)
        try:
            enter_result = gen.send(None)
        except StopIteration:
            # A generator that returns before its first yield would otherwise
            # leak a bare StopIteration out of the ``with`` statement.
            raise RuntimeError("generator didn't yield") from None
        if self.bind_name is not None:
            caller_frame.f_locals[self.bind_name] = enter_result
        self._with_generator = gen
        # Registered last, so a failed entry never leaves a stale stack item.
        CMProtocolV2._with_stack.append(manager)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Resume the generator for the leave phase.

        Returns:
            ``True`` when the generator finished or yielded a leave result
            (the body's exception, if any, is suppressed); ``False`` when the
            generator re-raised the body's own exception.

        Raises:
            RuntimeError: If the generator yields again while being closed.
            BaseException: Any new exception raised by the generator, chained
                to the body's exception.
        """
        import sys

        caller_frame = sys._getframe(1)
        gen = self._with_generator
        assert gen is not None
        CMProtocolV2._with_stack.pop()
        try:
            try:
                if exc_val is None:
                    leave_result = gen.send(None)
                else:
                    leave_result = gen.throw(exc_val)
            # If the generator is exhausted, it doesn't want to rebind the target,
            # but also means that the exception was handled.
            except StopIteration:
                return True
            except BaseException as e:
                if e is exc_val:
                    # Exception is the same, so protocol wants us to re-raise it.
                    # No binding of result_name.
                    return False
                if (
                    isinstance(exc_val, StopIteration)
                    and isinstance(e, RuntimeError)
                    and e.__cause__ is exc_val
                ):
                    # PEP 479: a StopIteration re-raised inside a generator
                    # surfaces as RuntimeError with the original as __cause__.
                    # That is still "the same exception", so let the body's
                    # StopIteration propagate unchanged.
                    return False
                # raise the new exception context manager created.
                raise e from exc_val
        finally:
            # Drop the reference on every exit path, including the early
            # returns above, so a finished generator is never kept around.
            self._with_generator = None
            # We need to close the generator to warn the user if they
            # wrote more code after the second yield.
            try:
                gen.close()
            except RuntimeError:
                raise RuntimeError("You really should not try to write the code to catch GeneratorExit.")
        if self.bind_name is not None:
            caller_frame.f_locals[self.bind_name] = leave_result
        # Suppress the exception in old protocol, __leave__ has handled it.
        return True
class Manager:
    """Demonstration manager implementing the generator ``__with__`` protocol.

    It yields ``enter_result`` first and ``leave_result`` second, printing a
    trace line at every stage so the order of events can be followed.
    """

    def __init__(self, enter_result, leave_result):
        self.enter_result, self.leave_result = enter_result, leave_result

    def __with__(self, depth):
        """Yield the enter value, report how the body ended, yield the leave value.

        Every exception thrown in at the first yield (any ``BaseException``)
        is caught and only printed, so the second yield is always reached.
        """
        print(f"enter with depth {depth}")
        try:
            yield self.enter_result
        except BaseException as exc:
            print(f"exception {exc!r}")
        else:
            print("no exception")
        finally:
            print("finally")
        print(f"leave with depth {depth}")
        yield self.leave_result
# Demo script.  It has to run at module level: CMProtocolV2 binds ``value`` and
# ``value_2`` by writing into the calling frame's f_locals.
manager_obj = Manager("enter", "exit")

# The same manager object is entered twice (nested), each time under a
# different target name.
with CMProtocolV2("manager_obj", "value"):
    print(f"value after enter: {value}")
    with CMProtocolV2("manager_obj", "value_2"):
        print(f"value_2 after enter 2: {value_2}")
        print("body")
    print(f"value after exit 2: {value}")
    print(f"value_2 after exit 2: {value_2}")
    # Alternative exceptions to try in place of the one raised below:
    # raise ValueError
    # raise StopIteration
    # raise RuntimeError
    raise GeneratorExit

print(f"value after exit: {value}")
print(f"value_2 after exit: {value_2}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment