Created
July 22, 2022 15:24
-
-
Save XoseLluis/bba7ecc00a6177c5641bf310fe5ad56b to your computer and use it in GitHub Desktop.
Run a Python function with a timeout by means of multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import multiprocessing
import queue
import threading
from time import sleep
from typing import Any, Dict, TextIO, List, Callable, Tuple, Optional, AnyStr, Match, cast
class TimeoutException(Exception):
    """Raised when a wrapped function does not deliver a result within its timeout."""
def create_func_with_timeout(fn: Callable, timeout: int) -> Callable:
    """
    returns a new function that runs the provided fn in a separate process that either finishes normally or times out

    The returned function takes the same arguments as fn. On each call it forks
    a child process, runs fn there and waits up to `timeout` seconds for the outcome.

    :param fn: the callable to run in the child process
    :param timeout: maximum number of seconds to wait for fn to deliver its outcome
    :return: a wrapper that returns whatever fn returns
    :raises TimeoutException: (from the wrapper) when no outcome arrives in time;
        the child process is terminated in that case
    :raises BaseException: (from the wrapper) any exception raised by fn is re-raised in the caller

    The return value (or the exception) travels through a multiprocessing queue,
    so it has to be picklable.
    """

    def mp_function_wrapper(fn: Callable, result_queue, *args, **kwargs):
        """
        this function has been started in a separate process
        it "returns" the outcome of fn to the main process by writing it to the queue
        we could take fn as a closure value, but it seems more clear to pass it as a parameter
        """
        # Tag the outcome so the parent can tell "fn raised X" apart from
        # "fn returned an exception object X as a regular value".
        try:
            outcome = (True, fn(*args, **kwargs))
        except BaseException as ex:
            outcome = (False, ex)
        result_queue.put(outcome)

    @functools.wraps(fn)
    def func_with_timeout(*args, **kwargs):
        """
        has the fn to run and the timeout trapped as closure values
        """
        # Use a private 'fork' context instead of set_start_method(): the
        # global start method can only be set once per program, so setting it
        # on every call fails from the second call on and also alters
        # process-wide state. 'fork' is required because the process target is
        # a nested function, which cannot be pickled as 'spawn' would need.
        mp_context = multiprocessing.get_context('fork')

        # Not every callable has a __name__ (e.g. functools.partial objects).
        print(f"func_with_timeout {getattr(fn, '__name__', repr(fn))} {timeout}")

        response_queue = mp_context.Queue()
        proc = mp_context.Process(
            target=mp_function_wrapper,
            args=(fn, response_queue, *args),
            kwargs=kwargs,
        )
        proc.start()
        try:
            succeeded, payload = response_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            # the process is still running, so finish it off
            proc.terminate()
            raise TimeoutException() from None
        finally:
            # Reap the child so it does not linger as a zombie. The wait is
            # bounded so a child that is slow to exit cannot block the caller
            # indefinitely; if it is still alive afterwards it gets terminated.
            proc.join(timeout=1)
            if proc.is_alive():
                proc.terminate()
                proc.join()
            response_queue.close()

        if not succeeded:
            raise payload
        return payload

    return func_with_timeout
def format_items(items):
    """
    slow demo function: upper-cases each item, pausing one second before each one,
    and returns the upper-cased items joined with commas
    """
    def slow_upper(item):
        # the pause simulates work, so that long inputs exceed the timeout
        sleep(1)
        return item.upper()

    return ",".join(map(slow_upper, items))
# format_items wrapped so that each call runs in its own process with a 5 second limit
format_with_timeout = create_func_with_timeout(format_items, timeout=5)
def run_test(items):
    """
    runs format_with_timeout on items and prints either the result or the
    type and message of the exception it raised
    """
    print("- test")
    try:
        res = format_with_timeout(items)
    except Exception as ex:
        # Exception rather than BaseException, so that KeyboardInterrupt and
        # SystemExit still stop the program instead of being printed and ignored
        print(f"exception: {type(ex).__name__} {ex}")
    else:
        print(f"result: {res}")
# Guarded so that the demo only runs when the file is executed as a script,
# not when the module is imported (multiprocessing child processes may
# re-import the main module, depending on the start method).
if __name__ == "__main__":
    print("- starting")
    run_test(["a", "b", "c"])  # 3 items at 1 second each: finishes within the timeout
    run_test(["a", "b", "c", "d", "e", "f", "g"])  # 7 items: exceeds the 5 second timeout
    run_test(["a", None])  # None has no upper(): the error is raised in the child
    print("- ending")
# Sample output (the "format_items" lines presumably come from the debug print
# in format_items, which is now commented out):
# - starting
# - test
# format_items
# result: A,B,C
# - test
# format_items
# exception: TimeoutException
# - test
# format_items
# exception: AttributeError 'NoneType' object has no attribute 'upper'
# - ending
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment