Skip to content

Instantly share code, notes, and snippets.

@selwin
Created January 25, 2013 15:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save selwin/4635260 to your computer and use it in GitHub Desktop.
Save selwin/4635260 to your computer and use it in GitHub Desktop.
from gevent import monkey
monkey.patch_socket()
import gevent.pool
import os
import signal
import random
import time
import datetime
from multiprocessing import Semaphore, Array
_signames = dict((getattr(signal, signame), signame) \
for signame in dir(signal) \
if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum):
# Hackety-hack-hack: is there really no better way to reverse lookup the
# signal name? If you read this and know a way: please provide a patch :)
try:
return _signames[signum]
except KeyError:
return 'SIG_UNKNOWN'
class BaseWorker(object):
def __init__(self, num_processes=1):
self._is_stopped = False
def work(self):
self._install_signal_handlers()
while True:
if self._is_stopped:
break
self.spawn_child()
self.quit()
def quit(self):
""" Enters a loop to wait for all children to finish and then quit """
while True:
if not self.has_active_horses():
break
time.sleep(1)
def spawn_child(self):
raise NotImplementedError('Implement this in a subclass.')
def has_active_horses(self):
# Each worker class has to implement a way of checking whether it is
# in the middle of running one or more jobs
raise NotImplementedError('Implement this in a subclass.')
def handle_quit(self):
print 'QUITTT'
def fake_work(self):
# When working, children should ignore CTRL+C
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
sleep_time = 5 * random.random()
print datetime.datetime.now(), '- Hello from', os.getpid(), '- %.3fs' % sleep_time
time.sleep(sleep_time)
print datetime.datetime.now(), '- Done from', os.getpid()
def _install_signal_handlers(self):
"""Installs signal handlers for handling SIGINT and SIGTERM
gracefully.
"""
def request_force_stop(signum, frame):
"""Terminates the application (cold shutdown).
"""
print 'Cold shut down.'
# Need to call ``handle_cold_shutdown`` implemented by subclasses
#self.handle_cold_shutdown()
raise SystemExit()
def request_stop(signum, frame):
"""Stops the current worker loop but waits for child processes to
end gracefully (warm shutdown).
"""
self._is_stopped = True
print '%s Got signal %s.' % (os.getpid(), signal_name(signum))
signal.signal(signal.SIGINT, request_force_stop)
signal.signal(signal.SIGTERM, request_force_stop)
print 'Warm shut down requested.'
signal.signal(signal.SIGINT, request_stop)
signal.signal(signal.SIGTERM, request_stop)
def process_is_alive(pid):
# Check if a process is alive by sending it a signal that does nothing
# If OSError is raised, it means the process is no longer alive
try:
os.kill(pid, 0)
return True
except OSError:
return False
class ForkingWorker(BaseWorker):
def __init__(self, num_processes=1):
super(ForkingWorker, self).__init__()
# Set up sync primitives, to communicate with the spawned children
self._semaphore = Semaphore(num_processes)
self._slots = Array('i', [0] * num_processes)
def spawn_child(self):
"""Forks and executes the job."""
self._semaphore.acquire() # responsible for the blocking
# If CTRL+C is pressed when blocking to acquire children
# (causes self._stopped to be set to True) return right away
if self._is_stopped:
return
# Select an empty slot from self._slots (the first 0 value is picked)
# The implementation guarantees there will always be at least one empty slot
for slot, value in enumerate(self._slots):
if value == 0:
break
# The usual hardcore forking action
child_pid = os.fork()
if child_pid == 0:
random.seed()
# Within child
try:
self.fake_work()
finally:
# This is the new stuff. Remember, we're in the child process
# currently. When all work is done here, free up the current
# slot (by writing a 0 in the slot position). This
# communicates to the parent that the current child has died
# (so can safely be forgotten about).
self._slots[slot] = 0
self._semaphore.release()
os._exit(0)
else:
# Within parent, keep track of the new child by writing its PID
# into the first free slot index.
self._slots[slot] = child_pid
def has_active_horses(self):
# If any of the worker slot is non zero, that means there's a job still running
for pid in self._slots:
if pid and process_is_alive(pid):
return True
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment