Skip to content

Instantly share code, notes, and snippets.

@fntneves
Last active April 22, 2018 23:42
Show Gist options
  • Save fntneves/f0d0405ba77ce50b50bf9eea808cf5af to your computer and use it in GitHub Desktop.
Save fntneves/f0d0405ba77ce50b50bf9eea808cf5af to your computer and use it in GitHub Desktop.
Producer-[Consumer/Producer]-Consumer example with graceful shutdown
from multiprocessing import Event, JoinableQueue, Process, current_process, active_children
import Queue
import time
import signal
import os
def ignore(signun, frame):
    """Signal handler that deliberately does nothing.

    Installed for SIGINT by the parts of this script that must keep
    running when the user presses Ctrl+C.

    Args:
        signun: Signal number passed by the signal machinery (unused).
        frame: Interrupted stack frame (unused).
    """
    return None
def producer(output_queue):
    """Push ``(1, 'a', False)`` onto ``output_queue`` until SIGINT arrives.

    A SIGINT handler is installed in the calling process; once it fires,
    the loop stops after the current iteration and a summary line with
    the number of produced events is printed.

    Args:
        output_queue: Object exposing ``put(item)``.
    """
    # A mutable container lets the nested handler flip the flag without
    # ``nonlocal`` (which Python 2 lacks).
    state = {'stop': False}
    own_pid = current_process().pid

    def start_shutdown(sigum, frame):
        print('Producer {} is interrupted...'.format(str(own_pid)))
        state['stop'] = True

    signal.signal(signal.SIGINT, start_shutdown)

    produced = 0
    while True:
        if state['stop']:
            break
        output_queue.put((1, 'a', False))
        produced += 1

    print('Exiting producer... {}. Produced {} events.'.format(
        str(own_pid), str(produced)))
def logger(input_queue):
    """Drain ``input_queue`` until the ``'exit'`` sentinel is received.

    Every item taken from the queue is acknowledged with ``task_done()``
    so that a ``join()`` on the same queue can return. ``None`` items are
    acknowledged and otherwise skipped; they are not counted. The
    ``'exit'`` sentinel itself is included in the printed count.

    Args:
        input_queue: Queue-like object exposing a blocking ``get()`` and
            ``task_done()``.
    """
    pid = current_process().pid

    # SIG_IGN makes the OS discard Ctrl+C for this process, so it cannot
    # interrupt the blocking get() below; shutdown is driven solely by the
    # 'exit' sentinel.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    logged = 0
    while True:
        try:
            event = input_queue.get()
        except Queue.Empty:
            # Only reachable with a queue whose get() does not block.
            break

        # Acknowledge *every* dequeued item. Skipping task_done() for a
        # None item would leave the queue's unfinished-task counter above
        # zero and make join() on it hang forever.
        input_queue.task_done()
        if event is None:
            continue

        logged += 1
        if event == 'exit':
            break

    print('Exiting logger... {}. Logged {} events.'.format(pid, logged))
def consumer(input_queue, output_queue):
    """Forward events from ``input_queue`` to ``output_queue``.

    Runs until the ``'exit'`` sentinel is received. The sentinel is not
    forwarded, but it is included in the printed count. ``None`` items
    are acknowledged and otherwise skipped; they are neither forwarded
    nor counted.

    Args:
        input_queue: Queue-like object exposing a blocking ``get()`` and
            ``task_done()``.
        output_queue: Queue-like object exposing ``put(item)``.
    """
    pid = current_process().pid

    # SIG_IGN makes the OS discard Ctrl+C for this process, so it cannot
    # interrupt the blocking get() below; shutdown is driven solely by the
    # 'exit' sentinel.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    handled = 0
    finished = False
    while not finished:
        try:
            event = input_queue.get()
        except Queue.Empty:
            # Only reachable with a queue whose get() does not block.
            break

        if event is None:
            # Still acknowledge the item: leaving it unacknowledged would
            # make join() on input_queue hang forever.
            input_queue.task_done()
            continue

        if event == 'exit':
            finished = True
        else:
            output_queue.put(event)

        # task_done() must come after the put above, so that once
        # input_queue.join() returns every event is already on output_queue.
        input_queue.task_done()
        handled += 1

    print('Exiting worker... {}. Handled {} events.'.format(pid, handled))
def main(amount=4):
    """Run the producer -> consumers -> logger pipeline until Ctrl+C.

    Starts one logger process, ``amount`` consumer processes and one
    producer process, all as daemons. When the producer returns (it stops
    on SIGINT), the queues are drained in order and each stage is sent an
    ``'exit'`` sentinel.

    Args:
        amount: Number of consumer processes to start.
    """
    workers = []
    event_queue = JoinableQueue()
    log_queue = JoinableQueue()

    # The parent must survive Ctrl+C so it can coordinate the shutdown;
    # the producer installs its own SIGINT handler.
    signal.signal(signal.SIGINT, ignore)

    # Process objects get their own names so the module-level functions
    # `logger` and `producer` are not rebound.
    logger_process = Process(target=logger, args=(log_queue,))
    logger_process.daemon = True
    logger_process.start()
    workers.append(logger_process)

    for _ in range(amount):
        worker = Process(target=consumer, args=(event_queue, log_queue))
        worker.daemon = True
        worker.start()
        workers.append(worker)

    producer_process = Process(target=producer, args=(event_queue,))
    producer_process.daemon = True
    producer_process.start()

    # Wait for producer to end.
    producer_process.join()
    print('Attempting a graceful shutdown...')

    # Wait for events to be processed.
    event_queue.join()

    # One 'exit' sentinel per consumer.
    for _ in range(amount):
        event_queue.put('exit')

    # Wait for events to be logged, then stop the logger.
    log_queue.join()
    log_queue.put('exit')

    for worker in workers:
        worker.join(timeout=5)


# Without this guard, platforms that start children by re-importing the main
# module would re-run the start-up code in every child process.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment