Skip to content

Instantly share code, notes, and snippets.

@graingert
Last active July 22, 2022 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save graingert/cb40f8000f3ef52b64dd6b4df5648e68 to your computer and use it in GitHub Desktop.
Save graingert/cb40f8000f3ef52b64dd6b4df5648e68 to your computer and use it in GitHub Desktop.
import asyncio
import contextlib
class _AcknowledgeException:
    """Plain carrier object holding one exception instance.

    The async generator in this file yields these instead of letting an
    exception propagate; ``CoroutineWrapper`` recognises the carrier and
    raises the stored exception on the consumer's side.
    """

    def __init__(self, exception: BaseException) -> None:
        # The exception to be re-raised by whoever unwraps this carrier.
        self.exception = exception
class _RecommendGeneratorExit(BaseException):
    """Exception thrown into the generator by ``AsyncGenWrapper.aclose``.

    It derives from ``BaseException`` directly, so ``except Exception``
    handlers do not catch it.
    """
async def agen_fn():
    """Async generator that only ever yields ``_AcknowledgeException`` objects.

    It waits on a fresh ``asyncio.Event`` that nothing in this function sets.
    Whenever that wait is interrupted by ``asyncio.CancelledError`` the
    generator stays alive and yields the cancellation wrapped in an
    ``_AcknowledgeException``.

    A ``_RecommendGeneratorExit`` thrown in is handled on two levels: each
    handler waits once more (acknowledging a cancellation of that wait the
    same way) and then yields the close request itself wrapped in an
    ``_AcknowledgeException``.
    """
    try:
        try:
            while True:
                try:
                    # Nothing sets this event; only a thrown exception ends the wait.
                    await asyncio.Event().wait()
                except asyncio.CancelledError as cancelled:
                    # Hand the cancellation to the consumer without finishing.
                    yield _AcknowledgeException(cancelled)
        except _RecommendGeneratorExit as close_request:
            # First close request: wait again before acknowledging it.
            try:
                await asyncio.Event().wait()
            except asyncio.CancelledError as cancelled:
                yield _AcknowledgeException(cancelled)
            yield _AcknowledgeException(close_request)
    except _RecommendGeneratorExit as second_close_request:
        # A close request that escaped the inner handler gets the same treatment.
        try:
            await asyncio.Event().wait()
        except asyncio.CancelledError as cancelled:
            yield _AcknowledgeException(cancelled)
        yield _AcknowledgeException(second_close_request)
class CoroutineWrapper:
    """Awaitable proxy that re-raises exceptions carried by the wrapped object.

    ``coro`` is any object exposing ``send`` and ``throw`` (in this file: the
    awaitables returned by an async generator's ``asend``/``athrow``).  Calls
    are forwarded unchanged.  When the wrapped object finishes with a
    ``StopIteration`` whose value is an ``_AcknowledgeException``, the
    exception stored on that value is raised instead; any other
    ``StopIteration`` propagates untouched.

    The wrapper is its own ``__await__`` result, so it implements the full
    iterator protocol (``__iter__`` and ``__next__``) plus ``send``/``throw``.
    """

    def __init__(self, coro):
        self._coro = coro

    def __await__(self):
        # The wrapper itself drives the wrapped object step by step.
        return self

    def __iter__(self):
        # Required for the object returned by __await__ to be a real iterator.
        return self

    def __next__(self):
        return self.send(None)

    def send(self, v):
        """Forward ``v`` to the wrapped object's ``send``."""
        return self._forward(self._coro.send, v)

    def throw(self, *args, **kwargs):
        """Forward the arguments to the wrapped object's ``throw``."""
        return self._forward(self._coro.throw, *args, **kwargs)

    def _forward(self, method, *args, **kwargs):
        """Call ``method`` and unwrap an acknowledged exception on completion."""
        try:
            return method(*args, **kwargs)
        except StopIteration as e:
            if isinstance(e.value, _AcknowledgeException):
                # Raised inside the handler so the StopIteration stays
                # attached as the exception context, as before.
                raise e.value.exception
            raise
class AsyncGenWrapper:
    """Async-iterator facade over an async generator.

    Every ``asend``/``athrow`` awaitable obtained from the wrapped generator
    is awaited through ``CoroutineWrapper``.  ``aclose`` throws
    ``_RecommendGeneratorExit`` into the generator rather than calling its
    own ``aclose``.
    """

    def __init__(self, agen):
        self._agen = agen

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Advancing is the same as sending None.
        return await self.asend(None)

    async def asend(self, v):
        """Send ``v`` into the wrapped generator and return what it yields."""
        step = CoroutineWrapper(self._agen.asend(v))
        return await step

    async def athrow(self, *args, **kwargs):
        """Throw into the wrapped generator and return what it yields."""
        step = CoroutineWrapper(self._agen.athrow(*args, **kwargs))
        return await step

    async def aclose(self):
        """Throw ``_RecommendGeneratorExit`` into the wrapped generator.

        Returns whatever the throw produces; returns ``None`` when the
        ``_RecommendGeneratorExit`` comes back out.
        """
        closing = CoroutineWrapper(self._agen.athrow(_RecommendGeneratorExit))
        try:
            outcome = await closing
        except _RecommendGeneratorExit:
            return None
        return outcome
async def amain():
    """Drive ``agen_fn`` through ``AsyncGenWrapper`` under nested timeouts.

    Twice in a row the wrapped generator is iterated inside a zero-second
    ``asyncio.timeout``; each pass must end in ``TimeoutError`` (reported by
    printing ``"timed out"``), otherwise ``RuntimeError`` is raised.  The
    outer timeout, created with no deadline, is then rescheduled to ``-1``
    before the ``aclosing`` block exits.  Finally ``aclose`` is awaited once
    more under a one-second timeout.
    """
    try:
        async with asyncio.timeout(None) as outer_deadline:
            wrapped = AsyncGenWrapper(agen_fn())
            async with contextlib.aclosing(wrapped) as agen:
                # Two identical passes: iterate under an already-expired timeout.
                for _ in range(2):
                    try:
                        async with asyncio.timeout(0):
                            async for item in agen:
                                print(item)
                    except TimeoutError:
                        print("timed out")
                    else:
                        # Iteration finishing without a timeout is unexpected.
                        raise RuntimeError
                # Move the outer deadline into the past before leaving aclosing.
                outer_deadline.reschedule(-1)
    finally:
        async with asyncio.timeout(1):
            await agen.aclose()
# Script entry point: run the amain() experiment via asyncio.run().
# NOTE(review): this executes at module level, so importing the file runs it too.
asyncio.run(amain())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment