Skip to content

Instantly share code, notes, and snippets.

@jiteshk23
Created March 4, 2020 14:59
Show Gist options
  • Save jiteshk23/88d36f64a2e9c546ad1051b2f15df7c2 to your computer and use it in GitHub Desktop.
Save jiteshk23/88d36f64a2e9c546ad1051b2f15df7c2 to your computer and use it in GitHub Desktop.
Decorator to run a function in a separate process and bind it to a CPU core
import os
import subprocess
import sys
import traceback
from functools import wraps
from multiprocessing import Process, Queue
def _pin_to_core(pid, core):
    """Best-effort attempt to restrict process *pid* to CPU *core*.

    Uses ``os.sched_setaffinity`` when the running interpreter provides it,
    otherwise invokes ``/bin/taskset`` with an argument list (no shell is
    involved). A failure is reported on stderr and otherwise ignored, so the
    decorated call still runs, just without the CPU binding.
    """
    try:
        if hasattr(os, 'sched_setaffinity'):
            os.sched_setaffinity(pid, {core})
        else:
            subprocess.call(['/bin/taskset', '-cp', str(core), str(pid)])
    except (OSError, ValueError) as exc:
        sys.stderr.write(
            'Processify: could not bind pid {} to core {}: {}\n'.format(
                pid, core, exc))


def Processify(core):
    '''Decorator to run a function in a separate process, optionally bound to a core.

    Can be applied bare (``@Processify``) or with an argument
    (``@Processify(2)`` to bind the child process to CPU core 2,
    ``@Processify(None)`` for no binding).

    Be sure that every argument and the return value is *picklable*.
    The created process is joined, so the code does not run in parallel.

    An exception raised by the function inside the child is re-raised in the
    caller as the same exception type, with the child's traceback text
    appended to the message.

    Raises:
        TypeError: if ``core`` is neither a callable, ``None`` nor an ``int``.

    Credits: https://gist.github.com/schlamar/2311116
    '''
    def decorator(func):
        def process_func(q, *args, **kwargs):
            # Runs inside the child process: call the function and ship
            # either its result or a description of the failure back to the
            # parent through the queue.
            try:
                ret = func(*args, **kwargs)
            except Exception:
                ex_type, ex_value, tb = sys.exc_info()
                # Send the message text rather than the exception instance:
                # the instance itself may not be picklable.
                error = ex_type, str(ex_value), ''.join(traceback.format_tb(tb))
                ret = None
            else:
                error = None
            q.put((ret, error))

        # Register the worker under a distinct module-level name so it can be
        # pickled by reference. pickle resolves functions through
        # __qualname__ on Python 3, so that has to be updated as well.
        process_func.__name__ = func.__name__ + 'processify_func'
        process_func.__qualname__ = process_func.__name__
        setattr(sys.modules[__name__], process_func.__name__, process_func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            q = Queue()
            p = Process(target=process_func, args=[q] + list(args), kwargs=kwargs)
            p.start()
            if core is not None:
                _pin_to_core(p.pid, core)
            # NOTE: q.get() has no timeout, so this blocks until the child
            # puts its (result, error) pair on the queue.
            try:
                ret, error = q.get()
            finally:
                p.join()
            if error is not None:
                ex_type, ex_message, tb_str = error
                message = '%s (in subprocess)\n%s' % (ex_message, tb_str)
                raise ex_type(message)
            return ret
        return wrapper

    if callable(core):
        # Bare usage: ``core`` is actually the function being decorated.
        target, core = core, None
        return decorator(target)
    if core is not None and not isinstance(core, int):
        raise TypeError(
            'core must be None or an int, got {!r}'.format(core))
    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment