Skip to content

Instantly share code, notes, and snippets.

@smurfix
Last active January 9, 2024 09:31
Show Gist options
  • Save smurfix/d528056259496d1b7ada1c20b9aee6fb to your computer and use it in GitHub Desktop.
Save smurfix/d528056259496d1b7ada1c20b9aee6fb to your computer and use it in GitHub Desktop.
anyio ResultGatheringTaskgroup
import anyio
from contextlib import asynccontextmanager
class NotYet(RuntimeError):
    """Signal that a requested task result has not been stored yet.

    The class object itself also serves as the placeholder value kept in a
    result slot until the corresponding task has returned.
    """
class ResultGatheringTaskgroup:
    """Task group wrapper that records the return value of each spawned task.

    Use it as an async context manager.  Every call to :meth:`spawn` reserves
    a slot in ``result`` (in spawn order) and returns the slot's index; the
    slot holds the ``NotYet`` class as a placeholder until the task's return
    value has been stored there.
    """

    def __init__(self):
        # One entry per spawned task, indexed by the value spawn() returned.
        self.result = []

    async def __aenter__(self):
        """Create and enter the underlying anyio task group; return ``self``."""
        self._taskgroup = tg = anyio.create_task_group()
        await tg.__aenter__()
        return self

    async def __aexit__(self, *tb):
        """Exit the underlying task group and drop the reference to it.

        Returns whatever the task group's own ``__aexit__`` returns.
        """
        try:
            return await self._taskgroup.__aexit__(*tb)
        finally:
            # Dropped even when the exit raises, so that a later spawn() on
            # this object fails instead of using a closed task group.
            del self._taskgroup

    async def _run_one(self, pos, proc, a):
        """Await ``proc(*a)`` and store its return value in slot ``pos``."""
        self.result[pos] = await proc(*a)

    async def spawn(self, proc, *a):
        """Start ``proc(*a)`` in the task group and return its result index.

        Args:
            proc: async callable to run.
            *a: positional arguments passed to ``proc``.

        Returns:
            The index to pass to :meth:`get_result` (also the position of the
            task's value in ``result``).

        Raises:
            AttributeError: if called outside the ``async with`` block.
        """
        # Look the task group up first: when it is missing this raises before
        # a placeholder slot is appended, so ``result`` is left untouched.
        tg = self._taskgroup

        pos = len(self.result)
        self.result.append(NotYet)

        # anyio >= 3 offers the synchronous ``start_soon``; the awaitable
        # ``spawn`` was deprecated in 3.0 and removed in 4.0.  Fall back to it
        # only for task groups that lack ``start_soon`` (anyio 2.x).
        start_soon = getattr(tg, "start_soon", None)
        if start_soon is not None:
            start_soon(self._run_one, pos, proc, a)
        else:
            await tg.spawn(self._run_one, pos, proc, a)
        return pos

    def get_result(self, pos):
        """Return the value stored in slot ``pos``.

        Raises:
            NotYet: if the slot still holds the placeholder.
            IndexError: if ``pos`` is not a valid index into ``result``.
        """
        res = self.result[pos]
        if res is NotYet:
            raise NotYet()
        return res
if __name__ == "__main__":

    async def test():
        """Spawn two delayed tasks and check the gathered results."""

        async def triple_after(delay):
            # Sleep for `delay` seconds, then return three times that value.
            await anyio.sleep(delay)
            return 3 * delay

        async with ResultGatheringTaskgroup() as rg:
            first = await rg.spawn(triple_after, 0.125)
            second = await rg.spawn(triple_after, 0.25)

        # Checked only after the `async with` block has exited.
        assert rg.get_result(first) == 0.375
        assert rg.get_result(second) == 0.75
        assert rg.result == [0.375, 0.75]

    anyio.run(test)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment