Created
February 4, 2025 16:18
-
-
Save lucapericlp/24e4ad927f3728efacb89413cd71123d to your computer and use it in GitHub Desktop.
In an async context, run a sync generator inside a threadpool to utilise the iterative nature of the generator without blocking the event loop
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import AsyncIterator, Iterator | |
import queue | |
def compute_intensive_generator(
    steps: int = 10, work_per_step: int = 10_000_000
) -> Iterator[int]:
    """Example compute-bound generator that yields a running counter.

    Args:
        steps: Number of values to yield. Defaults to 10.
        work_per_step: Number of busy-loop increments performed before each
            value is yielded. Defaults to 10_000_000.

    Yields:
        The running total after each step, i.e. ``work_per_step``,
        ``2 * work_per_step``, ... up to ``steps * work_per_step``.
    """
    result = 0
    for _ in range(steps):
        # Simulate heavy computation: deliberately a pure-Python busy loop
        # rather than a single addition, so each step takes measurable time.
        for _ in range(work_per_step):
            result += 1
        yield result
async def run_generator_in_thread(generator_func) -> AsyncIterator[int]:
    """Iterate a synchronous generator from async code without blocking the loop.

    ``generator_func`` is called with no arguments (in the calling thread) and
    the iterator it returns is driven on a dedicated single-worker thread.
    Items reach the async consumer through a one-slot queue, so the producer
    can only run a bounded distance ahead of the consumer.

    Args:
        generator_func: Zero-argument callable returning an iterator.

    Yields:
        Each item produced by the iterator, in order.

    Raises:
        BaseException: Whatever the iterator raised while being advanced is
            re-raised in the consumer after all earlier items were yielded.

    If the consumer stops early (``break``, ``aclose()``, cancellation), the
    producer thread is told to stop, the iterator's ``close()`` (if it has
    one) is called on the worker thread, and this coroutine returns without
    waiting for the thread, so the event loop is never blocked.
    """
    # Function-scope import so this block does not depend on the module's
    # import section being edited.
    import threading

    poll_seconds = 0.05  # how often a blocked producer re-checks for shutdown

    # Entries are (is_done, payload) pairs. For a terminal entry, payload is
    # the exception raised by the iterator, or None on normal exhaustion.
    thread_queue: queue.Queue = queue.Queue(maxsize=1)
    consumer_gone = threading.Event()

    def hand_over(entry) -> bool:
        """Queue ``entry``; return False once the consumer has stopped."""
        while True:
            try:
                # Bounded wait instead of a plain blocking put(): a consumer
                # that went away will never free the slot, so the producer
                # must wake up periodically to notice the stop request.
                thread_queue.put(entry, timeout=poll_seconds)
            except queue.Full:
                if consumer_gone.is_set():
                    return False
            else:
                return not consumer_gone.is_set()

    def generator_to_queue(gen: Iterator) -> None:
        """Drain ``gen`` into the queue, then signal completion or failure."""
        try:
            for item in gen:
                if not hand_over((False, item)):
                    return
        except BaseException as exc:
            # Forward *every* failure; otherwise the consumer would wait
            # forever for a completion marker that never arrives.
            hand_over((True, exc))
        else:
            hand_over((True, None))
        finally:
            # Run the generator's cleanup on the thread that was driving it.
            close = getattr(gen, "close", None)
            if close is not None:
                close()

    executor = ThreadPoolExecutor(max_workers=1)
    try:
        executor.submit(generator_to_queue, generator_func())
        while True:
            # The blocking get() runs in a helper thread so the loop stays free.
            is_done, payload = await asyncio.to_thread(thread_queue.get)
            if is_done:
                if payload is not None:
                    raise payload
                break
            yield payload
    finally:
        consumer_gone.set()
        # wait=False: never block the event loop on the worker thread; it
        # exits by itself once it observes ``consumer_gone`` or finishes.
        executor.shutdown(wait=False)
async def process_data():
    """Consume the thread-backed generator and print every value received."""
    stream = run_generator_in_thread(compute_intensive_generator)
    async for value in stream:
        print(f"Received value: {value}")
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    demo = process_data()
    asyncio.run(demo)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment