Skip to content

Instantly share code, notes, and snippets.

@danielrobbins
Created January 10, 2013 17:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielrobbins/4504129 to your computer and use it in GitHub Desktop.
Save danielrobbins/4504129 to your computer and use it in GitHub Desktop.
Fixes: ZEN-3758 Unable to run zeneventd under multiple threads in Zenoss Core, causes major RabbitMQ rawevents queue bottleneck!
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml
===================================================================
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml (revision 52897)
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/configure.zcml (revision 67823)
@@ -9,5 +9,9 @@
<include package=".browser"/>
- <subscriber handler=".zeneventd.onDaemonStartRun"/>
+ <subscriber handler=".zeneventdEvents.onSigTerm"/>
+ <subscriber handler=".zeneventdEvents.onSigUsr1"/>
+ <subscriber handler=".zeneventdEvents.onBuildOptions"/>
+ <subscriber handler=".zeneventdEvents.onDaemonCreated"/>
+ <subscriber handler=".zeneventdEvents.onDaemonStartRun"/>
</configure>
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py
===================================================================
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py (revision 67823)
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdWorkers.py (revision 67823)
@@ -0,0 +1,103 @@
+##############################################################################
+#
+# Copyright (C) Zenoss, Inc. 2011, all rights reserved.
+#
+# This content is made available according to terms specified in
+# License.zenoss under the directory where your Zenoss product is installed.
+#
+##############################################################################
+
+
+import logging
+import signal
+import socket
+import time
+from amqplib.client_0_8.exceptions import AMQPConnectionException
+from zope.component import getUtility
+from Products.ZenEvents.zeneventd import BaseQueueConsumerTask, EventPipelineProcessor
+from Products.ZenEvents.zeneventd import QUEUE_RAW_ZEN_EVENTS
+from Products.ZenMessaging.queuemessaging.eventlet import BasePubSubMessageTask
+from Products.ZenUtils.ZCmdBase import ZCmdBase
+from zenoss.protocols.interfaces import IAMQPConnectionInfo, IQueueSchema
+from zenoss.protocols.jsonformat import to_dict
+from zenoss.protocols.eventlet.amqp import Publishable, getProtobufPubSub
+from Products.ZenCollector.utils.workers import workersBuildOptions
+
+log = logging.getLogger("zen.eventd")
+
+class EventletQueueConsumerTask(BaseQueueConsumerTask, BasePubSubMessageTask):
+
+ def __init__(self, processor):
+ BaseQueueConsumerTask.__init__(self, processor)
+
+ def processMessage(self, message):
+ """
+ Handles a queue message, can call "acknowledge" on the Queue Consumer
+ class when it is done with the message
+ """
+ zepRawEvent = self.processor.processMessage(message)
+
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug("Publishing event: %s", to_dict(zepRawEvent))
+
+ yield Publishable(zepRawEvent, exchange=self._dest_exchange,
+ routingKey=self._routing_key(zepRawEvent))
+
+class EventDEventletWorker(ZCmdBase):
+
+ mname = 'ZenEventD' # For logging
+
+ def __init__(self):
+ super(EventDEventletWorker, self).__init__()
+ self._amqpConnectionInfo = getUtility(IAMQPConnectionInfo)
+ self._queueSchema = getUtility(IQueueSchema)
+
+ def run(self):
+ self._shutdown = False
+ signal.signal(signal.SIGTERM, self._sigterm)
+ task = EventletQueueConsumerTask(EventPipelineProcessor(self.dmd))
+ self._listen(task)
+
+ def shutdown(self):
+ self._shutdown = True
+ if self._pubsub:
+ self._pubsub.shutdown()
+ self._pubsub = None
+
+ def buildOptions(self):
+ super(EventDEventletWorker, self).buildOptions()
+ # don't comment out the workers option in zeneventd.conf (ZEN-2769)
+ workersBuildOptions(self.parser)
+
+ def _sigterm(self, signum=None, frame=None):
+ log.debug("worker sigterm...")
+ self.shutdown()
+
+ def _listen(self, task, retry_wait=30):
+ self._pubsub = None
+ keepTrying = True
+ sleep = 0
+ while keepTrying and not self._shutdown:
+ try:
+ if sleep:
+ log.info("Waiting %s seconds to reconnect..." % sleep)
+ time.sleep(sleep)
+ sleep = min(retry_wait, sleep * 2)
+ else:
+ sleep = .1
+ log.info("Connecting to RabbitMQ...")
+ self._pubsub = getProtobufPubSub(self._amqpConnectionInfo, self._queueSchema, QUEUE_RAW_ZEN_EVENTS)
+ self._pubsub.registerHandler('$Event', task)
+ self._pubsub.registerExchange('$ZepZenEvents')
+ #reset sleep time
+ sleep=0
+ self._pubsub.run()
+ except (socket.error, AMQPConnectionException) as e:
+ log.warn("RabbitMQ Connection error %s" % e)
+ except KeyboardInterrupt:
+ keepTrying = False
+ finally:
+ if self._pubsub:
+ self._pubsub.shutdown()
+ self._pubsub = None
+
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py
===================================================================
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py (revision 61801)
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventd.py (revision 67823)
@@ -17,4 +17,5 @@
import Globals
from zope.component import getUtility, adapter
+
from zope.interface import implements
from zope.component.event import objectEventNotify
@@ -265,10 +266,4 @@
-@adapter(ZenEventD, DaemonStartRunEvent)
-def onDaemonStartRun(daemon, event):
- """
- Start up an EventDWorker.
- """
- EventDTwistedWorker(daemon.dmd).run()
if __name__ == '__main__':
Index: /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py
===================================================================
--- /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py (revision 67823)
+++ /branches/core/zenoss-4.2.x/Products/ZenEvents/zeneventdEvents.py (revision 67823)
@@ -0,0 +1,58 @@
+##############################################################################
+#
+# Copyright (C) Zenoss, Inc. 2011, all rights reserved.
+#
+# This content is made available according to terms specified in
+# License.zenoss under the directory where your Zenoss product is installed.
+#
+##############################################################################
+
+
+import pkg_resources
+from zenoss.protocols.eventlet.amqp import register_eventlet
+from twisted.internet import reactor
+from zope.component import adapter, getGlobalSiteManager
+from Products.ZenEvents.zeneventd import ZenEventD
+from Products.ZenEvents.daemonlifecycle import DaemonCreatedEvent, DaemonStartRunEvent
+from Products.ZenEvents.daemonlifecycle import SigTermEvent, SigUsr1Event, BuildOptionsEvent
+from Products.ZenCollector.utils.workers import ProcessWorkers, workersBuildOptions, exec_worker
+
+@adapter(ZenEventD, SigTermEvent)
+def onSigTerm(daemon, event):
+ if daemon.options.daemon:
+ daemon._workers.shutdown()
+
+@adapter(ZenEventD, SigUsr1Event)
+def onSigUsr1(daemon, event):
+ if daemon.options.daemon:
+ daemon._workers.sendSignal(event.signum)
+
+@adapter(ZenEventD, BuildOptionsEvent)
+def onBuildOptions(daemon, event):
+ workersBuildOptions(daemon.parser, default=2)
+
+@adapter(ZenEventD, DaemonCreatedEvent)
+def onDaemonCreated(daemon, event):
+ """
+ Called at the end of zeneventd's constructor.
+ """
+ register_eventlet()
+ if daemon.options.daemon:
+ daemon._workers = ProcessWorkers(daemon.options.workers, exec_worker, "Event worker")
+
+@adapter(ZenEventD, DaemonStartRunEvent)
+def onDaemonStartRun(daemon, event):
+ """
+ Called when the daemon is ready to begin processing. This handler replaces the one
+ defined in zeneventd.py, because onDaemonCreated (above) removes it
+ """
+ from .zeneventdWorkers import EventDEventletWorker
+ # Free up unnecessary database resources in parent zeneventd process
+ if daemon.options.daemon:
+ daemon.closedb()
+ daemon.closeAll()
+ daemon._workers.startWorkers()
+ reactor.run()
+ else:
+ worker = EventDEventletWorker()
+ worker.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment