Skip to content

Instantly share code, notes, and snippets.

Created January 25, 2013 15:30
Show Gist options
  • Save selwin/4635260 to your computer and use it in GitHub Desktop.
Save selwin/4635260 to your computer and use it in GitHub Desktop.
from gevent import monkey
import gevent.pool
import os
import signal
import random
import time
import datetime
from multiprocessing import Semaphore, Array
_signames = dict((getattr(signal, signame), signame) \
for signame in dir(signal) \
if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum):
# Hackety-hack-hack: is there really no better way to reverse lookup the
# signal name? If you read this and know a way: please provide a patch :)
return _signames[signum]
except KeyError:
return 'SIG_UNKNOWN'
class BaseWorker(object):
def __init__(self, num_processes=1):
self._is_stopped = False
def work(self):
while True:
if self._is_stopped:
def quit(self):
""" Enters a loop to wait for all children to finish and then quit """
while True:
if not self.has_active_horses():
def spawn_child(self):
raise NotImplementedError('Implement this in a subclass.')
def has_active_horses(self):
# Each worker class has to implement a way of checking whether it is
# in the middle of running one or more jobs
raise NotImplementedError('Implement this in a subclass.')
def handle_quit(self):
print 'QUITTT'
def fake_work(self):
# When working, children should ignore CTRL+C
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
sleep_time = 5 * random.random()
print, '- Hello from', os.getpid(), '- %.3fs' % sleep_time
print, '- Done from', os.getpid()
def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM
def request_force_stop(signum, frame):
"""Terminates the application (cold shutdown).
print 'Cold shut down.'
# Need to call ``handle_cold_shutdown`` implemented by subclasses
raise SystemExit()
def request_stop(signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
self._is_stopped = True
print '%s Got signal %s.' % (os.getpid(), signal_name(signum))
signal.signal(signal.SIGINT, request_force_stop)
signal.signal(signal.SIGTERM, request_force_stop)
print 'Warm shut down requested.'
signal.signal(signal.SIGINT, request_stop)
signal.signal(signal.SIGTERM, request_stop)
def process_is_alive(pid):
# Check if a process is alive by sending it a signal that does nothing
# If OSError is raised, it means the process is no longer alive
os.kill(pid, 0)
return True
except OSError:
return False
class ForkingWorker(BaseWorker):
def __init__(self, num_processes=1):
super(ForkingWorker, self).__init__()
# Set up sync primitives, to communicate with the spawned children
self._semaphore = Semaphore(num_processes)
self._slots = Array('i', [0] * num_processes)
def spawn_child(self):
"""Forks and executes the job."""
self._semaphore.acquire() # responsible for the blocking
# If CTRL+C is pressed when blocking to acquire children
# (causes self._stopped to be set to True) return right away
if self._is_stopped:
# Select an empty slot from self._slots (the first 0 value is picked)
# The implementation guarantees there will always be at least one empty slot
for slot, value in enumerate(self._slots):
if value == 0:
# The usual hardcore forking action
child_pid = os.fork()
if child_pid == 0:
# Within child
# This is the new stuff. Remember, we're in the child process
# currently. When all work is done here, free up the current
# slot (by writing a 0 in the slot position). This
# communicates to the parent that the current child has died
# (so can safely be forgotten about).
self._slots[slot] = 0
# Within parent, keep track of the new child by writing its PID
# into the first free slot index.
self._slots[slot] = child_pid
def has_active_horses(self):
# If any of the worker slot is non zero, that means there's a job still running
for pid in self._slots:
if pid and process_is_alive(pid):
return True
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment