Skip to content

Instantly share code, notes, and snippets.

@ZmeiGorynych
Created September 16, 2024 14:21
Show Gist options
  • Save ZmeiGorynych/c10715ef92fefd506e4683e2edb57a5d to your computer and use it in GitHub Desktop.
Save ZmeiGorynych/c10715ef92fefd506e4683e2edb57a5d to your computer and use it in GitHub Desktop.
A threading approach to concurrently calling IO-blocked functions such as LLMs
from typing import Callable, Iterable
import logging
import concurrent.futures
logger = logging.getLogger(__name__)
def process_batch_parallel(
function: Callable,
batched_args: Iterable,
max_workers: int,
) -> list:
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_batch = {
executor.submit(function, *args): args for args in batched_args
}
for future in concurrent.futures.as_completed(future_to_batch):
args = future_to_batch[future]
arg_str = ",".join(map(str, args))
try:
logger.info(f"Running a task in parallel {arg_str}")
data = future.result()
results.append((data, future_to_batch[future]))
except Exception as ex:
logger.error(f"Error in running task '{arg_str}': {ex}")
return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment