@Olegt0rr
Last active July 28, 2023 11:31
This helper is useful for consuming third-party asynchronous iterators efficiently when they return one element per step but fetch several elements ahead under the hood, e.g. the cursor returned by the `.find()` method of the Motor MongoDB client.
from __future__ import annotations

from typing import TYPE_CHECKING, TypeVar

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

T = TypeVar("T")


async def async_chunks(
    async_iterator: AsyncIterator[T],
    size: int,
) -> AsyncIterator[list[T]]:
    """Generate chunks from an asynchronous sequence.

    Chunks are lists consisting of the original ``T`` elements.
    A chunk can't contain more than ``size`` elements.
    The last chunk might contain fewer than ``size`` elements,
    but can't be empty.
    """
    if size < 1:
        # With size < 1 the inner loop would never pull an element,
        # so the outer loop would spin forever.
        raise ValueError("size must be a positive integer")

    finished = False
    while not finished:
        results: list[T] = []
        for _ in range(size):
            try:
                # For Python 3.10+ `anext(async_iterator)` is available,
                # but for compatibility with lower versions let's use this:
                result = await async_iterator.__anext__()
            except StopAsyncIteration:
                finished = True
                # Stop polling the exhausted iterator.
                break
            else:
                results.append(result)
        if results:
            yield results
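
A minimal usage sketch, not part of the original gist: `fake_cursor` and `collect_ids` are hypothetical names, and `fake_cursor` only stands in for a real cursor such as the one Motor's `.find()` returns. A condensed copy of `async_chunks` is repeated so the snippet runs on its own.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator
from typing import TypeVar

T = TypeVar("T")


async def async_chunks(
    async_iterator: AsyncIterator[T],
    size: int,
) -> AsyncIterator[list[T]]:
    # Condensed copy of the helper above, repeated so the snippet runs alone.
    finished = False
    while not finished:
        results: list[T] = []
        for _ in range(size):
            try:
                results.append(await async_iterator.__anext__())
            except StopAsyncIteration:
                finished = True
                break
        if results:
            yield results


async def fake_cursor(total: int) -> AsyncIterator[dict[str, int]]:
    # Hypothetical stand-in for a database cursor: one document per step.
    for i in range(total):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield {"_id": i}


async def collect_ids(total: int, size: int) -> list[list[int]]:
    # Consume the cursor chunk by chunk and keep only the ids of each chunk.
    return [
        [doc["_id"] for doc in chunk]
        async for chunk in async_chunks(fake_cursor(total), size)
    ]


if __name__ == "__main__":
    print(asyncio.run(collect_ids(7, 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

With a real client, the cursor object would presumably take the place of `fake_cursor(total)`, assuming it implements `__anext__`; each chunk can then be passed to a bulk operation instead of handling documents one by one.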