Skip to content

Instantly share code, notes, and snippets.

@joezuntz
Last active October 14, 2023 13:36
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joezuntz/e7e7764e5b591ed519cfd488e20311f1 to your computer and use it in GitHub Desktop.
Save joezuntz/e7e7764e5b591ed519cfd488e20311f1 to your computer and use it in GitHub Desktop.
Python decorator that lets you run a function that may crash or hang inside a subprocess, isolating the caller from it.
import functools
import multiprocessing as mp
import os
import time
from queue import Empty
class TimeoutError(Exception):
    """Raised when a call run in a subprocess produces no result in time.

    NOTE: within this module the name shadows Python's built-in
    ``TimeoutError``; this class derives directly from ``Exception``.
    """
def optionally_run_in_subprocess(f):
    """Decorate ``f`` so that callers can opt in to running it in a subprocess.

    This is useful for functions that may hang or take the whole interpreter
    down (for example by segfaulting): the damage stays in the child process.

    The wrapped function accepts two extra, public keyword arguments, which
    are removed before ``f`` itself is called:

    run_in_subprocess (default ``False``)
        When true, ``f`` runs in a ``multiprocessing.Process`` and the caller
        blocks until its result is available.
    subprocess_timeout (default ``None``)
        Maximum number of seconds to wait for the subprocess. ``None`` means
        wait for as long as the subprocess is still alive.

    A third keyword, ``__queue``, is an implementation detail: it is the
    queue on which the child posts its result (or the exception it raised).

    Returns:
        The wrapper function. Calling it returns whatever ``f`` returns.

    Raises (from the wrapper):
        TimeoutError: if ``subprocess_timeout`` elapses (the subprocess is
            then terminated), or if the subprocess exits without posting a
            result, e.g. because it crashed. The same exception type is used
            for both so existing ``except TimeoutError`` handlers keep
            working.
        BaseException: any exception raised by ``f`` is re-raised in the
            caller, in both in-process and subprocess mode.
    """
    # How often (seconds) the parent wakes up to check the child is alive.
    # A result is still returned as soon as it arrives on the queue.
    poll_interval = 0.05

    # functools.wraps makes this a well-behaved decorator, preserving the
    # original function's name and docstring.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        run_in_subprocess = kwargs.pop('run_in_subprocess', False)
        subprocess_timeout = kwargs.pop('subprocess_timeout', None)
        queue = kwargs.pop('__queue', None)

        if not run_in_subprocess:
            # Either we already are in the subprocess (queue is set) or the
            # user simply wants a normal in-process call (queue is None).
            try:
                result = f(*args, **kwargs)
            except BaseException as error:
                # In a normal call, exceptions propagate as usual.
                if queue is None:
                    raise
                # In the subprocess, hand the exception back to the parent
                # in place of the result.
                result = error
            if queue is not None:
                queue.put(result)
            return result

        # Create the machinery Python uses to start a subprocess and run
        # this same wrapper in it, this time with the result queue attached.
        q = mp.Queue()
        p = mp.Process(
            target=wrapper,
            args=args,
            kwargs={"run_in_subprocess": False, "__queue": q, **kwargs},
        )
        # The goal is crash isolation rather than parallelism, so we wait
        # for the subprocess result immediately.
        p.start()

        deadline = None
        if subprocess_timeout is not None:
            deadline = time.monotonic() + subprocess_timeout

        while True:
            # Wait in short slices so a dead child is noticed promptly
            # instead of blocking for the whole timeout (or forever).
            wait = poll_interval
            if deadline is not None:
                wait = min(wait, max(deadline - time.monotonic(), 0.0))
            try:
                result = q.get(timeout=wait)
            except Empty:
                pass
            else:
                break

            if not p.is_alive():
                # The child may have posted its result just before exiting,
                # so look once more before declaring it lost.
                try:
                    result = q.get_nowait()
                except Empty:
                    p.join()
                    raise TimeoutError(
                        "Function {} exited in its subprocess (exit code {}) "
                        "without returning a result, with args: {}, {}".format(
                            f, p.exitcode, args, kwargs)
                    ) from None
                break

            if deadline is not None and time.monotonic() >= deadline:
                p.terminate()
                # Reap the child so it does not linger as a zombie; escalate
                # if it ignores the termination request.
                p.join(1.0)
                if p.is_alive():
                    p.kill()
                    p.join()
                raise TimeoutError("Function {} timed out with args: {}, {}".format(f, args, kwargs))

        # Read from the queue before joining: joining first can deadlock
        # while the child is still flushing a large result.
        p.join()

        # Pass on any exception raised in the subprocess.
        if isinstance(result, BaseException):
            raise result
        return result

    return wrapper
# Example
class Poly:
    """Quadratic ``a*x**2 + b*x + c`` used to demonstrate the decorator.

    Evaluation is deliberately broken for some inputs: it never returns for
    ``x > 10`` and raises ``ValueError`` for ``x < 0``.
    """

    def __init__(self, a, b, c):
        """Store the three coefficients."""
        self.a, self.b, self.c = a, b, c

    @optionally_run_in_subprocess
    def __call__(self, x):
        """Evaluate the polynomial at ``x`` (see the class notes on bad inputs)."""
        # Intentional fatal flaw: spin forever for large inputs.
        if x > 10:
            while True:
                pass
        # Intentional failure mode: reject negative inputs.
        if x < 0:
            raise ValueError("Bad input.")
        quadratic = self.a * x ** 2
        linear = self.b * x
        return quadratic + linear + self.c
if __name__ == '__main__':
    # Demo script exercising each way a decorated call can end.
    P = Poly(1, 2, 1)

    # 1. Normal result: the same call in-process and in a subprocess.
    x = P(9, run_in_subprocess=False)
    y = P(9, run_in_subprocess=True)
    print(x)
    print(y)

    # 2. An exception raised in the subprocess is re-raised in this process.
    #    Catch it here, otherwise the script stops before the later demos.
    try:
        P(-1, run_in_subprocess=True)
    except ValueError as error:
        print("Subprocess raised ValueError: {}".format(error))

    # 3. A call that hangs is abandoned once the timeout expires.
    try:
        x = P(11, run_in_subprocess=True, subprocess_timeout=3.0)
    except TimeoutError:
        x = 0.0
    print(x)

    # 4. A call intended to crash the interpreter by reading from address 0.
    @optionally_run_in_subprocess
    def segfaults():
        import ctypes
        ctypes.string_at(0)

    # No result ever comes back from the crashed child, which the decorator
    # reports as TimeoutError.
    try:
        z = segfaults(run_in_subprocess=True, subprocess_timeout=3.0)
    except TimeoutError:
        print("Looks like the function 'segfaults' has some kind of problem. Weird.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment