Skip to content

Instantly share code, notes, and snippets.

@growse
Last active April 23, 2018 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save growse/f37ca9da772d9b43b97d819e2d08eac1 to your computer and use it in GitHub Desktop.
Save growse/f37ca9da772d9b43b97d819e2d08eac1 to your computer and use it in GitHub Desktop.
Coordinating multiple python2.7 processes, IPC and signals
#!/usr/bin/env python
import logging
import os
import signal
import sys
import multiprocessing
import threading
# Signals that should bring the whole process tree down.
signals = (
    signal.SIGINT,
    signal.SIGTERM,
    signal.SIGCHLD,
    signal.SIGQUIT,
)

# Log everything from DEBUG upwards to stdout.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

# Queue the master process blocks on; any message put on it starts the shutdown.
magic_queue = multiprocessing.Queue()
def do_thing():
    """Child-process worker that logs periodically until it is told to stop.

    This worker never has to ask anybody else to quit. It simply loops until
    its own stop event is set, which happens when one of the signals listed
    in ``signals`` arrives or when its parent pid changes.
    """
    parent_pid = os.getppid()  # Remember who our parent is at start-up
    stop_event = threading.Event()

    def _on_signal(_signum, _frame):
        logging.info("Do Thing Signal handler")
        stop_event.set()

    # Signal handlers are inherited from the master, so replace them with one
    # that only stops this child.
    for signum in signals:
        signal.signal(signum, _on_signal)

    while not stop_event.is_set():
        logging.info("Doing a thing")
        stop_event.wait(5)
        if os.getppid() != parent_pid:
            # Our parent pid changed, so the original parent is gone and this
            # process has been orphaned. Probably best to stop.
            stop_event.set()
def do_another_thing(my_queue):
    """Child-process worker that watches for a ``quitnow`` file.

    This worker may need to tell everyone to quit, so it is handed the queue
    the master is blocking on. It also keeps its own stop event so that
    signals (or a change of parent pid) can tell it to stop.

    Args:
        my_queue: queue on which ``"PLS STOP"`` is placed once this worker
            leaves its polling loop.
    """
    parent_pid = os.getppid()
    stop_event = threading.Event()

    def _on_signal(_signum, _frame):
        logging.info("Do Another Thing Signal handler")
        stop_event.set()

    # Replace the handlers inherited from the master with our own.
    for signum in signals:
        signal.signal(signum, _on_signal)

    while not stop_event.is_set():
        if os.path.isfile("quitnow"):
            logging.info("Found quit file, quitting")
            break
        stop_event.wait(1)
        if os.getppid() != parent_pid:
            # Parent pid changed: the original parent is gone, so stop.
            stop_event.set()

    # NOTE(review): the source's indentation was lost; this statement is placed
    # after the loop because that is the only position the quit-file ``break``
    # can reach -- confirm against the original.
    my_queue.put("PLS STOP")
def signal_handler(received_signal, _):
    """Master-process signal handler: request a shutdown via ``magic_queue``.

    Any signal other than SIGCHLD is treated as "quit" and immediately queues
    a message. For SIGCHLD, every child whose state has changed is collected
    with a non-blocking ``os.waitpid``; a shutdown is queued only if at least
    one child actually exited or was killed. Children that were merely
    stopped or continued are ignored.

    Args:
        received_signal: number of the signal being handled.
        _: current stack frame (unused).
    """
    logging.info("Received signal {}".format(received_signal))

    if received_signal != signal.SIGCHLD:
        # Let's assume any signal means "quit"
        logging.info("Signal handler queuing a message")
        magic_queue.put(True)
        return

    wait_flags = os.WNOHANG | os.WUNTRACED | os.WCONTINUED
    child_gone = False

    # Several child state changes can be folded into a single SIGCHLD
    # delivery, so keep collecting until nothing more is pending. Reaping only
    # one child per signal could let a stop/continue event hide a real exit.
    while True:
        try:
            pid, status = os.waitpid(-1, wait_flags)
        except OSError as err:
            # No children are left to wait for (ECHILD). There is nothing left
            # to supervise, so ask for shutdown instead of letting the
            # exception escape from the handler into the main thread.
            logging.info("SIGCHLD but waitpid failed: {}".format(err))
            child_gone = True
            break

        if pid == 0:
            # WNOHANG: children exist but none has a state change to report.
            # The accompanying status of 0 is meaningless and must not be
            # interpreted as a normal exit.
            break

        if os.WIFCONTINUED(status) or os.WIFSTOPPED(status):
            continue

        if os.WIFSIGNALED(status) or os.WIFEXITED(status):
            logging.info("SIGCHLD. Something happened. Pid: {}, Status: {}".format(pid, status))
            logging.info("Something signalled or exited. Term sig is {}".format(os.WTERMSIG(status)))
            child_gone = True

    if child_gone:
        magic_queue.put(True)
# Route every quit-worthy signal received by the master to signal_handler.
for signame in signals:
    signal.signal(signame, signal_handler)

joinables = []


def _launch(target, args=()):
    """Create a child process running *target*, record it in ``joinables`` and start it."""
    child = multiprocessing.Process(target=target, args=args)
    joinables.append(child)
    child.start()
    return child


subprocess1 = _launch(do_thing)
subprocess2 = _launch(do_another_thing, (magic_queue,))

logging.info("Waiting on... something?")
# Block until anything puts a message on the queue; the first message brings
# the whole thing down. A queue is used instead of an Event because, according
# to the original author, Event.wait() deadlocked with the signal handler.
result = magic_queue.get()
logging.info("Got the queue message: {}".format(result))

# We are shutting down now, so we no longer care about SIGCHLD.
signal.signal(signal.SIGCHLD, signal.SIG_DFL)

logging.info("Killing children")
for child in joinables:
    try:
        logging.info("Trying to terminate {} pid {}".format(child, child.pid))
        child.terminate()
    except OSError:
        # The subprocess might not exist any more, even if the master still
        # thinks it does.
        pass

# Wait for every child to stop.
for child in joinables:
    logging.info("joining on {}".format(child))
    child.join()

# Hurray!
logging.info("exit")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment