Created
February 6, 2015 19:16
-
-
Save dannon/9ef2e41748eb12d6b1bf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff -r 32e4527bab2d lib/galaxy/app.py | |
--- a/lib/galaxy/app.py Fri Feb 06 09:00:23 2015 -0500 | |
+++ b/lib/galaxy/app.py Fri Feb 06 14:16:50 2015 -0500 | |
@@ -18,7 +18,6 @@ from galaxy.tools.data_manager.manager i | |
from galaxy.jobs import metrics as job_metrics | |
from galaxy.web.base import pluginframework | |
from galaxy.web.proxy import ProxyManager | |
-from galaxy.queue_worker import GalaxyQueueWorker | |
from tool_shed.galaxy_install import update_repository_manager | |
import logging | |
@@ -34,6 +33,8 @@ class UniverseApplication( object, confi | |
# Read config file and check for errors | |
self.config = config.Configuration( **kwargs ) | |
self.config.check() | |
+ import kombu | |
+ self.amqp_internal_connection_obj = kombu.Connection(self.config.amqp_internal_connection) | |
config.configure_logging( self.config ) | |
self.configure_fluent_log() | |
@@ -150,11 +151,6 @@ class UniverseApplication( object, confi | |
self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager( self ) | |
self.model.engine.dispose() | |
- self.control_worker = GalaxyQueueWorker(self, | |
- galaxy.queues.control_queue_from_config(self.config), | |
- galaxy.queue_worker.control_message_to_task) | |
- self.control_worker.daemon = True | |
- self.control_worker.start() | |
def shutdown( self ): | |
self.workflow_scheduling_manager.shutdown() | |
diff -r 32e4527bab2d lib/galaxy/queue_worker.py | |
--- a/lib/galaxy/queue_worker.py Fri Feb 06 09:00:23 2015 -0500 | |
+++ b/lib/galaxy/queue_worker.py Fri Feb 06 14:16:50 2015 -0500 | |
@@ -30,10 +30,13 @@ class GalaxyQueueWorker(ConsumerMixin, t | |
handler, will have one of these used for dispatching so called 'control' | |
tasks. | |
""" | |
- def __init__(self, app, queue, task_mapping): | |
+ def __init__(self, app, queue, task_mapping, connection=None): | |
super(GalaxyQueueWorker, self).__init__() | |
log.info("Initalizing Galaxy Queue Worker on %s" % app.config.amqp_internal_connection) | |
- self.connection = Connection(app.config.amqp_internal_connection) | |
+ if connection: | |
+ self.connection = connection | |
+ else: | |
+ self.connection = Connection(app.config.amqp_internal_connection) | |
self.app = app | |
# Eventually we may want different workers w/ their own queues and task | |
# mappings. Right now, there's only the one. | |
diff -r 32e4527bab2d lib/galaxy/webapps/galaxy/buildapp.py | |
--- a/lib/galaxy/webapps/galaxy/buildapp.py Fri Feb 06 09:00:23 2015 -0500 | |
+++ b/lib/galaxy/webapps/galaxy/buildapp.py Fri Feb 06 14:16:50 2015 -0500 | |
@@ -18,6 +18,7 @@ import galaxy.datatypes.registry | |
import galaxy.web.framework | |
import galaxy.web.framework.webapp | |
from galaxy import util | |
+from galaxy.queue_worker import GalaxyQueueWorker | |
from galaxy.util import asbool | |
from galaxy.util.properties import load_app_properties | |
@@ -102,6 +103,12 @@ def app_factory( global_conf, **kwargs ) | |
except: | |
pass | |
+ app.control_worker = GalaxyQueueWorker(app, | |
+ galaxy.queues.control_queue_from_config(app.config), | |
+ galaxy.queue_worker.control_message_to_task, | |
+ connection=app.amqp_internal_connection_obj) | |
+ app.control_worker.daemon = True | |
+ app.control_worker.start() | |
# Return | |
return webapp | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment