Skip to content

Instantly share code, notes, and snippets.

@guyskk
Created March 10, 2018 06:54
Show Gist options
  • Save guyskk/2b307db29684f1eef95848e68cc3aa6a to your computer and use it in GitHub Desktop.
Save guyskk/2b307db29684f1eef95848e68cc3aa6a to your computer and use it in GitHub Desktop.
A idea of using contextvars to solve generator finalize problem
"""
contextvars is added in unreleased cpython, I build it from master branch
Python 3.8.0a0 (heads/master:3b20d345, Mar 10 2018, 13:23:11).
Also modify 2 lines in curio/kernel.py:
kernel.py Line 327:
> task._ctx = contextvars.copy_context()
kernel.py Line 827:
> trap = current._ctx.run(current._send, current.next_value)
> # trap = current._send(current.next_value)
Here is the result:
$ ./python deferable.py
g1 0
g1 1
g1 2
g1 3
g1 4
g1 5
close in test1 [<generator object g1 at 0x7fb985d34988>]
g1 closed
g2 0
g2 0
g2 1
g2 1
g2 2
g2 2
g2 3
g2 3
g2 4
g2 4
g2 5
g2 5
close in test2 []
aclose in test2 [<async_generator object g2 at 0x7fb984c496d0>]
g2 closed
close in test2 []
aclose in test2 [<async_generator object g2 at 0x7fb984c499b0>]
g2 closed
close in main []
aclose in main []
"""
import inspect
import contextvars
import curio
__defers__ = contextvars.ContextVar('__defers__')
__async_defers__ = contextvars.ContextVar('__async_defers__')
def defer(func):
"""register generator"""
if inspect.isgeneratorfunction(func):
def wrapped(*args, **kwargs):
gen = func(*args, **kwargs)
__defers__.get().append(gen)
for x in gen:
yield x
elif inspect.isasyncgenfunction(func):
async def wrapped(*args, **kwargs):
gen = func(*args, **kwargs)
__async_defers__.get().append(gen)
async for x in gen:
yield x
else:
return func
return wrapped
def deferable(func):
"""finalize registered generators"""
if inspect.iscoroutinefunction(func):
async def wrapped(*args, **kwargs):
local_defers = []
local_async_defers = []
token = __defers__.set(local_defers)
token_async = __async_defers__.set(local_async_defers)
try:
return await func(*args, **kwargs)
finally:
__defers__.reset(token)
__async_defers__.reset(token_async)
print('close in', func.__name__, local_defers)
for gen in local_defers:
gen.close()
print('aclose in', func.__name__, local_async_defers)
for gen in local_async_defers:
await gen.aclose()
else:
def wrapped(*args, **kwargs):
local_defers = []
token = __defers__.set(local_defers)
try:
return func(*args, **kwargs)
finally:
__defers__.reset(token)
print('close in', func.__name__, local_defers)
for gen in local_defers:
gen.close()
return wrapped
@defer
def g1(n):
try:
for i in range(n):
yield 'g1', i
finally:
print('g1 closed')
@defer
async def g2(n):
try:
for i in range(n):
yield 'g2', i
finally:
print('g2 closed')
@deferable
def test1():
for a, i in g1(10):
print(a, i)
if i == 5:
break
@deferable
async def test2():
await curio.sleep(0.01)
async for a, i in g2(10):
print(a, i)
await curio.sleep(0.01)
if i == 5:
break
@deferable
async def main():
test1()
task2 = await curio.spawn(test2)
task3 = await curio.spawn(test2)
await task2.join()
await task3.join()
if __name__ == '__main__':
curio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment