Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Created October 25, 2017 15:49
Show Gist options
  • Save vxgmichel/445651b2d5b68426a348bad88ff15430 to your computer and use it in GitHub Desktop.
Save vxgmichel/445651b2d5b68426a348bad88ff15430 to your computer and use it in GitHub Desktop.
Isolate an asynchronous generator by running it in a background task
import asyncio
from itertools import count

try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import AsyncIterable
except ImportError:  # legacy fallback, kept from the original import
    from collections import AsyncIterable
async def clock(start=0, step=1, interval=1.):
    """Asynchronously yield an endless arithmetic sequence.

    Produces ``start``, ``start + step``, ``start + 2 * step``, ... and
    awaits ``asyncio.sleep(interval)`` between consecutive values. The
    generator never finishes on its own.
    """
    ticks = count(start, step)
    while True:
        yield next(ticks)
        # Pause after handing out a value, so the first one is immediate.
        await asyncio.sleep(interval)
class isolate(AsyncIterable):
    """Run an asynchronous generator in a background task.

    The wrapped generator is consumed by a task created at construction
    time, independently of how fast (or whether) anyone iterates over the
    wrapper. Every produced item resolves the "current" future, which is
    then replaced by a fresh pending one. ``__anext__`` therefore always
    waits for the *next* item: items produced while nobody is waiting are
    not buffered and are only observable through ``last_value``.

    Termination is reported through the current future:

    * the generator finished  -> ``StopAsyncIteration``
    * the generator raised an ``Exception`` -> that exception
    * the background task was cancelled -> the future is cancelled

    The event loop is looked up with ``asyncio.get_event_loop()`` when the
    instance is created.
    """

    def __init__(self, agen):
        """Start consuming *agen* in a background task."""
        self._agen = agen
        self._loop = asyncio.get_event_loop()
        self._future = self._loop.create_future()
        self._last = None
        self._task = self._loop.create_task(self._target())
        # A cancelled task never reaches _set_exception() (it may not even
        # have entered _target), so release waiting consumers from here.
        self._task.add_done_callback(self._on_task_done)

    @property
    def last_value(self):
        """Most recent item produced by the generator (``None`` if none yet)."""
        return self._last

    async def _target(self):
        """Drain the generator, publishing each item and the final outcome."""
        try:
            async for item in self._agen:
                self._set_result(item)
        except Exception as exc:
            self._set_exception(exc)
        else:
            # Normal exhaustion: make consumers' ``async for`` loops stop.
            self._set_exception(StopAsyncIteration())

    def _on_task_done(self, task):
        """Cancel the pending future if the background task was cancelled."""
        if task.cancelled():
            # No-op when the future is already done.
            self._future.cancel()

    def _set_result(self, item):
        """Deliver *item* to current waiters and arm a new pending future."""
        self._future.set_result(item)
        self._future = self._loop.create_future()
        self._last = item

    def _set_exception(self, exc):
        """Store the terminal exception; the future is not replaced afterwards."""
        self._future.set_exception(exc)

    def __aiter__(self):
        """Return the wrapper itself as the asynchronous iterator."""
        return self

    def __anext__(self):
        """Return the future that resolves with the next produced item.

        The same future object is handed to every caller until it is
        resolved, so several consumers may wait on it at once.
        """
        return self._future
async def test():
    """Demo: isolate a clock, stay busy for a while, then print its ticks.

    The clock keeps running in its background task during the 1.5 second
    pause; ticks produced in that time are not printed, only those that
    arrive once the loop below is waiting.
    """
    ticker = isolate(clock())
    # Simulate a consumer that is busy elsewhere before it starts listening.
    await asyncio.sleep(1.5)
    async for tick in ticker:
        print(tick)
if __name__ == '__main__':
    # Create and install the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is running and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # test() iterates an endless clock, so this runs until interrupted.
        loop.run_until_complete(test())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment