Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save austospumanto/694723b9a4d5c1ca843b88e3460765ec to your computer and use it in GitHub Desktop.
Save austospumanto/694723b9a4d5c1ca843b88e3460765ec to your computer and use it in GitHub Desktop.
Executing CPU-intensive workloads via multiprocessing.Process sub-processes
import multiprocessing
import pickle
import struct
from typing import Optional, Callable, List, Any
from tqdm import tqdm
import pickle
import struct
import warnings
from multiprocessing.connection import Connection
from typing import Optional
def exec_proc(fn, child_conn: Optional[Connection] = None) -> Any:
    """Call ``fn()`` and return its result, or send it over ``child_conn``.

    Meant to be the ``target`` of a ``multiprocessing.Process``. When
    ``child_conn`` is given, the result is pickled with the highest protocol
    and written as an 8-byte little-endian unsigned length (``struct`` format
    ``"<Q"``) followed by the pickle bytes. The private ``Connection._send`` is
    used, so multiprocessing adds no framing of its own. The reader must
    consume exactly this layout (8 bytes, then ``length`` bytes).

    Args:
        fn: Zero-argument callable to execute.
        child_conn: Writable end of a pipe, or ``None`` to return the result
            directly instead of sending it.

    Returns:
        ``fn()``'s result when ``child_conn`` is ``None``; otherwise ``None``.

    Raises:
        Exception: Anything raised by ``fn`` (or by pickling/sending its
            result) is printed and then re-raised unchanged.
    """
    try:
        # catch_warnings() only takes effect when used as a context manager.
        # The previous bare call built the manager and discarded it. Entering
        # it keeps warning-filter changes made by ``fn`` scoped to this call.
        with warnings.catch_warnings():
            res = fn()
        if child_conn is None:
            return res

        pickled_bytes = pickle.dumps(res, protocol=pickle.HIGHEST_PROTOCOL)
        length_as_bytes = struct.pack("<Q", len(pickled_bytes))
        if len(pickled_bytes) > 16384:
            # Large payload: send header and body separately instead of
            # concatenating, which would briefly hold a second full copy of
            # the pickle in memory. This mirrors CPython's own
            # Connection._send_bytes.
            # noinspection PyUnresolvedReferences
            child_conn._send(length_as_bytes)
            # noinspection PyUnresolvedReferences
            child_conn._send(pickled_bytes)
        else:
            # Small payload: a single write is cheap and avoids two tiny sends.
            # noinspection PyUnresolvedReferences
            child_conn._send(length_as_bytes + pickled_bytes)
        return None
    except Exception as e:
        print(f"Exception when executing function in subprocess. Error: {repr(e)}")
        # Bare raise re-raises the active exception with its original traceback.
        raise
def execute_concurrently_without_pool(
    *fns: Callable, desc: Optional[str] = None
) -> List[Any]:
    """Run every callable in its own ``multiprocessing.Process`` and gather results.

    One process is started per entry in ``fns``. There is no worker pool, so
    all of them run at once. Each child runs ``exec_proc(fn, child_conn)``,
    which writes an 8-byte little-endian length followed by the pickled return
    value to a one-way pipe. This function reads that message back with the
    matching private ``Connection._recv`` calls.

    Args:
        *fns: Zero-argument callables. They are passed to ``Process`` as
            arguments, so with non-fork start methods they must be picklable.
        desc: Label for the ``tqdm`` progress bar.

    Returns:
        The return values of ``fns``, in the same order as ``fns`` (not in
        completion order).

    Raises:
        RuntimeError: If a child exits without sending a complete result
            (for example, ``fn`` raised or the process was killed). Children
            still running at that point are terminated.
    """
    # ``wait`` blocks until at least one connection is readable. Readable
    # means data is available *or* the pipe has reached EOF.
    from multiprocessing.connection import wait

    results: List[Any] = [None] * len(fns)
    procs: List[multiprocessing.Process] = []
    pending = {}  # read end of each still-unfinished pipe -> index into fns
    tq = tqdm(total=len(fns), desc=desc)
    try:
        for idx, fn in enumerate(fns):
            parent_conn, child_conn = multiprocessing.Pipe(duplex=False)
            proc = multiprocessing.Process(target=exec_proc, args=(fn, child_conn))
            proc.start()
            # The child now holds its own copy of the write end. Closing the
            # parent's copy lets the read end report EOF if the child dies
            # without writing. Previously this copy stayed open until after a
            # successful read, so a failed child made the loop below spin
            # forever.
            child_conn.close()
            procs.append(proc)
            pending[parent_conn] = idx

        while pending:
            for parent_conn in wait(list(pending)):
                idx = pending.pop(parent_conn)
                try:
                    # noinspection PyUnresolvedReferences
                    header = parent_conn._recv(8).getvalue()
                    (length,) = struct.unpack("<Q", header)
                    # noinspection PyUnresolvedReferences
                    payload = parent_conn._recv(length).getvalue()
                except (EOFError, OSError) as err:
                    # _recv raises EOFError on an empty pipe and OSError on a
                    # truncated message: either way the child sent no result.
                    procs[idx].join()
                    raise RuntimeError(
                        f"Subprocess for function #{idx} exited with code "
                        f"{procs[idx].exitcode} without sending a result"
                    ) from err
                finally:
                    parent_conn.close()
                results[idx] = pickle.loads(payload)
                # Join only after draining the pipe. A child writing more than
                # the pipe buffer holds cannot exit until the data is read.
                procs[idx].join()
                tq.update(1)
    finally:
        # On success everything below is already closed or joined, so this
        # does real work only on the error path.
        for parent_conn in pending:
            parent_conn.close()
        for proc in procs:
            if proc.is_alive():
                proc.terminate()
            proc.join()
        tq.close()
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment