A threading approach to concurrently calling IO-blocked functions such as LLMs
from typing import Callable, Iterable
import logging
import concurrent.futures

logger = logging.getLogger(__name__)


def process_batch_parallel(
    function: Callable,
    batched_args: Iterable,
    max_workers: int,
) -> list:
    """Run `function` over each argument tuple in `batched_args` using a thread pool.

    Returns a list of (result, args) tuples for the calls that succeeded;
    failed calls are logged and skipped.
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one task per argument tuple and remember which args each future belongs to.
        future_to_batch = {
            executor.submit(function, *args): args for args in batched_args
        }
        # Collect results as tasks finish, in completion order rather than submission order.
        for future in concurrent.futures.as_completed(future_to_batch):
            args = future_to_batch[future]
            arg_str = ",".join(map(str, args))
            try:
                data = future.result()
                logger.info(f"Completed parallel task with args {arg_str}")
                results.append((data, args))
            except Exception as ex:
                logger.error(f"Error in running task '{arg_str}': {ex}")
    return results
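
Since the thread pool only pays off when the wrapped calls block on IO, a short usage sketch may help. Here `call_llm` is a hypothetical stand-in for an IO-blocked LLM request (the sleep simulates network latency); it is not part of the gist itself.

# Minimal usage sketch. Assumed names: `call_llm` and the prompts are
# illustrative placeholders, not part of the gist above.
import time

def call_llm(prompt: str, temperature: float) -> str:
    time.sleep(1.0)  # stand-in for the network latency of a real LLM request
    return f"response to {prompt!r} at temperature {temperature}"

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Each tuple is unpacked into call_llm's positional arguments.
    batched_args = [
        ("summarise the report", 0.2),
        ("draft an email", 0.7),
        ("translate a sentence", 0.0),
    ]
    for result, args in process_batch_parallel(call_llm, batched_args, max_workers=3):
        print(args, "->", result)

Because the work is IO-bound, the GIL is released while each call waits, so the three requests overlap and the batch finishes in roughly the time of the slowest single call.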