Skip to content

Instantly share code, notes, and snippets.

@XoseLluis
Created July 22, 2022 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save XoseLluis/bba7ecc00a6177c5641bf310fe5ad56b to your computer and use it in GitHub Desktop.
Save XoseLluis/bba7ecc00a6177c5641bf310fe5ad56b to your computer and use it in GitHub Desktop.
Run a Python function with a timeout by means of multiprocessing
import functools
import multiprocessing
import queue
import threading
from time import sleep
from typing import Any, Dict, TextIO, List, Callable, Tuple, Optional, AnyStr, Match, cast
class TimeoutException(Exception):
    """
    raised when a function run with a timeout does not deliver its result in time
    """
def create_func_with_timeout(fn: Callable, timeout: int) -> Callable:
    """
    returns a new function that runs the provided fn in a separate process that either finishes normally or times out

    fn: the callable to run; it is invoked in a forked child process with the arguments given to the returned function
    timeout: how long, in seconds, to wait for the result of fn (None waits without limit)

    the returned function
    - returns whatever fn returned (also when that value happens to be an exception instance)
    - raises in the caller the exception that fn raised in the child process
    - raises TimeoutException, after terminating the child process, when no result arrives in time

    the result (or exception) crosses the process boundary through a multiprocessing queue, so it has to be picklable
    """
    def mp_function_wrapper(fn: Callable, result_queue: multiprocessing.Queue, args, kwargs):
        """
        this function has been started in a separate process
        it "returns" the outcome of fn to the main process by writing it to the queue
        the outcome is tagged as (True, return value) or (False, raised exception), so that a
        returned exception object is not mistaken for a raised one
        args and kwargs arrive as a plain tuple and dict, so no caller keyword can clash with
        this function's own parameter names
        we could take fn as a closure value, but it seems more clear to pass it as a parameter
        """
        try:
            outcome = (True, fn(*args, **kwargs))
        except BaseException as ex:
            outcome = (False, ex)
        result_queue.put(outcome)

    @functools.wraps(fn)
    def func_with_timeout(*args, **kwargs):
        """
        has the fn to run trapped as closure value
        """
        # a private 'fork' context leaves the process-wide start method untouched
        # 'fork' is needed because the target is a nested function, which could not be pickled for 'spawn'
        mp_context = multiprocessing.get_context('fork')
        # not every callable carries __name__ (functools.partial objects do not)
        fn_name = getattr(fn, '__name__', repr(fn))
        print(f"func_with_timeout {fn_name} {timeout}")
        response_queue = mp_context.Queue()
        proc = mp_context.Process(target=mp_function_wrapper, args=(fn, response_queue, args, kwargs))
        proc.start()
        try:
            succeeded, payload = response_queue.get(block=True, timeout=timeout)
        except queue.Empty:
            # the process is still running, so finish it off
            proc.terminate()
            raise TimeoutException() from None
        finally:
            # reap the child so it does not linger as a zombie; the wait is bounded so that
            # a child that is slow to exit cannot block the caller indefinitely
            proc.join(1)
        if succeeded:
            return payload
        raise payload

    return func_with_timeout
def format_items(items):
    """
    upper-cases every item and joins the results with commas
    waits one second before handling each item, so the run time grows with the number of items
    """
    def shout(entry):
        sleep(1)
        return entry.upper()

    return ",".join([shout(entry) for entry in items])
# format_items wrapped so that a call is given up with a TimeoutException when no result arrives within 5 seconds
format_with_timeout = create_func_with_timeout(format_items, 5)
def run_test(items):
    """
    runs format_with_timeout on items and prints either its result or the exception it ended with
    nothing is propagated to the caller: every exception, of any kind, is only reported
    """
    print("- test")
    try:
        outcome = format_with_timeout(items)
        print(f"result: {outcome}")
    except BaseException as err:
        kind = type(err).__name__
        print(f"exception: {kind} {err}")
# run the demo only when executed as a script: importing this module must not start processes
if __name__ == "__main__":
    print("- starting")
    for sample in (
        ["a", "b", "c"],  # 3 items at one second each: finishes inside the 5 second timeout
        ["a", "b", "c", "d", "e", "f", "g"],  # 7 items: needs longer than the timeout
        ["a", None],  # None has no upper(): the exception raised by format_items is reported
    ):
        run_test(sample)
    print("- ending")
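# Sample output (from a run in which format_items printed its name on entry; other trace lines are not shown):
# - starting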
# - test
# format_items
# result: A,B,C
# - test
# format_items
# exception: TimeoutException
# - test
# format_items
# exception: AttributeError 'NoneType' object has no attribute 'upper'
# - ending
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment