processify
import os
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue


def processify(func):
    '''Decorator to run a function as a process.

    Be sure that every argument and the return value
    is *picklable*.

    The created process is joined, so the code does not
    run in parallel.
    '''

    def process_func(q, *args, **kwargs):
        try:
            ret = func(*args, **kwargs)
        except Exception:
            ex_type, ex_value, tb = sys.exc_info()
            error = ex_type, ex_value, ''.join(traceback.format_tb(tb))
            ret = None
        else:
            error = None

        q.put((ret, error))

    # register original function with a different name
    # in sys.modules so it is picklable
    process_func.__name__ = func.__name__ + 'processify_func'
    setattr(sys.modules[__name__], process_func.__name__, process_func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        q = Queue()
        p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
        p.start()
        ret, error = q.get()
        p.join()

        if error:
            ex_type, ex_value, tb_str = error
            message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
            raise ex_type(message)

        return ret
    return wrapper


@processify
def test_function():
    return os.getpid()


@processify
def test_deadlock():
    return range(30000)


@processify
def test_exception():
    raise RuntimeError('xyz')


def test():
    print os.getpid()
    print test_function()
    print len(test_deadlock())
    test_exception()


if __name__ == '__main__':
    test()
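
Running python processify.py (under Python 2) should print the parent PID, a different PID from the child, and 30000, then re-raise the RuntimeError from the subprocess with the child's traceback folded into the message. A sketch of the expected output (the PIDs are illustrative):

    4711
    4712
    30000
    Traceback (most recent call last):
      ...
    RuntimeError: xyz (in subprocess)
      File "processify.py", line ..., in test_exception
        raise RuntimeError('xyz')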
@hangtwenty commented Aug 21, 2014

so helpful! thank you!

@YS-L commented Oct 1, 2015

Nice utility, thanks!

It seems that if the decorated function returns a sufficiently large object, a deadlock can occur at the p.join() line, for example:

    @processify
    def will_deadlock():
        return range(30000)


    if __name__ == '__main__':
        will_deadlock()  # deadlocks here

Simply removing the p.join() line solves the problem. The multiprocessing docs mention a possible deadlock issue regarding joining processes that use queues, which seems to be what is happening here.
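
For what it's worth, the ordering the docs suggest is to drain the queue before joining, which is what the wrapper above already does with q.get() before p.join(). A minimal self-contained sketch of that safe ordering (the produce function here is just an illustration):

    from multiprocessing import Process, Queue

    def produce(q):
        # A result large enough to fill the pipe buffer, so the child
        # blocks in its feeder thread until the parent starts reading.
        q.put(list(range(30000)))

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=produce, args=(q,))
        p.start()
        ret = q.get()   # drain the queue first, letting the child flush and exit
        p.join()        # joining after the get cannot deadlock
        print(len(ret))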

@dgerosa commented May 11, 2017

Hello,

this is incredibly useful! Thanks! You should really upload it to PyPI...

Davide

@karatheodory commented May 29, 2017

Thank you for the great work, it is very helpful!

One problem I've had with it is when the original exception type takes more constructor arguments than usual (in my case sqlalchemy.exc.ProgrammingError, which takes 4 arguments, so I got TypeError: __init__() takes at least 4 arguments (2 given) instead of the original exception). We can work around it like this:

        if error:
            ex_type, ex_value, tb_str = error
            message = '%s (in subprocess)\n%s' % (ex_value.message, tb_str)
            try:
                exception = ex_type(message)
            except Exception:
                # Failed to keep the original exception type
                exception = Exception('%s\n(original exception type: %s)' % (message, ex_type))
            raise exception
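
Another option, sketched below under the assumption that the exception instance itself is picklable (not all are), is to send the caught exception object through the queue and re-raise it directly, so no constructor signature has to be reconstructed (run_in_child and the multi-argument ValueError are illustrative):

    import sys
    import traceback
    from multiprocessing import Process, Queue

    def run_in_child(q):
        try:
            raise ValueError('boom', 'extra', 'args', 4)  # multi-argument exception
        except Exception as e:
            # Ship the instance itself plus the formatted traceback string.
            q.put((e, traceback.format_exc()))

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=run_in_child, args=(q,))
        p.start()
        exc, tb_str = q.get()
        p.join()
        sys.stderr.write('Traceback (in subprocess):\n%s' % tb_str)
        raise exc  # original type and args preserved, no reconstruction needed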
@samoand commented Aug 26, 2017

Can you please add a license to this code? I was putting together something similar, which splits input and maps it across a pool of processes, and it would be very helpful to use your code as a base. Thanks.

@dentonzh commented Jan 30, 2019

Thank you! This solved an issue I'd been trying to resolve for the past ~4 hours: my server kept killing a long-running Python script. Wrapping the offending code in processify took my CPU/memory usage down significantly (by a factor of 20-30).

@pranav08 commented May 30, 2019

Sorry, I am pasting the error here. I tried to spawn a process inside a Flask route, and I also removed join() from processify.py:

E00073.274: Exception escaped from start_client

        Traceback (most recent call last):
          File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/log.py", line 110, in g
            return f(*args, **kwargs)
          File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/pydevd_hooks.py", line 74, in start_client
            sock, start_session = daemon.start_client((host, port))
          File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 214, in start_client
            with self.started():
          File "/usr/lib/python3.6/contextlib.py", line 81, in __enter__
            return next(self.gen)
          File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 110, in started
            self.start()
          File "/home/pranav/.vscode/extensions/ms-python.python-2019.5.17059/pythonFiles/lib/python/ptvsd/daemon.py", line 145, in start
            raise RuntimeError('already started')
        RuntimeError: already started
@Dilpreet7365123 commented Nov 1, 2019

I am getting a deadlock while using it.

@volodyaordynsky commented Dec 3, 2019

Great module, very helpful; it runs like magic in my scripts with Python 2.7.15. With Python 3.7.5 I get the following:

    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "C:\ProgramData\Anaconda2\envs\pyt3.7\lib\multiprocessing\spawn.py", line 105, in spawn_main
        exitcode = _main(fd)
      File "C:\ProgramData\Anaconda2\envs\pyt3.7\lib\multiprocessing\spawn.py", line 115, in _main
        self = reduction.pickle.load(from_parent)
    EOFError: Ran out of input

Any idea/suggestion what can be wrong here?

@Chiron1991 commented Dec 18, 2020

@gmossessian commented Apr 13, 2021

Sadly, this seems to break with an Apple M1 chip :( With multiprocessing.set_start_method('fork') it simply crashes in many situations, and I have not been able to figure out why. With 'spawn' or 'forkserver', the following happens, because the sys.modules trick does not work in those cases (the spawned subprocess does not inherit the parent's modified sys.modules, so the registered function cannot be pickled):

Running @Chiron1991's py3 version with Python 3.8.2 under Rosetta:

(venv) george@MacBook-Pro autocomplete % python processify.py 
29242
Traceback (most recent call last):
  File "processify.py", line 80, in <module>
    test()
  File "processify.py", line 74, in test
    print(test_function())
  File "processify.py", line 43, in wrapper
    p.start()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/context.py", line 283, in _Popen
    return Popen(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'processify.<locals>.process_func'

I've been trying to find a fix but so far no luck.
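
One possible angle (a sketch only, untested on M1, with simplified error handling): under Python 3, pickle locates functions by module plus __qualname__, not __name__, so the registered alias also needs its __qualname__ overridden for the spawn start method to find it when the child re-imports the module:

    import sys
    import traceback
    from functools import wraps
    from multiprocessing import Process, Queue

    def processify(func):
        def process_func(q, *args, **kwargs):
            try:
                ret, error = func(*args, **kwargs), None
            except Exception:
                ret, error = None, traceback.format_exc()
            q.put((ret, error))

        # Python 3 pickle looks functions up by module + __qualname__,
        # so override __qualname__ as well before registering the alias.
        process_func.__name__ = func.__name__ + '__processify_func'
        process_func.__qualname__ = process_func.__name__
        setattr(sys.modules[__name__], process_func.__name__, process_func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            q = Queue()
            p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
            p.start()
            ret, error = q.get()
            p.join()
            if error:
                raise RuntimeError('error in subprocess:\n%s' % error)
            return ret
        return wrapper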

@dgerosa commented Jul 8, 2021

Here I added the init and setup files to make it a module: https://github.com/dgerosa/processify
