Last active
November 2, 2020 12:00
-
-
Save tell/afa5312f55e4f9db070a1a55e820d87a to your computer and use it in GitHub Desktop.
Python multiprocessing with timeout
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/.idea/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
import pathlib | |
import random | |
import resource | |
import sys | |
import time | |
import traceback | |
from typing import Union | |
class CannotSleepAtAll(RuntimeError):
    """Raised by a worker whose random draw says it fails instead of sleeping."""
class Sleeper(mp.Process):
    """Worker process that sleeps for a random time and reports the outcome.

    On success the worker puts one result dict on ``out_port`` followed by a
    ``None`` sentinel that marks the end of its output.  About one run in
    fifty raises ``CannotSleepAtAll`` instead; in that case nothing is put on
    the port and the exception propagates out of ``run()``.
    """

    def __init__(self, max_sleep_second: int, out_port: Union[mp.Queue, mp.SimpleQueue]):
        """Remember the sleep bound and the queue used to send results back.

        Args:
            max_sleep_second: Inclusive upper bound, in whole seconds, of the
                random sleep.
            out_port: Queue-like object; only its ``put`` method is used by
                this class.
        """
        super().__init__()
        self.max_sleep_second = max_sleep_second
        self.out_port = out_port

    def _sleeping(self):
        """Sleep for a random whole number of seconds, or fail at random.

        Returns:
            A ``('finish', info)`` tuple where ``info`` holds the current
            process ``name`` and ``pid`` and the ``second`` count slept.

        Raises:
            CannotSleepAtAll: When the 1-in-50 draw comes up 1.
        """
        cproc = mp.current_process()
        # Simulated failure: one draw in fifty aborts before sleeping.
        if random.randint(1, 50) == 1:
            raise CannotSleepAtAll(f'{cproc.name}')
        n = random.randint(0, self.max_sleep_second)
        time.sleep(n)
        return 'finish', {'name': cproc.name, 'pid': cproc.pid, 'second': n}

    def run(self):
        """Do the sleep and publish the result followed by a ``None`` sentinel.

        Any exception from ``_sleeping`` is left to propagate, so a failed
        worker publishes neither a result nor the sentinel.
        """
        _, result = self._sleeping()
        out_port = self.out_port
        out_port.put(result)
        # None tells the reader there is nothing more to read from this worker.
        out_port.put(None)
def main(num_procs: int, max_sleep_second: int = 20, wait_second: float = 3):
    """Run ``num_procs`` sleepers, wait, then collect results or terminate.

    Every worker gets its own ``SimpleQueue``.  After ``wait_second`` seconds,
    workers that already exited with code 0 have their results printed and
    workers that are still running are terminated.  Workers that exited with
    a non-zero code are skipped without reading their queue.

    Args:
        num_procs: Number of ``Sleeper`` processes to launch.
        max_sleep_second: Upper bound of the random sleep passed to every
            ``Sleeper``.
        wait_second: Grace period, in seconds, before workers that are still
            running are terminated.
    """
    # Setup: one private queue per worker so output can be matched to it.
    queues = [mp.SimpleQueue() for _ in range(num_procs)]
    procs = [Sleeper(max_sleep_second, port) for port in queues]

    # Execute: keep retrying a start that the OS refuses.  BlockingIOError is
    # a subclass of OSError, so catching OSError covers both.
    for i, proc in enumerate(procs):
        while True:
            try:
                proc.start()
            except OSError:
                print(f'{traceback.format_exc()}')
                print(f'number of fd in proc dir: {num_of_fd()}')
                print(f'retry "Process.start()": {i}')
                print()
                sys.stdout.flush()
                time.sleep(1)
            else:
                break

    # Wait.
    time.sleep(wait_second)

    # Load, terminate, and release.
    for read, proc in zip(queues, procs):
        exitcode = proc.exitcode
        if exitcode is None:
            # Still running after the grace period: time it out.
            print(f'terminate: {proc.name}')
            proc.terminate()
            # terminate() only requests the stop; join so that the exit code
            # printed below is the real one rather than None.
            proc.join()
            print(f'terminated: {proc.name} with {proc.exitcode}')
        elif exitcode == 0:
            # Drain this worker's output up to its None sentinel.
            for obj in iter(read.get, None):
                print(f'finished {proc.name}: {obj}')
        if hasattr(read, 'close'):
            read.close()  # SimpleQueue has close since Python 3.9.

    for proc in procs:
        proc.join()
def num_of_fd():
    """Count the entries under ``/proc/self/fd``.

    Returns:
        The number of entries found, or ``None`` when ``/proc/self/fd`` is
        not a directory.
    """
    fd_dir = pathlib.Path('/proc/self/fd')
    if not fd_dir.is_dir():
        return None
    return sum(1 for _ in fd_dir.glob('*'))
if __name__ == '__main__':
    # Size each trial from the soft RLIMIT_NOFILE value: one tenth of it,
    # clamped to the range 1..100.
    fd_limits = resource.getrlimit(resource.RLIMIT_NOFILE)
    print(f'RLIMIT_NOFILE(soft, hard) = {fd_limits}')
    soft_limit = fd_limits[0]
    nofile_ratio = 10
    procs_per_trial = max(1, soft_limit // nofile_ratio)
    procs_per_trial = min(procs_per_trial, 100)
    print(f'The number of open pipes per trial: {procs_per_trial}')

    # Repeat the experiment, reporting the descriptor count around each run.
    for trial in range(100):
        print(f'Trial: {trial}')
        print(f'The number of fd in proc dir: {num_of_fd()}')
        main(procs_per_trial)
        # NOTE(review): the original indentation was lost; this report is
        # assumed to belong inside the trial loop - confirm.
        print(f'The number of fd in proc dir: {num_of_fd()}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment