@schlamar
Last active April 17, 2024 19:19
processify
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue


def processify(func):
    '''Decorator to run a function as a process.
    Be sure that every argument and the return value
    is *picklable*.
    The created process is joined, so the code does not
    run in parallel.
    '''

    def process_func(q, *args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None

        q.put((ret, error))

    # register original function with different name
    # in sys.modules so it is picklable
    process_func.__name__ = func.__name__ + 'processify_func'
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        # read the result before joining: a large payload would otherwise
        # block the child on q.put() and deadlock the join
        ret, error = q.get()
        p.join()

        if error:
            ex_type, ex_value, tb_str = error
            # str() instead of .message, which does not exist on Python 3
            message = '%s (in subprocess)\n%s' % (str(ex_value), tb_str)
            raise ex_type(message)

        return ret
    return wrapper


@processify
def test_function():
    return os.getpid()


@processify
def test_deadlock():
    # list() keeps the payload large on Python 3, where range() is lazy
    return list(range(30000))


@processify
def test_exception():
    raise RuntimeError('xyz')


def test():
    print(os.getpid())
    print(test_function())
    print(len(test_deadlock()))
    test_exception()


if __name__ == '__main__':
    test()
@gmossessian

gmossessian commented Apr 13, 2021

Sadly, this seems to break on an Apple M1 chip :( If you run it with multiprocessing.set_start_method('fork'), in many situations it simply crashes, and I have not been able to figure out why. If you try 'spawn' or 'forkserver' instead, the following happens (because the sys.modules trick does not work in those cases, as the subprocess does not copy over sys.modules, which is a CPython-level object):

Running @Chiron1991's py3 version with Python 3.8.2 under Rosetta:

(venv) george@MacBook-Pro autocomplete % python processify.py 
29242
Traceback (most recent call last):
  File "processify.py", line 80, in <module>
    test()
  File "processify.py", line 74, in test
    print(test_function())
  File "processify.py", line 43, in wrapper
    p.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 283, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'processify.<locals>.process_func'

I've been trying to find a fix but so far no luck.
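
The traceback above names the target by its qualified name, 'processify.<locals>.process_func', even though the decorator renamed it. A minimal sketch of why, assuming Python 3: pickle looks functions up by `__qualname__`, and assigning to `__name__` leaves `__qualname__` unchanged. The names `make_local`, `inner`, `inner_registered` and `can_pickle` are hypothetical and only serve the illustration. This checks the pickling step in the parent only; it is not a tested fix for 'spawn' or 'forkserver', where the child must also be able to import the module and find the registered name.

```python
import pickle
import sys


def make_local():
    # Stand-in for processify(): returns a function defined in a local scope.
    def inner():
        return 42
    return inner


def can_pickle(obj):
    # The exception type for local objects differs between Python versions,
    # so both candidates are caught here.
    try:
        pickle.dumps(obj)
        return True
    except (AttributeError, pickle.PicklingError):
        return False


fn = make_local()

# Same trick as the gist: rename and register at module level.
fn.__name__ = 'inner_registered'
setattr(sys.modules[fn.__module__], fn.__name__, fn)

# __qualname__ is still 'make_local.<locals>.inner', so pickling fails.
renamed_only = can_pickle(fn)

# Aligning __qualname__ with the registered name lets pickle resolve it.
fn.__qualname__ = fn.__name__
renamed_both = can_pickle(fn)

if __name__ == '__main__':
    print(renamed_only, renamed_both)
```

On Python 2 functions have no `__qualname__`, which would explain why renaming `__name__` alone was enough when the gist was written.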

@dgerosa

dgerosa commented Jul 8, 2021

Here I added the init and setup files to make it a module: https://github.com/dgerosa/processify

@bergkvist

bergkvist commented Oct 19, 2021

Personally, I've made this, which doesn't require that things can be pickled:

import multiprocessing as mp

def forked(fn):
    """
    Does not work on Windows (except WSL2), since the fork syscall is not supported here.
    fork creates a new process which inherits all of the memory without it being copied.
    Memory is copied on write instead, meaning it is very cheap to create a new process
    """
    def call(*args, **kwargs):
        ctx = mp.get_context('fork')
        q = ctx.Queue(1)
        is_error = ctx.Value('b', False)
        def target():
            try:
                q.put(fn(*args, **kwargs))
            except BaseException as e:
                is_error.value = True
                q.put(e)
        ctx.Process(target=target).start()
        result = q.get()
        if is_error.value:
            raise result
        return result
    return call


# You can use forked as a decorator:
@forked
def add(x, y):
    return x + y
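
A hedged variant of the same idea, for illustration only: it sends a `(succeeded, payload)` tuple through the queue instead of using a shared flag, and joins the child before returning. `forked_joined`, `apply_in_child`, `child_pid` and `fail` are hypothetical names, not part of the comment above. The function and its arguments are inherited through fork and never pickled, but the return value (or the exception) still travels through a queue, so it does have to be picklable; as far as I can tell, an unpicklable result would leave the parent waiting on `q.get()`.

```python
import multiprocessing as mp
import os


def forked_joined(fn):
    # Sketch only: requires a platform that supports the 'fork' start method.
    def call(*args, **kwargs):
        ctx = mp.get_context('fork')
        q = ctx.Queue(1)

        def target():
            # Runs in the child; fn, args and kwargs are inherited via fork.
            try:
                q.put((True, fn(*args, **kwargs)))
            except BaseException as e:
                q.put((False, e))

        p = ctx.Process(target=target)
        p.start()
        succeeded, payload = q.get()  # read before join to avoid blocking
        p.join()
        if not succeeded:
            raise payload
        return payload
    return call


# Lambdas cannot be pickled, yet they work as the function and as an argument.
apply_in_child = forked_joined(lambda f, x: f(x))
child_pid = forked_joined(os.getpid)
fail = forked_joined(lambda: 1 / 0)

if __name__ == '__main__':
    print(apply_in_child(lambda v: v + 1, 41))
    print(child_pid() != os.getpid())
    try:
        fail()
    except ZeroDivisionError as e:
        print('raised in parent:', e)
```

The exception is re-raised in the parent without the child's traceback, since traceback objects are not carried along when an exception is pickled.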

@gabrc52

gabrc52 commented Mar 30, 2023

https://stackoverflow.com/a/72490867/5031798 also works, without needing to do all the lower level forking ourselves, but I'm not sure how to turn it into a decorator
