Example implementations of context managers
import asyncio | |
from typing import Callable | |
class ScopedActions:
    """Asynchronous context manager that calls one callback on entry and one on exit.

    Both ``__aenter__`` and ``__aexit__`` await ``asyncio.sleep(1)`` before
    invoking their callback.
    """

    # Callback invoked by ``__aenter__``.
    startup: Callable[[], None]
    # Callback invoked by ``__aexit__``.
    cleanup: Callable[[], None]
    # Label stored on the instance.
    name: str

    def __init__(self, name: str, startup: Callable[[], None], cleanup: Callable[[], None]):
        """Store the label and both callbacks; neither callback is called here."""
        self.name, self.startup, self.cleanup = name, startup, cleanup

    async def __aenter__(self):
        """Sleep for one second, call ``startup`` and return this instance."""
        await asyncio.sleep(1)
        on_enter = self.startup
        on_enter()
        return self

    async def __aexit__(self, type, value, traceback):
        """Sleep for one second, then call ``cleanup``.

        Returns ``None``, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        await asyncio.sleep(1)
        on_exit = self.cleanup
        on_exit()
        return None
async def test():
    """Demonstrate ``ScopedActions`` by printing from inside an ``async with`` body."""
    manager = ScopedActions(
        name='test',
        startup=lambda: print('started'),
        cleanup=lambda: print('finished'),
    )
    async with manager as scoped:
        print('Executing:', scoped.name)
# Run the demo coroutine on a dedicated event loop. The loop is closed in a
# ``finally`` so it is released even when ``test()`` raises; an event loop
# that is never closed is reported as an unclosed resource.
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(test())
finally:
    loop.close()
import asyncio | |
from contextlib import asynccontextmanager | |
from typing import Callable | |
@asynccontextmanager
async def scoped_actions(name: str, startup: Callable[[], None], cleanup: Callable[[], None]):
    """Async context manager that runs ``startup`` on entry and ``cleanup`` on exit.

    Args:
        name: Value yielded to the ``async with`` body (bound by ``as``).
        startup: Called first, before the one-second ``asyncio.sleep``.
        cleanup: Called when the manager is left.

    Yields:
        ``name``, unchanged.

    Once ``startup()`` has returned, ``cleanup()`` is guaranteed to run: also
    when the body raises or when the manager is cancelled during the sleep.
    Exceptions from the body are not suppressed.
    """
    startup()
    # Without try/finally an exception thrown in at the ``yield`` (or a
    # cancellation during the sleep) would skip cleanup() entirely.
    try:
        await asyncio.sleep(1)
        yield name
    finally:
        cleanup()
async def test():
    """Demonstrate ``scoped_actions`` by printing from inside an ``async with`` body."""
    manager = scoped_actions(
        name='test',
        startup=lambda: print('started'),
        cleanup=lambda: print('finished'),
    )
    async with manager as name:
        print('Executing:', name)
# Run the demo coroutine on a dedicated event loop. The loop is closed in a
# ``finally`` so it is released even when ``test()`` raises; an event loop
# that is never closed is reported as an unclosed resource.
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(test())
finally:
    loop.close()
from typing import Callable | |
class ScopedActions:
    """Context manager that calls one callback on entry and another on exit."""

    # Callback invoked by ``__enter__``.
    startup: Callable[[], None]
    # Callback invoked by ``__exit__``.
    cleanup: Callable[[], None]
    # Label stored on the instance.
    name: str

    def __init__(self, name: str, startup: Callable[[], None], cleanup: Callable[[], None]):
        """Store the label and both callbacks; neither callback is called here."""
        self.name, self.startup, self.cleanup = name, startup, cleanup

    def __enter__(self):
        """Call ``startup`` and return this instance for the ``as`` target."""
        on_enter = self.startup
        on_enter()
        return self

    def __exit__(self, type, value, traceback):
        """Call ``cleanup``.

        Returns ``None``, so an exception raised inside the ``with`` body is
        not suppressed.
        """
        on_exit = self.cleanup
        on_exit()
        return None
# Demo: entering the manager calls the startup callback (prints 'started'),
# the body prints the manager's name, and leaving it calls the cleanup
# callback (prints 'finished').
with ScopedActions(
    name='test',
    startup=lambda: print('started'),
    cleanup=lambda: print('finished'),
) as scoped:
    print('Executing:', scoped.name)
from contextlib import contextmanager | |
from typing import Callable | |
@contextmanager
def scoped_actions(name: str, startup: Callable[[], None], cleanup: Callable[[], None]):
    """Context manager that runs ``startup`` on entry and ``cleanup`` on exit.

    Args:
        name: Value yielded to the ``with`` body (bound by ``as``).
        startup: Called before the body runs.
        cleanup: Called when the ``with`` block is left.

    Yields:
        ``name``, unchanged.

    Once ``startup()`` has returned, ``cleanup()`` is guaranteed to run, also
    when the body raises. Exceptions from the body are not suppressed.
    """
    startup()
    # An exception raised in the ``with`` body is re-raised at the ``yield``;
    # without try/finally it would skip cleanup() entirely.
    try:
        yield name
    finally:
        cleanup()
# Demo: entering the manager calls the startup callback (prints 'started'),
# the body prints the yielded name, and leaving it calls the cleanup
# callback (prints 'finished').
with scoped_actions(
    name='test',
    startup=lambda: print('started'),
    cleanup=lambda: print('finished'),
) as name:
    print('Executing:', name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment