Created
January 25, 2013 15:30
-
-
Save selwin/4635260 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from gevent import monkey | |
monkey.patch_socket() | |
import gevent.pool | |
import os | |
import signal | |
import random | |
import time | |
import datetime | |
from multiprocessing import Semaphore, Array | |
def _build_signal_names():
    """Build the ``{signal number: signal name}`` reverse-lookup table.

    Only real signals are considered: attributes of the ``signal`` module
    whose name starts with ``SIG`` and contains no underscore (this skips
    the ``SIG_DFL`` / ``SIG_IGN`` style constants).

    Several names can share one number (e.g. ``SIGABRT``/``SIGIOT`` or
    ``SIGCHLD``/``SIGCLD``).  Names are visited in sorted order and the
    first one seen for a number is kept, so the well-known name is reported
    instead of whichever alias happens to sort last.
    """
    names = {}
    for signame in sorted(dir(signal)):
        if signame.startswith('SIG') and '_' not in signame:
            # setdefault: never overwrite a name already recorded for this
            # signal number.
            names.setdefault(getattr(signal, signame), signame)
    return names


# Computed once at import time; maps signal number -> signal name.
_signames = _build_signal_names()


def signal_name(signum):
    """Return the symbolic name for signal number ``signum``.

    Returns the string ``'SIG_UNKNOWN'`` when the number does not match any
    signal exposed by the ``signal`` module.
    """
    return _signames.get(signum, 'SIG_UNKNOWN')
class BaseWorker(object):
    """Skeleton of a worker: a spawn loop plus warm/cold shutdown handling.

    Subclasses must implement ``spawn_child`` (start one unit of work) and
    ``has_active_horses`` (report whether any work is still running).
    """

    def __init__(self, num_processes=1):
        # ``num_processes`` is accepted so that all worker classes share one
        # constructor signature; the base class itself does not use it.
        self._is_stopped = False

    def work(self):
        """Install the signal handlers, spawn children until a stop is
        requested, then wait for the running children to finish.
        """
        self._install_signal_handlers()
        while not self._is_stopped:
            self.spawn_child()
        self.quit()

    def quit(self):
        """Block until ``has_active_horses`` reports no running work,
        polling once per second.
        """
        while self.has_active_horses():
            time.sleep(1)

    def spawn_child(self):
        """Start one unit of work.  Must be provided by subclasses."""
        raise NotImplementedError('Implement this in a subclass.')

    def has_active_horses(self):
        """Return whether the worker is in the middle of running one or
        more jobs.  Must be provided by subclasses.
        """
        raise NotImplementedError('Implement this in a subclass.')

    def handle_quit(self):
        """Print a quit marker."""
        print('QUITTT')

    def fake_work(self):
        """Simulate a job: sleep for a random time below five seconds,
        printing a timestamped line before and after.
        """
        # When working, children should ignore CTRL+C; SIGTERM goes back to
        # its default action.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)

        sleep_time = 5 * random.random()
        # Each message is formatted into a single string so the same line is
        # printed whether ``print`` is a statement or a function.
        print('%s - Hello from %s - %.3fs'
              % (datetime.datetime.now(), os.getpid(), sleep_time))
        time.sleep(sleep_time)
        print('%s - Done from %s' % (datetime.datetime.now(), os.getpid()))

    def _install_signal_handlers(self):
        """Install handlers so SIGINT and SIGTERM are handled gracefully.

        The first signal requests a warm shutdown; a second one triggers a
        cold shutdown.
        """

        def request_force_stop(signum, frame):
            """Terminate the application (cold shutdown)."""
            print('Cold shut down.')
            # TODO: subclasses have no ``handle_cold_shutdown`` hook yet;
            # once they do, it should be called here before exiting.
            raise SystemExit()

        def request_stop(signum, frame):
            """Stop the worker loop but let the children end gracefully
            (warm shutdown).
            """
            self._is_stopped = True
            print('%s Got signal %s.' % (os.getpid(), signal_name(signum)))

            # From now on a further SIGINT/SIGTERM forces a cold shutdown.
            signal.signal(signal.SIGINT, request_force_stop)
            signal.signal(signal.SIGTERM, request_force_stop)
            print('Warm shut down requested.')

        signal.signal(signal.SIGINT, request_stop)
        signal.signal(signal.SIGTERM, request_stop)
def process_is_alive(pid):
    """Return True if a process with id ``pid`` currently exists.

    The check sends signal 0, which performs the existence and permission
    checks without delivering any signal.  A permission error (``EPERM``)
    still proves the process exists -- it merely belongs to someone else --
    so it counts as alive.  Every other ``OSError`` (typically ``ESRCH``,
    "no such process") counts as not alive.
    """
    import errno  # local import: only this function needs it

    try:
        os.kill(pid, 0)
    except OSError as exc:
        return exc.errno == errno.EPERM
    return True
class ForkingWorker(BaseWorker):
    """Worker that runs each job in a forked child process, with at most
    ``num_processes`` children running at the same time.

    Two shared primitives coordinate parent and children: a semaphore with
    one permit per allowed child, and an integer array with one slot per
    allowed child holding that child's PID (0 marks a free slot).
    """

    # Seconds to block on the semaphore before re-checking the stop flag
    # and reaping exited children.
    _POLL_INTERVAL = 1.0

    def __init__(self, num_processes=1):
        super(ForkingWorker, self).__init__(num_processes)

        # Set up sync primitives, to communicate with the spawned children
        self._semaphore = Semaphore(num_processes)
        self._slots = Array('i', [0] * num_processes)

        # PIDs forked by this process that have not been waited for yet.
        # Parent-side bookkeeping only; children never consult it.
        self._children = set()

    def spawn_child(self):
        """Fork one child and run the job in it.

        Blocks until a permit is available.  Returns without forking when a
        stop was requested, or when no free slot can be found (the caller's
        loop simply tries again).
        """
        # Wait for a permit in short intervals, so that a stop request is
        # noticed promptly and children that died without giving their
        # permit back get reaped (which returns the permit).
        while not self._semaphore.acquire(True, self._POLL_INTERVAL):
            if self._is_stopped:
                return
            self._reap_children()

        # A stop was requested while waiting: hand the permit back instead
        # of leaking it.
        if self._is_stopped:
            self._semaphore.release()
            return

        self._reap_children()
        slot = self._find_free_slot()
        if slot is None:
            # A slot can briefly hold the PID of a child that has already
            # released its permit but has not exited yet.  Never overwrite
            # an occupied slot; give the permit back and let the caller
            # retry.
            self._semaphore.release()
            return

        # The usual hardcore forking action
        try:
            child_pid = os.fork()
        except OSError:
            self._semaphore.release()
            raise

        if child_pid == 0:
            # Within child.  The outer ``finally`` guarantees the child
            # terminates here and can never fall through into the parent's
            # work loop, even if the cleanup itself fails.
            try:
                try:
                    random.seed()
                    self.fake_work()
                finally:
                    # When all work is done, free up the current slot (by
                    # writing a 0 in the slot position) and return the
                    # permit.  This tells the parent that this child is
                    # finished and can be forgotten about.
                    self._slots[slot] = 0
                    self._semaphore.release()
            finally:
                os._exit(0)
        else:
            # Within parent, keep track of the new child by writing its PID
            # into the chosen slot and remembering it for reaping.
            self._slots[slot] = child_pid
            self._children.add(child_pid)

    def has_active_horses(self):
        """Return True while at least one slot holds the PID of a live
        process.
        """
        # Reap first: an exited child that was never waited for is a zombie
        # and would still be reported as alive.
        self._reap_children()
        for pid in self._slots:
            if pid and process_is_alive(pid):
                return True
        return False

    def _find_free_slot(self):
        """Return the index of the first slot holding 0, or None."""
        for slot, value in enumerate(self._slots):
            if value == 0:
                return slot
        return None

    def _reap_children(self):
        """Collect children that have exited, without blocking.

        For every reaped child, any slot still holding its PID is cleared.
        If the child was killed by a signal while its slot still held its
        PID, it never ran its own cleanup, so its permit is returned on its
        behalf.
        """
        for pid in list(self._children):
            try:
                reaped, status = os.waitpid(pid, os.WNOHANG)
            except OSError:
                # The child can no longer be waited for (e.g. it was already
                # collected elsewhere); treat it as gone, exit status unknown.
                reaped, status = pid, 0
            if reaped == 0:
                continue  # still running

            self._children.discard(pid)

            held_slot = False
            for slot, value in enumerate(self._slots):
                if value == pid:
                    self._slots[slot] = 0
                    held_slot = True

            if held_slot and os.WIFSIGNALED(status):
                self._semaphore.release()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment