Skip to content

Instantly share code, notes, and snippets.

Forked from schlamar/
Last active July 12, 2024 08:32
Show Gist options
  • Save stuaxo/889db016e51264581b50 to your computer and use it in GitHub Desktop.
Save stuaxo/889db016e51264581b50 to your computer and use it in GitHub Desktop.
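# Modified from the original processify gist by schlamar
# (the link to it was lost when this page was extracted).
# Also see: (reference link lost when this page was extracted).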
import inspect
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
class Sentinel:
    '''Marker put on the queue when a generator has been exhausted.

    The class object itself is the marker (it is never instantiated),
    so it is always compared by identity.
    '''


def _current_error():
    '''Describe the exception being handled as a (type, message, traceback) tuple.

    Only the exception type and plain strings are returned, so the
    tuple can be sent through a queue even when the exception
    instance itself cannot be pickled.
    '''
    ex_type, ex_value, tb = sys.exc_info()
    return ex_type, str(ex_value), ''.join(traceback.format_tb(tb))


def _raise_subprocess_error(error):
    '''Raise the exception described by a tuple from _current_error().'''
    ex_type, ex_text, tb_str = error
    message = '%s (in subprocess)\n%s' % (ex_text, tb_str)
    raise ex_type(message)


def processify(func):
    '''Decorator to run a function as a process.

    Be sure that every argument and the return value
    is *picklable*.
    The created process is joined, so the code does not
    run in parallel.

    Generator functions are supported as well: every yielded value is
    sent back to the caller one at a time and the decorated function
    returns a generator.

    An exception (subclass of Exception) raised in the subprocess is
    raised again in the caller as the same type; its message is the
    original message followed by ' (in subprocess)' and the formatted
    traceback of the subprocess.
    '''
    # Decided once at decoration time; func never changes afterwards.
    is_generator = inspect.isgeneratorfunction(func)

    def process_generator_func(q, *args, **kwargs):
        # Runs in the child: forward every yielded item, then either
        # the end-of-stream marker or a description of the failure.
        try:
            for item in func(*args, **kwargs):
                q.put((item, None))
        except Exception:
            q.put((None, _current_error()))
        else:
            q.put((Sentinel, None))

    def process_func(q, *args, **kwargs):
        # Runs in the child: exactly one (result, error) pair is sent.
        try:
            result = func(*args, **kwargs)
        except Exception:
            error = _current_error()
            result = None
        else:
            error = None
        q.put((result, error))

    def register(target, suffix):
        # register original function with different name
        # in sys.modules so it is picklable
        # NOTE(review): Python 3 pickles functions by __qualname__, which
        # is left untouched here, so this presumably only matters where
        # the target is not pickled (fork) - confirm on spawn platforms.
        target.__name__ = func.__name__ + suffix
        setattr(sys.modules[__name__], target.__name__, target)

    def wrap_func(*args, **kwargs):
        register(process_func, 'processify_func')

        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        # Read the result before joining: a child that still has data
        # buffered for the queue does not exit until it has been read.
        result, error = q.get()
        p.join()

        if error is not None:
            _raise_subprocess_error(error)
        return result

    def wrap_generator_func(*args, **kwargs):
        register(process_generator_func, 'processify_generator_func')

        q = Queue()
        p = Process(target=process_generator_func, args=[q] + list(args), kwargs=kwargs)
        p.start()

        error = None
        finished = False
        try:
            while True:
                result, error = q.get()
                if error is not None or result is Sentinel:
                    finished = True
                    break
                yield result
        finally:
            # The consumer may stop iterating early; do not leave the
            # child running (or blocked on a full queue) in that case.
            if not finished:
                p.terminate()
            p.join()

        if error is not None:
            _raise_subprocess_error(error)

    @wraps(func)
    def wrapper(*args, **kwargs):
        if is_generator:
            return wrap_generator_func(*args, **kwargs)
        return wrap_func(*args, **kwargs)
    return wrapper
@processify
def test_function():
    '''Return the PID of the process that executes this body.

    With the decorator applied the body runs in a subprocess, so the
    value differs from the caller's own os.getpid().
    '''
    return os.getpid()
@processify
def test_generator_func():
    '''Yield two fixed strings, to exercise the generator code path.'''
    for msg in ["generator", "function"]:
        yield msg
@processify
def test_deadlock():
    '''Return a 30000-element list, to exercise a large return value.

    A real list is built on purpose: on Python 3 a bare range object
    is lazy and pickles to a handful of bytes, which would not
    exercise the large-payload case at all.
    '''
    return list(range(30000))
@processify
def test_exception():
    '''Always raise RuntimeError('xyz'), to exercise error forwarding.'''
    raise RuntimeError('xyz')
def test():
    '''Run every demo function once and print what came back.'''
    print(os.getpid())
    print(test_function())
    print(list(test_generator_func()))
    print(len(test_deadlock()))
    try:
        test_exception()
    except RuntimeError as exc:
        # Show the message instead of aborting the demo.
        print(exc)


if __name__ == '__main__':
    test()
Copy link

Thanks for this - proved very useful for hacking around a CUDA memory leak that slowly accrued over many iterations of my code. Launching the code in the subprocess clears the memory when it terminates. Now I should probably figure out what is causing the leak ...

Copy link

stuaxo commented Jun 12, 2020

np, it really just built on the original by @schlamar

This should probably be inside a library. I haven't had to use it since I modified it in 2016, but it was handy at the time for isolating my new code from a whole load of gnarly old code that I didn't fully trust not to do something weird and break my own.

It does feel like this should be in a library, if that hasn't happened already.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment