Skip to content

Instantly share code, notes, and snippets.

@kristjanvalur
Created April 24, 2023 09:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kristjanvalur/731bbf73a18e356eb2c203b69aa376a8 to your computer and use it in GitHub Desktop.
Save kristjanvalur/731bbf73a18e356eb2c203b69aa376a8 to your computer and use it in GitHub Desktop.
An asyncronous iterator which automatically closes the underlying iterator when done
#
# Python's `async for` only iterates and won't call `aclose()` on an Async Generator when done.
# Although `aclose()` is automatically scheduled to happen when the iterator goes out of scope, this will
# happen on a different Task.
# If iteration over an Async Generator is interrupted somehow, or left unfinished, that can be problematic
import contextlib
@contextlib.asynccontextmanager
async def aclosingiterator(iterable):
iter = iterable.__aiter__()
try:
yield iter
finally:
if hasattr(iter, "aclose"):
await iter.aclose()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment