Skip to content

Instantly share code, notes, and snippets.

@achimnol achimnol/test.py
Last active Jan 10, 2018

Embed
What would you like to do?
async generator closing
import asyncio
import pytest
@pytest.mark.asyncio
async def test_close_asyncgen():
loop = asyncio.get_event_loop()
async def mygen():
print('mygen: 0-begin')
await asyncio.sleep(0.4)
print('mygen: 1')
yield 1
print('mygen: 2')
await asyncio.sleep(0.4)
print('mygen: 3')
yield 2
print('mygen: 4')
await asyncio.sleep(0.4)
print('mygen: 5-end')
g = mygen()
async def intr():
nonlocal g
await asyncio.sleep(0.6)
print('intr: aclosing')
await g.aclose() # expected: raise an error since generator is running
print('intr: aclosed')
t = loop.create_task(intr())
print('test: begin of async-for')
async for v in g:
print('test: generated', v)
print('test: end of async-for') # not reached???
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.