Skip to content

Instantly share code, notes, and snippets.

@tappoz
Last active October 2, 2022 04:00
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tappoz/cb88f7a9d9ba27cfee1e2f03537fac16 to your computer and use it in GitHub Desktop.
Save tappoz/cb88f7a9d9ba27cfee1e2f03537fac16 to your computer and use it in GitHub Desktop.
Python multiprocessing with global timeout

How to join processes with timeout

The process.join(NUM_SECONDS) approach does not fit, because a plain for loop that joins all the processes waits up to NUM_SECONDS for each process in turn, so the total wait can grow to NUM_SECONDS times the number of processes.

Instead, we want a single global budget of NUM_SECONDS: once it has elapsed, any processes that are still alive (i.e. not yet joined) are killed (terminated).

The following code snippet has been heavily inspired by: https://stackoverflow.com/questions/26063877/python-multiprocessing-module-join-processes-with-timeout

import random
import time
import multiprocessing

# Tunable settings for the demo run.
conf = {
  'timeout_in_seconds': 5,               # global deadline shared by all workers
  'process_upper_bound_in_seconds': 6,   # maximum random sleep a worker may draw
  'num_of_simultaneous_processes': 5,    # how many worker processes to spawn
}

def timeout_procs(procs, timeout_in_seconds=None):
  '''
  Join every process in ``procs`` within one shared (global) deadline.

  No matter what, ALL the processes are:
  - either joined (they finished before the deadline)
  - or terminated and then joined (they were still alive at the deadline)

  Parameters:
  - procs: list of started multiprocessing.Process-like objects (anything
    exposing ``is_alive``/``join``/``terminate``/``name``).
  - timeout_in_seconds: overall deadline shared by ALL processes; defaults
    to conf['timeout_in_seconds'] for backward compatibility.
  '''
  if timeout_in_seconds is None:
    timeout_in_seconds = conf['timeout_in_seconds']
  start = time.time()
  while time.time() - start <= timeout_in_seconds:
    if any(p.is_alive() for p in procs):
      time.sleep(.1)  # Just to avoid hogging the CPU while polling
    else:
      # All the processes are done, break now.
      print('yuppie! all the processes finished on time! :)')
      for p in procs:
        p.join() # make sure things are stopped properly
        print('stopping process {}'.format(p.name))
      break
  else:
    # This 'else' belongs to the 'while': it runs only when the loop
    # condition expired WITHOUT a 'break', i.e. the deadline passed with
    # at least one process still alive.
    print("timed out, killing all processes")
    for p in procs:
      if not p.is_alive():
        print('this process is already finished: {}'.format(p.name))
      else:
        print('this process MUST be killed: {} (timeout of {} seconds has passed)'.format(p.name, timeout_in_seconds))
        p.terminate()
      # Always join, even after terminate(), to reap the process cleanly.
      print(' -> stopping (joining) process {}'.format(p.name))
      p.join()

def worker(num, upper_bound_in_seconds=None):
  '''
  Simulate one unit of work by sleeping a random amount of time.

  Parameters:
  - num: numeric label of this worker, used only in the log messages.
  - upper_bound_in_seconds: maximum possible sleep duration; defaults to
    conf['process_upper_bound_in_seconds'] for backward compatibility.
  '''
  if upper_bound_in_seconds is None:
    upper_bound_in_seconds = conf['process_upper_bound_in_seconds']
  print('started worker {}'.format(num))
  start_time = time.time()
  # Sleep a uniformly random fraction of the upper bound.
  time.sleep(upper_bound_in_seconds * random.random())
  elapsed_time = time.time() - start_time
  print('finished worker {} after {} seconds'.format(num, elapsed_time))

def start_procs(num_procs):
  '''
  Spawn ``num_procs`` worker processes and return them as a list.

  Process names are 'process_1' ... 'process_<num_procs>' (1-based index).
  '''
  procs = []
  for proc_idx in range(1, num_procs + 1):  # 1-based index for nicer names
    proc = multiprocessing.Process(
        target=worker,
        args=(proc_idx,),
        name='process_{}'.format(proc_idx))
    procs.append(proc)
    proc.start()
    print('starting process {}'.format(proc.name))
  return procs

def orchestrate_multi_processes():
  '''Entry point: spawn the configured workers, then enforce the global timeout.'''
  print('the current configuration is: {}'.format(conf))
  workers = start_procs(conf['num_of_simultaneous_processes'])
  timeout_procs(workers)

if __name__ == "__main__":
  # Run the demo only when executed as a script (not on import).
  orchestrate_multi_processes()

Adding a queue to the timed-out processes

In the following example a SimpleQueue is used to store the output of the partial chunks of work performed by the workers. These messages simulate intensive I/O operations, such as network calls, that could take a long time.

The queue is frequently inspected to retrieve potential new messages coming from the workers.

import random
import time
import multiprocessing
from multiprocessing.queues import SimpleQueue

# Tunable settings for the queue-based demo run.
conf = {
  'timeout_in_seconds': 5,               # global deadline shared by all workers
  'process_upper_bound_in_seconds': 8,   # number of one-chunk work iterations
  'num_of_simultaneous_processes': 5,    # how many worker processes to spawn
}

def empty_the_queue(output_queue):
  '''Drain and print every message currently sitting in ``output_queue``.'''
  while not output_queue.empty():
    msg = output_queue.get()
    print('flushing message: {}'.format(msg))
  print('done with flushings for this call')

def timeout_procs(procs, out_queue, timeout_in_seconds=None):
  '''
  Join every process in ``procs`` within one shared (global) deadline,
  draining ``out_queue`` along the way so the workers' messages are
  consumed while they run.

  No matter what, ALL the processes are:
  - either joined (they finished before the deadline)
  - or terminated and then joined (they were still alive at the deadline)

  Parameters:
  - procs: list of started multiprocessing.Process objects.
  - out_queue: queue the workers publish their progress messages to.
  - timeout_in_seconds: overall deadline shared by ALL processes; defaults
    to conf['timeout_in_seconds'] for backward compatibility.
  '''
  if timeout_in_seconds is None:
    timeout_in_seconds = conf['timeout_in_seconds']
  start = time.time()
  while time.time() - start <= timeout_in_seconds:
    if any(p.is_alive() for p in procs):
      time.sleep(.1)  # Just to avoid hogging the CPU while polling
      # Drain pending messages on every poll iteration.
      empty_the_queue(out_queue) # TODO attention here! maybe avoid this!!!
    else:
      # All the processes are done, break now.
      print('yuppie! all the processes finished on time! :)')
      for p in procs:
        print('stopping process {}'.format(p.name))
        p.join() # make sure things are stopped properly
        empty_the_queue(out_queue)
      break
  else:
    # This 'else' belongs to the 'while': it runs only when the loop
    # condition expired WITHOUT a 'break', i.e. the deadline passed with
    # at least one process still alive.
    print("timed out, killing all processes")
    for p in procs:
      if not p.is_alive():
        print('this process is already finished: {}'.format(p.name))
      else:
        print('this process MUST be killed: {} (timeout of {} seconds has passed)'.format(p.name, timeout_in_seconds))
        p.terminate()
      # Always join, even after terminate(), to reap the process cleanly,
      # then flush whatever the process managed to publish before dying.
      print(' -> stopping (joining) process {}'.format(p.name))
      p.join()
      empty_the_queue(out_queue)

def worker(num, output_queue, upper_bound_in_seconds=None):
  '''
  Simulate chunked I/O-bound work, reporting progress through a queue.

  Each chunk sleeps a random sub-second amount (standing in for e.g. a
  slow network call), then publishes one progress message on the queue.

  Parameters:
  - num: numeric label of this worker, used only in the messages.
  - output_queue: queue to publish one progress message per chunk.
  - upper_bound_in_seconds: number of chunks to perform; defaults to
    conf['process_upper_bound_in_seconds'] for backward compatibility.
  '''
  if upper_bound_in_seconds is None:
    upper_bound_in_seconds = conf['process_upper_bound_in_seconds']
  print('started worker {}'.format(num))
  start_time = time.time()
  seconds_chunks_counter = 1
  while seconds_chunks_counter <= upper_bound_in_seconds:
    the_IO_intensive_task = random.random() # this could be a network call that hangs for too much time
    time.sleep(the_IO_intensive_task)
    output_queue.put('worker {}: done with {} out of {}'.format(num, seconds_chunks_counter, upper_bound_in_seconds))
    seconds_chunks_counter += 1
  elapsed_time = time.time() - start_time
  print('finished worker {} after {} seconds'.format(num, elapsed_time))

def start_procs(num_procs, out_queue):
  '''
  Spawn ``num_procs`` worker processes wired to ``out_queue`` and return them.

  Process names are 'process_1' ... 'process_<num_procs>' (1-based index).
  '''
  procs = []
  for proc_idx in range(1, num_procs + 1):  # 1-based index for nicer names
    proc = multiprocessing.Process(
        target=worker,
        args=(proc_idx, out_queue),
        name='process_{}'.format(proc_idx))
    procs.append(proc)
    proc.start()
    print('starting process {}'.format(proc.name))
  return procs

def orchestrate_multi_processes():
  '''
  Entry point: spawn the configured workers sharing one queue, then
  enforce the global timeout while draining the queue.
  '''
  print('the current configuration is: {}'.format(conf))
  num_procs = conf['num_of_simultaneous_processes']
  # BUG FIX: multiprocessing.queues.SimpleQueue requires an explicit
  # ``ctx`` argument on Python 3, so a bare SimpleQueue() raises
  # TypeError. The public multiprocessing.SimpleQueue() factory supplies
  # the default context for us.
  output_q = multiprocessing.SimpleQueue()
  procs = start_procs(num_procs, output_q)
  timeout_procs(procs, output_q)

if __name__ == "__main__":
  # Run the demo only when executed as a script (not on import).
  orchestrate_multi_processes()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment