Skip to content

Instantly share code, notes, and snippets.

@jab
Created September 21, 2022 02:16
Show Gist options
  • Save jab/5fe3666746612fa29b54b240e419adb8 to your computer and use it in GitHub Desktop.
Save jab/5fe3666746612fa29b54b240e419adb8 to your computer and use it in GitHub Desktop.
aiter() / anext() await-free Python implementations
"""
❯ python3 ait.py
3
2
1
blastoff aiter!
3
2
1
blastoff anext!
"""
def aiter_(aiterable):
    """Return the async iterator for *aiterable* without using ``await``.

    Mirrors the ``aiter()`` builtin by invoking the object's ``__aiter__``
    method and handing back whatever it returns.
    """
    async_iterator = aiterable.__aiter__()
    return async_iterator
# Sentinel for "no default supplied", so that None remains a usable default.
_NOT_PROVIDED = object()


def anext_(aiterator, default=_NOT_PROVIDED):
    """Return an awaitable for the next item of *aiterator*, without ``await``.

    Without *default*, the awaitable produced by ``aiterator.__anext__()`` is
    returned unchanged, so exhaustion surfaces as ``StopAsyncIteration`` when
    it is awaited.  With *default*, that awaitable is wrapped so awaiting it
    evaluates to *default* instead of raising once the iterator is exhausted.
    """
    if default is _NOT_PROVIDED:
        return aiterator.__anext__()
    return _anext_with_default(aiterator.__anext__(), default)


class _anext_with_default:
    """Awaitable that turns ``StopAsyncIteration`` into a default result.

    Attributes:
        iterator: the awaitable returned by ``__anext__()`` (the attribute
            name is kept for compatibility; it need not be an iterator).
        default: value produced when the wrapped awaitable raises
            ``StopAsyncIteration``.
    """

    __slots__ = ('iterator', 'default')

    def __init__(self, iterator, default):
        self.iterator = iterator
        self.default = default

    def __await__(self):
        inner = self.iterator
        # Coroutine objects (from ``async def __anext__``) are awaitable but
        # are not iterators, so calling next() on them fails.  Ask the
        # awaitable for its own iterator; fall back to the object itself for
        # plain iterators that expose no __await__.
        get_await = getattr(inner, '__await__', None)
        steps = inner if get_await is None else get_await()
        try:
            # ``yield from`` also forwards send()/throw()/close() to the
            # wrapped awaitable, so cancellation reaches it.
            return (yield from steps)
        except StopAsyncIteration:
            return self.default
async def countdown(n=3):
    """Asynchronously yield ``n, n-1, ..., 1``, sleeping 0.2 s before each.

    Yields nothing when *n* is zero or negative.
    """
    from asyncio import sleep

    # Compare against zero instead of relying on truthiness, so a negative
    # start terminates immediately rather than counting down forever.
    while n > 0:
        await sleep(0.2)
        yield n
        n -= 1
async def amain():
    """Demo: run the countdown twice, once via ``aiter_`` and once via ``anext_``.

    The first pass consumes the countdown with ``async for``; the second pulls
    items one at a time with ``anext_`` and a private sentinel as the default,
    stopping when the sentinel comes back.
    """
    async for tick in aiter_(countdown()):
        print(tick)
    print("blastoff aiter!")

    exhausted = object()  # unique marker returned once the iterator is done
    ticks = aiter_(countdown())
    while True:
        tick = await anext_(ticks, exhausted)
        if tick is exhausted:
            print("blastoff anext!")
            break
        print(tick)
# Script entry: run the amain() demo coroutine via asyncio.run.
from asyncio import run
run(amain())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment