Created
December 3, 2013 15:01
-
-
Save alanmeadows/7770570 to your computer and use it in GitHub Desktop.
This is a modified patch from https://github.com/peterfeiner/quantum/commit/quantum-service-workers-for-grizzly.patch which adds support for multiple api workers to the grizzly/quantum api server. This modified patch uses the configuration parameter "api_workers" instead of "workers" so that it is forward compatible with havana.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From bd1e14456291e8639c604a42c9894e565f91918a Mon Sep 17 00:00:00 2001 | |
From: Peter Feiner <peter@gridcentric.ca> | |
Date: Thu, 13 Jun 2013 14:30:53 +0000 | |
Subject: [PATCH] Workers! | |
--- | |
quantum/agent/metadata/agent.py | 22 ++- | |
quantum/agent/metadata/namespace_proxy.py | 4 +- | |
quantum/service.py | 253 +++++++++++++++++++++++++++++- | |
quantum/wsgi.py | 8 +- | |
4 files changed, 265 insertions(+), 22 deletions(-) | |
diff --git a/quantum/agent/metadata/agent.py b/quantum/agent/metadata/agent.py | |
index 7bdfae8..534a81e 100644 | |
--- a/quantum/agent/metadata/agent.py | |
+++ b/quantum/agent/metadata/agent.py | |
@@ -176,11 +176,16 @@ def __init__(self, request, client_address, server): | |
class UnixDomainWSGIServer(wsgi.Server): | |
- def start(self, application, file_socket, backlog=128): | |
- sock = eventlet.listen(file_socket, | |
- family=socket.AF_UNIX, | |
- backlog=backlog) | |
- self.pool.spawn_n(self._run, application, sock) | |
+ | |
+ def __init__(self, name, file_socket, threads=1000, backlog=128): | |
+ self.pool = eventlet.GreenPool(threads) | |
+ self.name = name | |
+ self._socket = eventlet.listen(file_socket, | |
+ family=socket.AF_UNIX, | |
+ backlog=backlog) | |
+ | |
+ def start(self, application): | |
+ self.pool.spawn_n(self._run, application, self._socket) | |
def _run(self, application, socket): | |
"""Start a WSGI service in a new green thread.""" | |
@@ -213,9 +218,10 @@ def __init__(self, conf): | |
os.makedirs(dirname, 0755) | |
def run(self): | |
- server = UnixDomainWSGIServer('quantum-metadata-agent') | |
- server.start(MetadataProxyHandler(self.conf), | |
- self.conf.metadata_proxy_socket) | |
+ server = UnixDomainWSGIServer( | |
+ 'quantum-metadata-agent', | |
+ self.conf.metadata_proxy_socket) | |
+ server.start(MetadataProxyHandler(self.conf)) | |
server.wait() | |
diff --git a/quantum/agent/metadata/namespace_proxy.py b/quantum/agent/metadata/namespace_proxy.py | |
index aa12d24..826ff27 100644 | |
--- a/quantum/agent/metadata/namespace_proxy.py | |
+++ b/quantum/agent/metadata/namespace_proxy.py | |
@@ -141,8 +141,8 @@ def run(self): | |
handler = NetworkMetadataProxyHandler( | |
self.network_id, | |
self.router_id) | |
- proxy = wsgi.Server('quantum-network-metadata-proxy') | |
- proxy.start(handler, self.port) | |
+ proxy = wsgi.Server('quantum-network-metadata-proxy', self.port) | |
+ proxy.start(handler) | |
proxy.wait() | |
diff --git a/quantum/service.py b/quantum/service.py | |
index 3352928..8cc277c 100644 | |
--- a/quantum/service.py | |
+++ b/quantum/service.py | |
@@ -19,6 +19,16 @@ | |
import logging as std_logging | |
import os | |
import random | |
+import routes | |
+import errno | |
+import inspect | |
+import os | |
+import random | |
+import signal | |
+import sys | |
+import time | |
+import eventlet | |
+import greenlet | |
from oslo.config import cfg | |
@@ -34,6 +44,7 @@ | |
LOG = logging.getLogger(__name__) | |
service_opts = [ | |
+ cfg.IntOpt('api_workers', default=1), | |
cfg.IntOpt('periodic_interval', | |
default=40, | |
help=_('Seconds between running periodic tasks')), | |
@@ -60,10 +71,11 @@ class WsgiService(object): | |
def __init__(self, app_name): | |
self.app_name = app_name | |
+ self.wsgi_server = _load_wsgi() | |
self.wsgi_app = None | |
def start(self): | |
- self.wsgi_app = _run_wsgi(self.app_name) | |
+ self.wsgi_app = _run_wsgi(self.wsgi_server, self.app_name) | |
def wait(self): | |
self.wsgi_app.wait() | |
@@ -91,23 +103,24 @@ def create(cls): | |
def serve_wsgi(cls): | |
try: | |
- service = cls.create() | |
+ launcher = ProcessLauncher() | |
+ launcher.launch_server(cls.create(), workers=CONF.api_workers) | |
except Exception: | |
LOG.exception(_('In WsgiService.create()')) | |
raise | |
- service.start() | |
+ return launcher | |
- return service | |
+def _load_wsgi(): | |
+ return wsgi.Server("Quantum", cfg.CONF.bind_port, cfg.CONF.bind_host) | |
-def _run_wsgi(app_name): | |
+def _run_wsgi(server, app_name): | |
app = config.load_paste_app(app_name) | |
if not app: | |
LOG.error(_('No known API applications configured.')) | |
return | |
- server = wsgi.Server("Quantum") | |
- server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host) | |
+ server.start(app) | |
# Dump all option values here after all options are parsed | |
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG) | |
LOG.info(_("Quantum service started, listening on %(host)s:%(port)s"), | |
@@ -115,7 +128,6 @@ def _run_wsgi(app_name): | |
'port': cfg.CONF.bind_port}) | |
return server | |
- | |
class Service(service.Service): | |
"""Service object for binaries running on hosts. | |
@@ -231,3 +243,228 @@ def report_state(self): | |
"""Update the state of this service.""" | |
# Todo(gongysh) report state to quantum server | |
pass | |
+ | |
+ | |
+class SignalExit(SystemExit): | |
+ def __init__(self, signo, exccode=1): | |
+ super(SignalExit, self).__init__(exccode) | |
+ self.signo = signo | |
+ | |
+ | |
+class Launcher(object): | |
+ """Launch one or more services and wait for them to complete.""" | |
+ | |
+ def __init__(self): | |
+ """Initialize the service launcher. | |
+ | |
+ :returns: None | |
+ | |
+ """ | |
+ self._services = [] | |
+ | |
+ @staticmethod | |
+ def run_server(server): | |
+ """Start and wait for a server to finish. | |
+ | |
+ :param service: Server to run and wait for. | |
+ :returns: None | |
+ | |
+ """ | |
+ server.start() | |
+ server.wait() | |
+ | |
+ def launch_server(self, server): | |
+ """Load and start the given server. | |
+ | |
+ :param server: The server you would like to start. | |
+ :returns: None | |
+ | |
+ """ | |
+ gt = eventlet.spawn(self.run_server, server) | |
+ self._services.append(gt) | |
+ | |
+ def stop(self): | |
+ """Stop all services which are currently running. | |
+ | |
+ :returns: None | |
+ | |
+ """ | |
+ for service in self._services: | |
+ service.kill() | |
+ | |
+ def wait(self): | |
+ """Waits until all services have been stopped, and then returns. | |
+ | |
+ :returns: None | |
+ | |
+ """ | |
+ for service in self._services: | |
+ try: | |
+ service.wait() | |
+ except greenlet.GreenletExit: | |
+ pass | |
+ | |
+ | |
+class ServerWrapper(object): | |
+ def __init__(self, server, workers): | |
+ self.server = server | |
+ self.workers = workers | |
+ self.children = set() | |
+ self.forktimes = [] | |
+ | |
+ | |
+class ProcessLauncher(object): | |
+ def __init__(self): | |
+ self.children = {} | |
+ self.sigcaught = None | |
+ self.running = True | |
+ rfd, self.writepipe = os.pipe() | |
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') | |
+ | |
+ signal.signal(signal.SIGTERM, self._handle_signal) | |
+ signal.signal(signal.SIGINT, self._handle_signal) | |
+ | |
+ def _handle_signal(self, signo, frame): | |
+ self.sigcaught = signo | |
+ self.running = False | |
+ | |
+ # Allow the process to be killed again and die from natural causes | |
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) | |
+ signal.signal(signal.SIGINT, signal.SIG_DFL) | |
+ | |
+ def _pipe_watcher(self): | |
+ # This will block until the write end is closed when the parent | |
+ # dies unexpectedly | |
+ self.readpipe.read() | |
+ | |
+ LOG.info(_('Parent process has died unexpectedly, exiting')) | |
+ | |
+ sys.exit(1) | |
+ | |
+ def _child_process(self, server): | |
+ # Setup child signal handlers differently | |
+ def _sigterm(*args): | |
+ signal.signal(signal.SIGTERM, signal.SIG_DFL) | |
+ raise SignalExit(signal.SIGTERM) | |
+ | |
+ signal.signal(signal.SIGTERM, _sigterm) | |
+ # Block SIGINT and let the parent send us a SIGTERM | |
+ signal.signal(signal.SIGINT, signal.SIG_IGN) | |
+ | |
+ # Reopen the eventlet hub to make sure we don't share an epoll | |
+ # fd with parent and/or siblings, which would be bad | |
+ eventlet.hubs.use_hub() | |
+ | |
+ # Close write to ensure only parent has it open | |
+ os.close(self.writepipe) | |
+ # Create greenthread to watch for parent to close pipe | |
+ eventlet.spawn(self._pipe_watcher) | |
+ | |
+ # Reseed random number generator | |
+ random.seed() | |
+ | |
+ launcher = Launcher() | |
+ launcher.run_server(server) | |
+ | |
+ def _start_child(self, wrap): | |
+ if len(wrap.forktimes) > wrap.workers: | |
+ # Limit ourselves to one process a second (over the period of | |
+ # number of workers * 1 second). This will allow workers to | |
+ # start up quickly but ensure we don't fork off children that | |
+ # die instantly too quickly. | |
+ if time.time() - wrap.forktimes[0] < wrap.workers: | |
+ LOG.info(_('Forking too fast, sleeping')) | |
+ time.sleep(1) | |
+ | |
+ wrap.forktimes.pop(0) | |
+ | |
+ wrap.forktimes.append(time.time()) | |
+ | |
+ pid = os.fork() | |
+ if pid == 0: | |
+ # NOTE(johannes): All exceptions are caught to ensure this | |
+ # doesn't fallback into the loop spawning children. It would | |
+ # be bad for a child to spawn more children. | |
+ status = 0 | |
+ try: | |
+ LOG.info(_('Running...')) | |
+ self._child_process(wrap.server) | |
+ LOG.info(_('Finished.')) | |
+ except SignalExit as exc: | |
+ signame = {signal.SIGTERM: 'SIGTERM', | |
+ signal.SIGINT: 'SIGINT'}[exc.signo] | |
+ LOG.info(_('Caught %s, exiting'), signame) | |
+ status = exc.code | |
+ except SystemExit as exc: | |
+ LOG.info(_('Exited with status %d.'), status) | |
+ status = exc.code | |
+ except BaseException: | |
+ LOG.exception(_('Unhandled exception')) | |
+ status = 2 | |
+ | |
+ os._exit(status) | |
+ | |
+ LOG.info(_('Started child %d'), pid) | |
+ | |
+ wrap.children.add(pid) | |
+ self.children[pid] = wrap | |
+ | |
+ return pid | |
+ | |
+ def launch_server(self, server, workers=1): | |
+ wrap = ServerWrapper(server, workers) | |
+ | |
+ LOG.info(_('Starting %d workers'), wrap.workers) | |
+ while self.running and len(wrap.children) < wrap.workers: | |
+ self._start_child(wrap) | |
+ | |
+ def _wait_child(self): | |
+ try: | |
+ pid, status = os.wait() | |
+ except OSError as exc: | |
+ if exc.errno not in (errno.EINTR, errno.ECHILD): | |
+ raise | |
+ return None | |
+ | |
+ if os.WIFSIGNALED(status): | |
+ sig = os.WTERMSIG(status) | |
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals()) | |
+ else: | |
+ code = os.WEXITSTATUS(status) | |
+ LOG.info(_('Child %(pid)d exited with status %(code)d'), locals()) | |
+ | |
+ if pid not in self.children: | |
+ LOG.warning(_('pid %d not in child list'), pid) | |
+ return None | |
+ | |
+ wrap = self.children.pop(pid) | |
+ wrap.children.remove(pid) | |
+ return wrap | |
+ | |
+ def wait(self): | |
+ """Loop waiting on children to die and respawning as necessary.""" | |
+ while self.running: | |
+ wrap = self._wait_child() | |
+ if not wrap: | |
+ continue | |
+ | |
+ while self.running and len(wrap.children) < wrap.workers: | |
+ self._start_child(wrap) | |
+ | |
+ if self.sigcaught: | |
+ signame = {signal.SIGTERM: 'SIGTERM', | |
+ signal.SIGINT: 'SIGINT'}[self.sigcaught] | |
+ LOG.info(_('Caught %s, stopping children'), signame) | |
+ | |
+ for pid in self.children: | |
+ try: | |
+ os.kill(pid, signal.SIGTERM) | |
+ except OSError as exc: | |
+ if exc.errno != errno.ESRCH: | |
+ raise | |
+ | |
+ # Wait for children to die | |
+ if self.children: | |
+ LOG.info(_('Waiting on %d children to exit'), len(self.children)) | |
+ while self.children: | |
+ self._wait_child() | |
diff --git a/quantum/wsgi.py b/quantum/wsgi.py | |
index 63f3ca4..ca09596 100644 | |
--- a/quantum/wsgi.py | |
+++ b/quantum/wsgi.py | |
@@ -47,12 +47,9 @@ def run_server(application, port): | |
class Server(object): | |
"""Server class to manage multiple WSGI sockets and applications.""" | |
- def __init__(self, name, threads=1000): | |
+ def __init__(self, name, port, host='0.0.0.0', backlog=128, threads=1000): | |
self.pool = eventlet.GreenPool(threads) | |
self.name = name | |
- | |
- def start(self, application, port, host='0.0.0.0', backlog=128): | |
- """Run a WSGI server with the given application.""" | |
self._host = host | |
self._port = port | |
@@ -75,6 +72,9 @@ def start(self, application, port, host='0.0.0.0', backlog=128): | |
{'host': host, 'port': port}) | |
sys.exit(1) | |
+ | |
+ def start(self, application): | |
+ """Run a WSGI server with the given application.""" | |
self._server = self.pool.spawn(self._run, application, self._socket) | |
@property | |
-- | |
1.8.4 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment