Skip to content

Instantly share code, notes, and snippets.

@tell
Last active November 2, 2020 12:00
Show Gist options
  • Save tell/afa5312f55e4f9db070a1a55e820d87a to your computer and use it in GitHub Desktop.
Save tell/afa5312f55e4f9db070a1a55e820d87a to your computer and use it in GitHub Desktop.
Python multiprocessing with timeout
import multiprocessing as mp
import pathlib
import random
import resource
import sys
import time
import traceback
from typing import Union
class CannotSleepAtAll(RuntimeError):
pass
class Sleeper(mp.Process):
def __init__(self, max_sleep_second: int, out_port: Union[mp.Queue, mp.SimpleQueue]):
super().__init__()
self.max_sleep_second = max_sleep_second
self.out_port = out_port
def _sleeping(self):
cproc = mp.current_process()
mss = self.max_sleep_second
succ = random.randint(1, 50)
if succ != 1:
n = random.randint(0, mss)
time.sleep(n)
return 'finish', {'name': cproc.name, 'pid': cproc.pid, 'second': n}
else:
raise CannotSleepAtAll(f'{cproc.name}')
def run(self):
out_port = self.out_port
try:
cond, result = self._sleeping()
except:
raise
else:
out_port.put(result)
out_port.put(None)
def main(num_procs: int, max_sleep_second: int = 20):
# Setup.
procs = []
queues = []
for i in range(num_procs):
queue = mp.SimpleQueue()
proc = Sleeper(max_sleep_second, queue)
procs.append(proc)
queues.append(queue)
# Execute.
for i, proc in enumerate(procs):
while True:
try:
proc.start()
except (BlockingIOError, OSError):
print(f'{traceback.format_exc()}')
print(f'number of fd in proc dir: {num_of_fd()}')
print(f'retry "Process.start()": {i}')
print()
sys.stdout.flush()
time.sleep(1)
else:
break
# Wait.
time.sleep(3)
# Load, terminate, and release.
for read, proc in zip(queues, procs):
if proc.exitcode is not None:
if proc.exitcode == 0:
while True:
obj = read.get()
if obj is not None:
print(f'finished {proc.name}: {obj}')
else:
break
else:
print(f'terminate: {proc.name}')
proc.terminate()
print(f'terminated: {proc.name} with {proc.exitcode}')
if hasattr(read, 'close'):
read.close() # SimpleQueue has close since Python 3.9.
for proc in procs:
proc.join()
def num_of_fd():
procdir = pathlib.Path('/proc/self/fd')
if procdir.is_dir():
return len(list(procdir.glob('*')))
else:
return None
if __name__ == '__main__':
rlimit_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'RLIMIT_NOFILE(soft, hard) = {rlimit_nofile}')
nofile_ratio = 10
nofile = min(max(1, rlimit_nofile[0] // nofile_ratio), 100)
print(f'The number of open pipes per trial: {nofile}')
for i in range(100):
print(f'Trial: {i}')
print(f'The number of fd in proc dir: {num_of_fd()}')
main(nofile)
print(f'The number of fd in proc dir: {num_of_fd()}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment