Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
This is a modified version of the patch at https://github.com/peterfeiner/quantum/commit/quantum-service-workers-for-grizzly.patch, which adds support for multiple API workers to the Grizzly Quantum API server. The modification renames the configuration parameter from "workers" to "api_workers" so that the configuration is forward-compatible with Havana.
From bd1e14456291e8639c604a42c9894e565f91918a Mon Sep 17 00:00:00 2001
From: Peter Feiner <peter@gridcentric.ca>
Date: Thu, 13 Jun 2013 14:30:53 +0000
Subject: [PATCH] Workers!
---
quantum/agent/metadata/agent.py | 22 ++-
quantum/agent/metadata/namespace_proxy.py | 4 +-
quantum/service.py | 253 +++++++++++++++++++++++++++++-
quantum/wsgi.py | 8 +-
4 files changed, 265 insertions(+), 22 deletions(-)
diff --git a/quantum/agent/metadata/agent.py b/quantum/agent/metadata/agent.py
index 7bdfae8..534a81e 100644
--- a/quantum/agent/metadata/agent.py
+++ b/quantum/agent/metadata/agent.py
@@ -176,11 +176,16 @@ def __init__(self, request, client_address, server):
class UnixDomainWSGIServer(wsgi.Server):
- def start(self, application, file_socket, backlog=128):
- sock = eventlet.listen(file_socket,
- family=socket.AF_UNIX,
- backlog=backlog)
- self.pool.spawn_n(self._run, application, sock)
+
+ def __init__(self, name, file_socket, threads=1000, backlog=128):
+ self.pool = eventlet.GreenPool(threads)
+ self.name = name
+ self._socket = eventlet.listen(file_socket,
+ family=socket.AF_UNIX,
+ backlog=backlog)
+
+ def start(self, application):
+ self.pool.spawn_n(self._run, application, self._socket)
def _run(self, application, socket):
"""Start a WSGI service in a new green thread."""
@@ -213,9 +218,10 @@ def __init__(self, conf):
os.makedirs(dirname, 0755)
def run(self):
- server = UnixDomainWSGIServer('quantum-metadata-agent')
- server.start(MetadataProxyHandler(self.conf),
- self.conf.metadata_proxy_socket)
+ server = UnixDomainWSGIServer(
+ 'quantum-metadata-agent',
+ self.conf.metadata_proxy_socket)
+ server.start(MetadataProxyHandler(self.conf))
server.wait()
diff --git a/quantum/agent/metadata/namespace_proxy.py b/quantum/agent/metadata/namespace_proxy.py
index aa12d24..826ff27 100644
--- a/quantum/agent/metadata/namespace_proxy.py
+++ b/quantum/agent/metadata/namespace_proxy.py
@@ -141,8 +141,8 @@ def run(self):
handler = NetworkMetadataProxyHandler(
self.network_id,
self.router_id)
- proxy = wsgi.Server('quantum-network-metadata-proxy')
- proxy.start(handler, self.port)
+ proxy = wsgi.Server('quantum-network-metadata-proxy', self.port)
+ proxy.start(handler)
proxy.wait()
diff --git a/quantum/service.py b/quantum/service.py
index 3352928..8cc277c 100644
--- a/quantum/service.py
+++ b/quantum/service.py
@@ -19,6 +19,16 @@
import logging as std_logging
import os
import random
+import routes
+import errno
+import inspect
+import os
+import random
+import signal
+import sys
+import time
+import eventlet
+import greenlet
from oslo.config import cfg
@@ -34,6 +44,7 @@
LOG = logging.getLogger(__name__)
service_opts = [
+ cfg.IntOpt('api_workers', default=1),
cfg.IntOpt('periodic_interval',
default=40,
help=_('Seconds between running periodic tasks')),
@@ -60,10 +71,11 @@ class WsgiService(object):
def __init__(self, app_name):
self.app_name = app_name
+ self.wsgi_server = _load_wsgi()
self.wsgi_app = None
def start(self):
- self.wsgi_app = _run_wsgi(self.app_name)
+ self.wsgi_app = _run_wsgi(self.wsgi_server, self.app_name)
def wait(self):
self.wsgi_app.wait()
@@ -91,23 +103,24 @@ def create(cls):
def serve_wsgi(cls):
try:
- service = cls.create()
+ launcher = ProcessLauncher()
+ launcher.launch_server(cls.create(), workers=CONF.api_workers)
except Exception:
LOG.exception(_('In WsgiService.create()'))
raise
- service.start()
+ return launcher
- return service
+def _load_wsgi():
+ return wsgi.Server("Quantum", cfg.CONF.bind_port, cfg.CONF.bind_host)
-def _run_wsgi(app_name):
+def _run_wsgi(server, app_name):
app = config.load_paste_app(app_name)
if not app:
LOG.error(_('No known API applications configured.'))
return
- server = wsgi.Server("Quantum")
- server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host)
+ server.start(app)
# Dump all option values here after all options are parsed
cfg.CONF.log_opt_values(LOG, std_logging.DEBUG)
LOG.info(_("Quantum service started, listening on %(host)s:%(port)s"),
@@ -115,7 +128,6 @@ def _run_wsgi(app_name):
'port': cfg.CONF.bind_port})
return server
-
class Service(service.Service):
"""Service object for binaries running on hosts.
@@ -231,3 +243,228 @@ def report_state(self):
"""Update the state of this service."""
# Todo(gongysh) report state to quantum server
pass
+
+
+class SignalExit(SystemExit):
+ def __init__(self, signo, exccode=1):
+ super(SignalExit, self).__init__(exccode)
+ self.signo = signo
+
+
+class Launcher(object):
+ """Launch one or more services and wait for them to complete."""
+
+ def __init__(self):
+ """Initialize the service launcher.
+
+ :returns: None
+
+ """
+ self._services = []
+
+    @staticmethod
+    def run_server(server):
+        """Start and wait for a server to finish.
+
+        :param server: Server to run and wait for.
+        :returns: None
+
+        """
+        server.start()
+        server.wait()
+
+ def launch_server(self, server):
+ """Load and start the given server.
+
+ :param server: The server you would like to start.
+ :returns: None
+
+ """
+ gt = eventlet.spawn(self.run_server, server)
+ self._services.append(gt)
+
+ def stop(self):
+ """Stop all services which are currently running.
+
+ :returns: None
+
+ """
+ for service in self._services:
+ service.kill()
+
+ def wait(self):
+ """Waits until all services have been stopped, and then returns.
+
+ :returns: None
+
+ """
+ for service in self._services:
+ try:
+ service.wait()
+ except greenlet.GreenletExit:
+ pass
+
+
+class ServerWrapper(object):
+ def __init__(self, server, workers):
+ self.server = server
+ self.workers = workers
+ self.children = set()
+ self.forktimes = []
+
+
+class ProcessLauncher(object):
+ def __init__(self):
+ self.children = {}
+ self.sigcaught = None
+ self.running = True
+ rfd, self.writepipe = os.pipe()
+ self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r')
+
+ signal.signal(signal.SIGTERM, self._handle_signal)
+ signal.signal(signal.SIGINT, self._handle_signal)
+
+ def _handle_signal(self, signo, frame):
+ self.sigcaught = signo
+ self.running = False
+
+ # Allow the process to be killed again and die from natural causes
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
+
+ def _pipe_watcher(self):
+ # This will block until the write end is closed when the parent
+ # dies unexpectedly
+ self.readpipe.read()
+
+ LOG.info(_('Parent process has died unexpectedly, exiting'))
+
+ sys.exit(1)
+
+ def _child_process(self, server):
+ # Setup child signal handlers differently
+ def _sigterm(*args):
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ raise SignalExit(signal.SIGTERM)
+
+ signal.signal(signal.SIGTERM, _sigterm)
+ # Block SIGINT and let the parent send us a SIGTERM
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+ # Reopen the eventlet hub to make sure we don't share an epoll
+ # fd with parent and/or siblings, which would be bad
+ eventlet.hubs.use_hub()
+
+ # Close write to ensure only parent has it open
+ os.close(self.writepipe)
+ # Create greenthread to watch for parent to close pipe
+ eventlet.spawn(self._pipe_watcher)
+
+ # Reseed random number generator
+ random.seed()
+
+ launcher = Launcher()
+ launcher.run_server(server)
+
+    def _start_child(self, wrap):
+        """Fork one worker for ``wrap``; the parent gets the child's pid."""
+        if len(wrap.forktimes) > wrap.workers:
+            # Rate-limit forking to about one process per second over a
+            # window of ``workers`` seconds: workers still start quickly,
+            # but children that die instantly are not respawned in a burst.
+            if time.time() - wrap.forktimes[0] < wrap.workers:
+                LOG.info(_('Forking too fast, sleeping'))
+                time.sleep(1)
+
+            wrap.forktimes.pop(0)
+
+        wrap.forktimes.append(time.time())
+
+        pid = os.fork()
+        if pid == 0:
+            # NOTE(johannes): All exceptions are caught to ensure this
+            # doesn't fallback into the loop spawning children. It would
+            # be bad for a child to spawn more children.
+            status = 0
+            try:
+                LOG.info(_('Running...'))
+                self._child_process(wrap.server)
+                LOG.info(_('Finished.'))
+            except SignalExit as exc:
+                signame = {signal.SIGTERM: 'SIGTERM',
+                           signal.SIGINT: 'SIGINT'}[exc.signo]
+                LOG.info(_('Caught %s, exiting'), signame)
+                status = exc.code
+            except SystemExit as exc:
+                status = exc.code  # set first so the log shows the real code
+                LOG.info(_('Exited with status %d.'), status)
+            except BaseException:
+                LOG.exception(_('Unhandled exception'))
+                status = 2
+
+            os._exit(status)
+
+        LOG.info(_('Started child %d'), pid)
+
+        wrap.children.add(pid)
+        self.children[pid] = wrap
+
+        return pid
+
+ def launch_server(self, server, workers=1):
+ wrap = ServerWrapper(server, workers)
+
+ LOG.info(_('Starting %d workers'), wrap.workers)
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ def _wait_child(self):
+ try:
+ pid, status = os.wait()
+ except OSError as exc:
+ if exc.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ return None
+
+ if os.WIFSIGNALED(status):
+ sig = os.WTERMSIG(status)
+ LOG.info(_('Child %(pid)d killed by signal %(sig)d'), locals())
+ else:
+ code = os.WEXITSTATUS(status)
+ LOG.info(_('Child %(pid)d exited with status %(code)d'), locals())
+
+ if pid not in self.children:
+ LOG.warning(_('pid %d not in child list'), pid)
+ return None
+
+ wrap = self.children.pop(pid)
+ wrap.children.remove(pid)
+ return wrap
+
+ def wait(self):
+ """Loop waiting on children to die and respawning as necessary."""
+ while self.running:
+ wrap = self._wait_child()
+ if not wrap:
+ continue
+
+ while self.running and len(wrap.children) < wrap.workers:
+ self._start_child(wrap)
+
+ if self.sigcaught:
+ signame = {signal.SIGTERM: 'SIGTERM',
+ signal.SIGINT: 'SIGINT'}[self.sigcaught]
+ LOG.info(_('Caught %s, stopping children'), signame)
+
+ for pid in self.children:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError as exc:
+ if exc.errno != errno.ESRCH:
+ raise
+
+ # Wait for children to die
+ if self.children:
+ LOG.info(_('Waiting on %d children to exit'), len(self.children))
+ while self.children:
+ self._wait_child()
diff --git a/quantum/wsgi.py b/quantum/wsgi.py
index 63f3ca4..ca09596 100644
--- a/quantum/wsgi.py
+++ b/quantum/wsgi.py
@@ -47,12 +47,9 @@ def run_server(application, port):
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
- def __init__(self, name, threads=1000):
+ def __init__(self, name, port, host='0.0.0.0', backlog=128, threads=1000):
self.pool = eventlet.GreenPool(threads)
self.name = name
-
- def start(self, application, port, host='0.0.0.0', backlog=128):
- """Run a WSGI server with the given application."""
self._host = host
self._port = port
@@ -75,6 +72,9 @@ def start(self, application, port, host='0.0.0.0', backlog=128):
{'host': host, 'port': port})
sys.exit(1)
+
+ def start(self, application):
+ """Run a WSGI server with the given application."""
self._server = self.pool.spawn(self._run, application, self._socket)
@property
--
1.8.4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.