Skip to content

Instantly share code, notes, and snippets.

@lukemarsden
Created June 25, 2011 20:16
Show Gist options
  • Save lukemarsden/1046854 to your computer and use it in GitHub Desktop.
Save lukemarsden/1046854 to your computer and use it in GitHub Desktop.
MuninClientProtocol
import re
import time
from twisted.internet import reactor
from twisted.internet import protocol
from twisted.internet import defer
class MuninClientProtocol(protocol.Protocol):
def __init__(self, out_channel, server):
print "Constructing protocol"
self.buf = ''
self.on_dataReceived = defer.Deferred()
self.start_time = time.time()
# out_channel is an object with two methods:
#
# send_result(probe, data):
# - a function to call with the result of each probe
# finish(reason):
# - a function to call when we're finished, reason can be a
# Failure object or string
#
# server is just the name of the server we are querying
self.out_channel = out_channel
self.server = server
# got_probes is used for the meta-measurement munin.success
# if we got as far as receiving the list of probes,
# we consider this box UP (for use in the UI).
self.got_probes = False
# Timeout in 30 seconds if we haven't got a complete response
self.timeout_dfr = reactor.callLater(30, self._end_connection, 'Timeout')
def connectionMade(self):
self.collect_data()
protocol.Protocol.connectionMade(self)
def connectionFailed(self, reason):
self._end_connection(reason)
protocol.Protocol.connectionFailed(self, reason)
def connectionLost(self, reason):
self._end_connection(reason)
protocol.Protocol.connectionLost(self, reason)
def _end_connection(self, reason):
print "In end_connection %s" % self.server
if self.got_probes:
self._process('munin.success', 'value.value 1')
self._process('munin.fetch_time', 'value.value ' + str(time.time() - self.start_time))
else:
self._process('munin.success', 'value.value 0')
self.out_channel.finish(reason)
if self.timeout_dfr and not self.timeout_dfr.called:
self.timeout_dfr.cancel()
self.timeout_dfr = None # garbage-collect this
@defer.inlineCallbacks
def recv(self):
"Receive a packet"
data = yield self.on_dataReceived
self.on_dataReceived = defer.Deferred() # reset deferred
defer.returnValue(data)
def dataReceived(self, data):
#print "Got %s\n" % data
self.on_dataReceived.callback(data)
@defer.inlineCallbacks
def collect_data(self):
"Iterate over the server's probes, fetching each one"
yield self.recv() # throw away server's greeting (*)
self.transport.write("list\n")
probes = yield self.recv() # (*) assuming these come in a single packet
self.got_probes = True
print probes
for probe in probes.split(' '):
#print "Writing fetch %s\n" % probe
self.transport.write("fetch %s\n" % probe)
data_buf = ''
while not (data_buf==".\n" or data_buf.endswith("\n.\n")):
result = yield self.recv()
data_buf += result
self._process(probe, data_buf)
self.transport.loseConnection()
def _process(self, probe, data_buf):
"Process a result from a single probe"
result = []
for line in data_buf.split('\n'):
# Some plugins send a retarded double-space separator, so we use a regex:
matches = re.match("([^ ]+)[ ]+([^ ]+)", line)
if matches:
lhs, rhs = matches.groups()
result.append((lhs.replace('.value',''),rhs.replace('\n','')))
# Push the structured data out the output channel
self.out_channel.send_result(self.server, probe, result)
class MuninClientFactory(protocol.ClientFactory):
def __init__(self, out_channel, server):
self.server = server
self.out_channel = out_channel
def clientConnectionFailed(self, connector, reason):
# Record that we were unable to connect to munin
self.out_channel.send_result(self.server, 'munin.success', [('value', 0)]) # Record that this machine was down at this moment
self.out_channel.finish('Connection failed')
protocol.ClientFactory.clientConnectionFailed(self, connector, reason)
def clientConnectionLost(self, connector, reason):
# This is legitimate, don't fail the connection on a clean connection closed
protocol.ClientFactory.clientConnectionLost(self, connector, reason)
protocol = MuninClientProtocol
def buildProtocol(self, addr):
p = self.protocol(self.out_channel, self.server)
p.factory = self
return p
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment