Skip to content

Instantly share code, notes, and snippets.

@lucapericlp
Created February 4, 2025 16:18
Show Gist options
  • Save lucapericlp/24e4ad927f3728efacb89413cd71123d to your computer and use it in GitHub Desktop.
Save lucapericlp/24e4ad927f3728efacb89413cd71123d to your computer and use it in GitHub Desktop.
In an async context, run a sync generator inside a thread pool so its items can be consumed one at a time without blocking the event loop.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncIterator, Iterator
import queue
def compute_intensive_generator(
    steps: int = 10, work_per_step: int = 10_000_000
) -> Iterator[int]:
    """Example compute-bound generator that yields a running counter.

    Args:
        steps: Number of values to yield. Defaults to 10.
        work_per_step: Number of increments performed before each yield;
            this is the "heavy computation" being simulated. Defaults to
            10_000_000.

    Yields:
        The running total after each batch of increments, i.e.
        ``work_per_step``, ``2 * work_per_step``, ... (``steps`` values).
    """
    result = 0
    for _ in range(steps):
        # Simulate heavy computation with a deliberately slow Python loop.
        for _ in range(work_per_step):
            result += 1
        yield result
async def run_generator_in_thread(generator_func) -> AsyncIterator[int]:
    """Iterate a synchronous generator from async code without blocking the loop.

    ``generator_func`` is called with no arguments and the iterator it
    returns is advanced on a dedicated single worker thread. Each item is
    handed back to the event loop and yielded in order. While the consumer
    handles one item, the worker is already computing the next one.

    Args:
        generator_func: Zero-argument callable returning an iterable
            (typically a generator function).

    Yields:
        The items produced by the underlying iterator, in order.

    Raises:
        BaseException: Whatever the underlying iterator raises is re-raised
            to the consumer at the point where that item would have been
            delivered.

    The consumer may stop early (``break``, ``aclose()``, cancellation):
    cleanup never waits on the worker, so it cannot deadlock or stall the
    event loop.
    """
    loop = asyncio.get_running_loop()
    # Unique end marker, so that ``None`` remains a legitimate item and
    # StopIteration never has to travel through a Future.
    exhausted = object()
    source = iter(generator_func())
    executor = ThreadPoolExecutor(max_workers=1)

    def advance() -> "asyncio.Future":
        """Schedule one ``next()`` step of the source on the worker thread."""
        return loop.run_in_executor(executor, next, source, exhausted)

    def discard_outcome(future: "asyncio.Future") -> None:
        """Mark an abandoned step's exception as retrieved to avoid log noise."""
        if not future.cancelled():
            future.exception()

    pending = None
    try:
        pending = advance()
        while True:
            item = await pending
            if item is exhausted:
                break
            # Start the next step before handing this item to the consumer,
            # so production and consumption overlap.
            pending = advance()
            yield item
    finally:
        if pending is not None:
            # On early exit a look-ahead step may still be running; its
            # result is no longer wanted.
            pending.add_done_callback(discard_outcome)
        # Close the source on the worker thread (after any in-flight step)
        # so its ``finally`` blocks run; closing from this thread could hit
        # "generator already executing".
        close_source = getattr(source, "close", None)
        if close_source is not None:
            executor.submit(close_source)
        # Do not block the event loop waiting for the worker to wind down.
        executor.shutdown(wait=False)
async def process_data():
    """Consume the threaded example generator and print every value received."""
    stream = run_generator_in_thread(compute_intensive_generator)
    async for value in stream:
        print(f"Received value: {value}")
if __name__ == "__main__":
    # Script entry point: run the demo consumer on a fresh event loop.
    demo = process_data()
    asyncio.run(demo)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment