Skip to content

Instantly share code, notes, and snippets.

@schlamar
Last active April 17, 2024 19:19
Show Gist options
  • Save schlamar/2311116 to your computer and use it in GitHub Desktop.
Save schlamar/2311116 to your computer and use it in GitHub Desktop.
processify
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
def processify(func):
'''Decorator to run a function as a process.
Be sure that every argument and the return value
is *pickable*.
The created process is joined, so the code does not
run in parallel.
'''
def process_func(q, *args, **kwargs):
try:
ret = func(*args, **kwargs)
except Exception:
ex_type, ex_value, tb = sys.exc_info()
error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
ret = None
else:
error = None
q.put((ret, error))
# register original function with different name
# in sys.modules so it is pickable
process_func.__name__ = func.__name__ + 'processify_func'
setattr(sys.modules[__name__], process_func.__name__, process_func)
@wraps(func)
def wrapper(*args, **kwargs):
q = Queue()
p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
p.start()
ret, error = q.get()
p.join()
if error:
ex_type, ex_value, tb_str = error
message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
raise ex_type(message)
return ret
return wrapper
@processify
def test_function():
return os.getpid()
@processify
def test_deadlock():
return range(30000)
@processify
def test_exception():
raise RuntimeError('xyz')
def test():
print os.getpid()
print test_function()
print len(test_deadlock())
test_exception()
if __name__ == '__main__':
test()
@bergkvist
Copy link

bergkvist commented Oct 19, 2021

Personally, I've made this, which doesn't require that things can be pickled:

import multiprocessing as mp

def forked(fn):
    """
    Does not work on Windows (except WSL2), since the fork syscall is not supported here.
    fork creates a new process which inherits all of the memory without it being copied.
    Memory is copied on write instead, meaning it is very cheap to create a new process
    """
    def call(*args, **kwargs):
        ctx = mp.get_context('fork')
        q = ctx.Queue(1)
        is_error = ctx.Value('b', False)
        def target():
            try:
                q.put(fn(*args, **kwargs))
            except BaseException as e:
                is_error.value = True
                q.put(e)
        ctx.Process(target=target).start()
        result = q.get()
        if is_error.value:
            raise result
        return result
    return call


# You can use forked as a decorator:
@forked
def add(x, y):
    return x + y

@gabrc52
Copy link

gabrc52 commented Mar 30, 2023

https://stackoverflow.com/a/72490867/5031798 also works, without needing to do all the lower level forking ourselves, but I'm not sure how to turn it into a decorator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment