Skip to content

Instantly share code, notes, and snippets.

@robcowie
Created January 20, 2011 17:16
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robcowie/788211 to your computer and use it in GitHub Desktop.
Save robcowie/788211 to your computer and use it in GitHub Desktop.
Supervisor --> AMQP event listener
# -*- coding: utf-8 -*-
"""
Run as a supervisor event listener process. See http://supervisord.org/events.html for more info.
An example eventlistener config block looks like:
[eventlistener:myeventhandler]
command=python /path/to/eventriloquist.py
events=EVENT
"""
import argparse
# import logging
import os
import sys
from carrot.connection import BrokerConnection, AMQPConnection
from carrot.messaging import Publisher
from supervisor import childutils
from supervisor.states import ProcessStates
# logging.basicConfig(filename='output.log', level=logging.DEBUG)
# logger = logging.getLogger(__name__)
class SupervisorEvents(object):
event_hierarchy = {
'EVENT': [
'PROCESS_STATE',
'REMOTE_COMMUNICATIONS',
'PROCESS_LOG',
'PROCESS_COMMUNICATION',
'SUPERVISOR_STATE_CHANGE',
'TICK'],
'PROCESS_STATE': [
'PROCESS_STATE_STARTING', 'PROCESS_STATE_RUNNING',
'PROCESS_STATE_BACKOFF', 'PROCESS_STATE_STOPPING',
'PROCESS_STATE_EXITED', 'PROCESS_STATE_STOPPED',
'PROCESS_STATE_FATAL', 'PROCESS_STATE_UNKNOWN'],
'REMOTE_COMMUNICATIONS': [],
'PROCESS_LOG': [
'PROCESS_LOG_STDOUT',
'PROCESS_LOG_STDERR'],
'PROCESS_COMMUNICATION': [
'PROCESS_COMMUNICATION_STDOUT',
'PROCESS_COMMUNICATION_STDERR'],
'SUPERVISOR_STATE_CHANGE': [
'SUPERVISOR_STATE_CHANGE_RUNNING',
'SUPERVISOR_STATE_CHANGE_STOPPING'],
'TICK': [
'TICK_5',
'TICK_60',
'TICK_3600']
}
def hierarchy (self, event):
path = self._find_path(self.event_hierarchy, 'EVENT', event)
return path or []
def _find_path(self, graph, start, end, path=[]):
path = path + [start]
if start == end:
return path
if not graph.has_key(start):
return None
for node in graph[start]:
if node not in path:
newpath = self._find_path(graph, node, end, path)
if newpath: return newpath
return None
class AMQPNotifier(object):
""""""
def __init__(self, rpc, amqpconn):
self.rpc = rpc
self.stdin = sys.stdin
self.stdout = sys.stdout
self.stderr = sys.stderr
self.amqpconn = amqpconn
self.supervisorevents = SupervisorEvents()
def listProcesses(self, state=None):
return [x for x in self.rpc.supervisor.getAllProcessInfo()
if (state is None or x['state'] == state)]
def runforever(self):
while 1:
headers, payload = childutils.listener.wait(self.stdin, self.stdout)
if headers['eventname'] == 'TICK_5':
childutils.listener.ok(self.stdout)
continue
msg = headers.copy()
if not payload:
msg['payload'] = None
else:
msg['payload'] = dict([ x.split(':') for x in payload.split() ])
try:
publisher = Publisher(
self.amqpconn,
exchange='amq.topic',
exchange_type='topic',
delivery_mode='transient')
key = '.'.join(self.supervisorevents.hierarchy(headers['eventname']))
key = 'supervisor.%s' % key.lower()
publisher.send(msg, routing_key=key)
publisher.close()
except Exception, e:
logger.debug(e)
childutils.listener.ok(self.stdout)
def main():
parser = argparse.ArgumentParser(description='', conflict_handler='resolve')
parser.add_argument('--version', action='version', version='0.0.1')
args = parser.parse_args()
try:
args.rpc = childutils.getRPCInterface(os.environ)
except KeyError, why:
if why[0] != 'SUPERVISOR_SERVER_URL':
raise
parser.error('Must be run as a supervisor event listener')
try:
args.amqpconn = AMQPConnection(
hostname="vsuse", port=5672,
userid="guest", password="guest",
virtual_host="/")
except Exception, e:
raise
AMQPNotifier(args.rpc, args.amqpconn).runforever()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment