Created
January 20, 2011 17:16
-
-
Save robcowie/788211 to your computer and use it in GitHub Desktop.
Supervisor --> AMQP event listener
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Run as a supervisor event listener process.
See http://supervisord.org/events.html for more info.

An example eventlistener config block looks like:

    [eventlistener:myeventhandler]
    command=python /path/to/eventriloquist.py
    events=EVENT
"""
import argparse | |
# import logging | |
import os | |
import sys | |
from carrot.connection import BrokerConnection, AMQPConnection | |
from carrot.messaging import Publisher | |
from supervisor import childutils | |
from supervisor.states import ProcessStates | |
# logging.basicConfig(filename='output.log', level=logging.DEBUG) | |
# logger = logging.getLogger(__name__) | |
class SupervisorEvents(object):
    """Tree of supervisor event type names, rooted at ``'EVENT'``.

    ``event_hierarchy`` maps a parent event type to the list of its direct
    children. Event types that have no children of their own (for example
    ``'TICK_5'``) do not appear as keys.
    """

    event_hierarchy = {
        'EVENT': [
            'PROCESS_STATE',
            'REMOTE_COMMUNICATIONS',
            'PROCESS_LOG',
            'PROCESS_COMMUNICATION',
            'SUPERVISOR_STATE_CHANGE',
            'TICK'],
        'PROCESS_STATE': [
            'PROCESS_STATE_STARTING', 'PROCESS_STATE_RUNNING',
            'PROCESS_STATE_BACKOFF', 'PROCESS_STATE_STOPPING',
            'PROCESS_STATE_EXITED', 'PROCESS_STATE_STOPPED',
            'PROCESS_STATE_FATAL', 'PROCESS_STATE_UNKNOWN'],
        'REMOTE_COMMUNICATIONS': [],
        'PROCESS_LOG': [
            'PROCESS_LOG_STDOUT',
            'PROCESS_LOG_STDERR'],
        'PROCESS_COMMUNICATION': [
            'PROCESS_COMMUNICATION_STDOUT',
            'PROCESS_COMMUNICATION_STDERR'],
        'SUPERVISOR_STATE_CHANGE': [
            'SUPERVISOR_STATE_CHANGE_RUNNING',
            'SUPERVISOR_STATE_CHANGE_STOPPING'],
        'TICK': [
            'TICK_5',
            'TICK_60',
            'TICK_3600']
    }

    def hierarchy(self, event):
        """Return the event names from ``'EVENT'`` down to *event*.

        For example ``'TICK_5'`` gives ``['EVENT', 'TICK', 'TICK_5']``.
        An event name that is not part of the tree gives ``[]``.
        """
        path = self._find_path(self.event_hierarchy, 'EVENT', event)
        return path or []

    def _find_path(self, graph, start, end, path=None):
        """Depth-first search for a path from *start* to *end* in *graph*.

        *graph* maps a node to a list of child nodes. *path* is the list of
        nodes already visited on the way to *start* (``None`` means none).
        Returns the full list of nodes including both ends, or ``None``
        when *end* cannot be reached.
        """
        # Build a new list instead of mutating the caller's one, so sibling
        # branches of the search never see each other's nodes.
        path = (path or []) + [start]
        if start == end:
            return path
        # ``in`` replaces dict.has_key(), which no longer exists on Python 3.
        if start not in graph:
            return None
        for node in graph[start]:
            # Skip nodes already on the current path to guard against cycles.
            if node not in path:
                newpath = self._find_path(graph, node, end, path)
                if newpath:
                    return newpath
        return None
class AMQPNotifier(object):
    """Forward supervisor event notifications to the ``amq.topic`` exchange.

    Events are read with ``childutils.listener.wait`` and each one (except
    ``TICK_5``) is published with a routing key of the form
    ``supervisor.<event path, dot separated, lower case>``.
    """

    def __init__(self, rpc, amqpconn):
        """Remember the supervisor RPC proxy and the AMQP connection.

        rpc      -- object exposing ``supervisor.getAllProcessInfo()``.
        amqpconn -- connection object handed to ``Publisher``.
        """
        self.rpc = rpc
        self.stdin = sys.stdin
        self.stdout = sys.stdout
        self.stderr = sys.stderr
        self.amqpconn = amqpconn
        self.supervisorevents = SupervisorEvents()

    def listProcesses(self, state=None):
        """Return process info dicts, optionally only those in *state*.

        With ``state=None`` every entry returned by
        ``rpc.supervisor.getAllProcessInfo()`` is kept.
        """
        return [x for x in self.rpc.supervisor.getAllProcessInfo()
                if (state is None or x['state'] == state)]

    @staticmethod
    def _parse_payload(payload):
        """Turn a ``key:value key:value`` payload string into a dict.

        Returns ``None`` for an empty payload. Only the first colon of a
        token separates key from value, so values may contain colons.
        Tokens without any colon are skipped rather than raising
        ValueError, which previously terminated the listener loop.
        """
        if not payload:
            return None
        pairs = (token.split(':', 1) for token in payload.split())
        return dict(pair for pair in pairs if len(pair) == 2)

    def _publish(self, msg, eventname):
        """Publish *msg* under the routing key derived from *eventname*."""
        publisher = Publisher(
            self.amqpconn,
            exchange='amq.topic',
            exchange_type='topic',
            delivery_mode='transient')
        try:
            # An event name missing from the hierarchy yields an empty
            # path, i.e. the routing key 'supervisor.'.
            key = '.'.join(self.supervisorevents.hierarchy(eventname))
            key = 'supervisor.%s' % key.lower()
            publisher.send(msg, routing_key=key)
        finally:
            # Close the publisher even when send() fails.
            publisher.close()

    def runforever(self):
        """Wait for events forever, publishing each one and acknowledging it.

        Publishing failures are reported on stderr and the event is still
        acknowledged, so one bad publish does not stop the listener.
        """
        while 1:
            headers, payload = childutils.listener.wait(self.stdin, self.stdout)
            eventname = headers['eventname']
            # TICK_5 events are acknowledged but never forwarded.
            if eventname == 'TICK_5':
                childutils.listener.ok(self.stdout)
                continue
            msg = headers.copy()
            msg['payload'] = self._parse_payload(payload)
            try:
                self._publish(msg, eventname)
            except Exception as e:
                # stdout is handed to childutils.listener for the protocol
                # replies, so diagnostics go to stderr instead. (The module
                # level ``logger`` is commented out and must not be used.)
                self.stderr.write(
                    'Failed to publish %s event: %s\n' % (eventname, e))
                self.stderr.flush()
            childutils.listener.ok(self.stdout)
def main():
    """Parse the command line, connect to supervisor and AMQP, then listen.

    The broker options default to the values that used to be hard-coded,
    so running the script without arguments behaves as before. Exits via
    ``parser.error`` when ``childutils.getRPCInterface`` raises a KeyError
    for ``SUPERVISOR_SERVER_URL``; any other KeyError is re-raised.
    """
    parser = argparse.ArgumentParser(description='', conflict_handler='resolve')
    parser.add_argument('--version', action='version', version='0.0.1')
    parser.add_argument('--hostname', default='vsuse',
                        help='AMQP broker host (default: %(default)s)')
    parser.add_argument('--port', type=int, default=5672,
                        help='AMQP broker port (default: %(default)s)')
    parser.add_argument('--userid', default='guest',
                        help='AMQP user (default: %(default)s)')
    parser.add_argument('--password', default='guest',
                        help='AMQP password')
    parser.add_argument('--virtual-host', dest='virtual_host', default='/',
                        help='AMQP virtual host (default: %(default)s)')
    args = parser.parse_args()

    try:
        args.rpc = childutils.getRPCInterface(os.environ)
    except KeyError as why:
        # ``why.args`` works on Python 2 and 3 (indexing the exception
        # itself does not); the slice also tolerates an empty args tuple.
        if why.args[:1] != ('SUPERVISOR_SERVER_URL',):
            raise
        parser.error('Must be run as a supervisor event listener')

    # Connection errors propagate unchanged to the caller.
    args.amqpconn = AMQPConnection(
        hostname=args.hostname, port=args.port,
        userid=args.userid, password=args.password,
        virtual_host=args.virtual_host)

    AMQPNotifier(args.rpc, args.amqpconn).runforever()
# Script entry point: start the listener only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment