Skip to content

Instantly share code, notes, and snippets.

@mnot
Created September 6, 2010 09:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mnot/566863 to your computer and use it in GitHub Desktop.
Save mnot/566863 to your computer and use it in GitHub Desktop.
Chatterbox - System monitoring through social networking
#!/usr/bin/env python
"""
Chatterbox - System monitoring through Social Networking.
Chatterbox uses Twitter, IM and eventually other "social" channels to
send you information about your systems, so that you can keep in touch
with how they're performing without lots of messages in your e-mail inbox
or on your pager.
Currently, it only looks for interesting conditions in Squid logs. It's also not really... done.
"""
__author__ = "Mark Nottingham <mnot@mnot.net>"
__copyright__ = """\
Copyright (c) 2010 Mark Nottingham
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import ConfigParser
import cPickle as pickle
import fcntl
import gzip
import logging
import os
import signal
import sys
import time
from collections import defaultdict
from functools import partial
from logging import StreamHandler
from urlparse import urlsplit
# Try to use Twisted's epoll-based reactor; if it can't be imported
# (ImportError), silently fall back to Twisted's default reactor.
# NOTE(review): this runs before the `from twisted.internet import reactor`
# line below — Twisted requires a custom reactor to be installed before that
# import, so keep this ordering.
try:
    from twisted.internet import epollreactor
    epollreactor.install()
except ImportError:
    pass
from twisted.internet import reactor, stdio
from twisted.protocols.basic import LineReceiver
from twisted.internet.protocol import connectionDone
from twisted.words.protocols.jabber import client, jid, xmlstream
from twisted.words.xish import domish
from twisted.python import log as twisted_log
from twittytwister import twitter # http://github.com/dustin/twitty-twister
# requires http://github.com/leah/python-oauth
############################################################################
class Chatterbox:
    """
    The Chatterbox coordinator.

    Owns the reactor, configuration, logger and the persisted state, creates
    the monitors and channels, and handles startup/shutdown.
    """

    def __init__(self, re_actor, config, log):
        self.reactor = re_actor
        self.config = config
        self.log = log
        self.state = AttrDict()  # things that get persisted between invocations
        self.shutting_down = False

    def start(self):
        "Create monitors and channels, load state, then run the reactor (blocks)."
        self.log.info("start")
        self.monitors = [SquidLogWatcher(self)]
        self.channels = Channels(TweetBucket(self), JabberClient(self))
        # load_state merges persisted values into self.state, so call it
        # AFTER anything that might set a default in state.
        self.load_state()
        self.channels.online(True)
        self.reactor.run(installSignalHandlers=False)

    def shutdown(self):
        "Stop chatterbox: notify channels, persist state, then stop the reactor."
        # TODO: is shutting_down necessary? Check twisted semantics...
        if self.shutting_down or not self.reactor.running:
            return
        self.shutting_down = True
        self.log.info("stop")
        self.channels.online(False)
        self.save_state()
        # FIXME: yes, this is a race condition: outgoing channel messages (and
        # a save_state retry, if the state file was locked) get only 3 seconds.
        self.reactor.callLater(3, self.reactor.stop)

    def _lock_state_file(self, dbfile, retry, action):
        """
        Take an exclusive, non-blocking flock on dbfile.

        Returns the open lock file (closing it releases the lock), or None.
        If another process holds the lock, `retry` is rescheduled on the
        reactor one second later; if the file can't even be opened, the
        problem is logged and no retry is scheduled (retrying would not help).
        `action` ("read"/"write") is only used in log messages.
        """
        try:
            db_lock = open(dbfile, 'a')
        except IOError as why:
            self.log.warning("Can't open state file for %s! (%s)" % (action, why))
            return None
        try:
            fcntl.flock(db_lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # can't get a lock
            db_lock.close()
            self.log.info("State file is locked; retrying %s..." % action)
            self.reactor.callLater(1, retry)
            return None
        return db_lock

    @staticmethod
    def _open_state_for_read(dbfile):
        "Open dbfile for binary reading, whether it is gzipped or a plain pickle."
        db = gzip.open(dbfile, 'rb')
        try:
            # taste the file to see if it's compressed
            db.read(1)
            db.seek(0)
        except (IOError, EOFError):
            db.close()
            db = open(dbfile, 'rb')
        return db

    def load_state(self):
        """
        Load persisted state from the [main] statefile option.

        The loaded mapping is merged into the existing self.state object
        instead of replacing it, so defaults set beforehand survive and
        objects already holding a reference to self.state (e.g. SquidStats)
        see the loaded values. Problems are logged, never raised.
        """
        dbfile = self.config.get("main", "statefile")
        db_lock = self._lock_state_file(dbfile, self.load_state, "read")
        if db_lock is None:
            return
        start_time = time.time()
        try:
            db = self._open_state_for_read(dbfile)
            try:
                # NOTE: unpickling trusts the state file; keep it private.
                state = pickle.load(db)
            finally:
                db.close()
        except (ValueError, pickle.PickleError) as why:
            self.log.error("State is corrupt! (%s)" % why)
            return
        except (IOError, EOFError) as why:
            # includes an empty (never-saved) state file
            self.log.warning("State read problem. (%s)" % why)
            return
        finally:
            db_lock.close()
        if not isinstance(state, dict):
            self.log.error("State is corrupt! (expected a dict, got %s)"
                           % type(state).__name__)
            return
        self.state.update(state)
        self.log.info("State loaded in %3.3f seconds." % (time.time() - start_time))

    def save_state(self):
        """
        Persist self.state to the [main] statefile option, gzipped when
        [main] use-gzip is true. Problems are logged, never raised.
        """
        dbfile = self.config.get("main", "statefile")
        db_lock = self._lock_state_file(dbfile, self.save_state, "write")
        if db_lock is None:
            return
        start_time = time.time()
        try:
            # NOTE(review): 'wb' truncates in place, so a failed dump leaves
            # the previous state unrecoverable.
            if self.config.getboolean('main', 'use-gzip'):
                db = gzip.open(dbfile, 'wb', 6)
            else:
                db = open(dbfile, 'wb')
            try:
                pickle.dump(self.state, db, pickle.HIGHEST_PROTOCOL)
            finally:
                db.close()
        except pickle.PickleError as why:
            self.log.error("Can't write state! (%s)" % why)
            return
        except IOError as why:
            self.log.warning("Problems writing state! (%s)" % why)
            return
        finally:
            db_lock.close()
        self.log.info("State saved in %3.3f seconds." % (time.time() - start_time))

    def load_config_list(self, name, section, key):
        """
        Return the whitespace-separated values of option `key` in `section`
        as a list, logging them under the human-readable `name`.
        """
        value = self.config.get(section, key).strip().split(None)
        self.log.info("Loaded %s %s: %s" % (section, name, ", ".join(value)))
        return value
############################################################################
class Channels:
    """
    Holder for open channels that fans calls out to all of them.

    Looking up any plain attribute `x` returns a function that calls
    `channel_x(*args, **kw)` on every channel, falling back to noop() for
    channels that lack it; e.g. `channels.say(msg)` calls `channel_say(msg)`
    on each channel in order.
    """

    def __init__(self, *channels):
        self.channels = channels

    def __getattr__(self, attr):
        # Special names (__repr__, __nonzero__, __getstate__, ...) are probed
        # by repr(), bool(), copy and pickle; on old-style classes they reach
        # __getattr__ and must not be turned into broadcast functions.
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)
        method_name = "channel_" + attr

        def apply_to_channels(*args, **kw):
            for channel in self.channels:
                getattr(channel, method_name, self.noop)(*args, **kw)
        return apply_to_channels

    def noop(self, *args, **kw):
        "Fallback for channels that don't implement the requested operation."
        sys.stderr.write("Chatterbox - noop!\n")
class Channel:
    """
    Base class for channels.

    Every hook does nothing and returns None, so a subclass only overrides
    the notifications it actually supports.
    """

    def channel_online(self, active):
        "Hook for startup (active=True) and shutdown (active=False)."
        return None

    def channel_say(self, msg):
        "Hook for telling people about something."
        return None

    def channel_alert(self, msg):
        "Hook for alerting the admins to something bad."
        return None
class JabberClient(Channel):
    """
    Jabber (XMPP) channel.

    Logs in with the [jabber] username/password on `server`:5222, greets
    configured admins when they come online, and relays channel_say
    messages to the admins currently online. Its author described it as
    "not working very well at the moment".
    """

    def __init__(self, mgr):
        self.mgr = mgr
        self.xmlstream = None  # set once the stream has authenticated
        username = mgr.config.get("jabber", "username").strip()
        password = mgr.config.get("jabber", "password").strip()
        server = mgr.config.get("jabber", "server").strip()
        self.admins = self.mgr.load_config_list("admins", "jabber", "admins")
        self.online_admins = set()  # full JIDs (including resource) seen online
        self.me = jid.JID(username + "/Chatterbox")
        self.factory = client.basicClientFactory(self.me, password)
        self.factory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authd)
        self.factory.addBootstrap(client.BasicAuthenticator.AUTH_FAILED_EVENT, self.authfailedEvent)
        self.mgr.reactor.connectTCP(server, 5222, self.factory)
        mgr.log.debug("Jabber client started.")

    def authd(self, xmlstream):
        "Bootstrap callback: stream authenticated; announce presence and observe."
        # NOTE: the parameter shadows the module-level `xmlstream` import.
        self.mgr.log.info("Jabber authenticated.")
        self.xmlstream = xmlstream
        presence = domish.Element(('jabber:client', 'presence'))
        presence.addElement('status').addContent('Online')
        xmlstream.send(presence)
        xmlstream.addObserver('/message', self.gotMessage)
        xmlstream.addObserver('/presence', self.gotPresence)
        self.sendKeepAlive()

    def authfailedEvent(self, xmlstream):
        "Bootstrap callback: authentication was rejected."
        self.mgr.log.error("Jabber authentication failed.")

    def connectionLost(self, reason):
        # NOTE(review): this class isn't a protocol and nothing visible here
        # registers this method, so it may never be called.
        self.mgr.log.info("Jabber disconnected.")
        ### TODO: reconnect

    def gotMessage(self, message):
        "Log the body of an incoming message (no commands are handled yet)."
        body = None
        for e in message.elements():
            if e.name == "body":
                body = unicode(e.__str__()).strip().lower()
        if body is not None:
            self.mgr.log.debug("Jabber receive: %s" % body)

    def sendMessage(self, to, msg):
        "Send a chat message to JID `to`; retry every 3s until authenticated."
        if self.xmlstream:
            message = domish.Element(('jabber:client', 'message'))
            message["to"] = jid.JID(to).full()
            message["from"] = self.me.full()
            message["type"] = "chat"
            message.addElement("body", "jabber:client", msg)
            self.xmlstream.send(message)
        else:
            # NOTE(review): retries indefinitely if we never authenticate.
            self.mgr.reactor.callLater(3, self.sendMessage, to, msg)

    def gotPresence(self, element):
        """
        Track the presence of configured admins.

        An available presence (no 'type' attribute) from an admin adds that
        full JID to online_admins and greets it once; an 'unavailable'
        presence removes it. Other presence types (subscription requests,
        errors, ...) and presences without a 'from' are ignored.
        """
        full_addr = element.getAttribute('from')
        if not full_addr:
            return
        addr = full_addr.split("/", 1)[0]
        if addr not in self.admins:
            return
        presence_type = element.getAttribute('type')
        if presence_type is None:
            if full_addr not in self.online_admins:
                self.mgr.log.debug("Jabber admin %s online." % addr)
                self.online_admins.add(full_addr)
                self.sendMessage(full_addr, 'hi.')
        elif presence_type == 'unavailable':
            self.mgr.log.debug("Jabber admin %s offline." % addr)
            self.online_admins.discard(full_addr)

    def sendKeepAlive(self):
        "Nudge the connection with whitespace every 30 seconds."
        self.xmlstream.send(" ")
        self.mgr.reactor.callLater(30, self.sendKeepAlive)

    def channel_online(self, active):
        "Say hello on startup and goodbye on shutdown."
        if active:
            self.channel_say("Hi!")
        else:
            self.channel_say("Bye for now.")

    def channel_say(self, msg):
        "Send msg to every admin currently known to be online."
        for admin in self.online_admins:
            self.sendMessage(admin, msg)
class TweetBucket(Channel, twitter.Twitter):
    """
    Twitter channel built on twittytwister's Twitter client.

    Credentials and the admins to mention in alerts come from the [twitter]
    config section. channel_online is not overridden, so startup and
    shutdown are not tweeted.
    """

    def __init__(self, mgr):
        self.mgr = mgr
        user = mgr.config.get("twitter", "username").strip()
        secret = mgr.config.get("twitter", "password").strip()
        twitter.Twitter.__init__(self, user, secret)
        self.admins = self.mgr.load_config_list("admins", "twitter", "admins")
        mgr.log.debug("Twitter client started.")

    def channel_say(self, msg):
        "Post msg as a tweet; good_post / bad_post log the outcome."
        self.update(msg).addCallback(self.good_post).addErrback(self.bad_post)

    def channel_alert(self, msg):
        "Tweet msg addressed to the configured admins."
        mentions = " ".join(self.admins)
        self.channel_say("%s %s, can you look into it?" % (msg, mentions))

    def good_post(self, post_id):
        "Callback for a successful tweet."
        self.mgr.log.info("Posted tweet #%s." % post_id)

    def bad_post(self, problem):
        "Errback for a failed tweet."
        self.mgr.log.error("Problem posting tweet: %s" % problem)
############################################################################
class SquidLogWatcher(LineReceiver):
    """
    Handles the Squid log daemon protocol on STDIN.

    Each line from Squid starts with a one-character command: 'L' carries an
    access-log line (handed to SquidStats) and 'R' asks for log rotation.
    Any other command is ignored. When STDIN closes, Chatterbox shuts down.
    """
    delimiter = '\n'

    def __init__(self, mgr):
        self.mgr = mgr
        self.squid_stats = SquidStats(mgr)
        stdio.StandardIO(self)
        mgr.log.debug("Squid log listener started.")

    def lineReceived(self, line):
        """
        Process one line of the daemon protocol.
        """
        if not line:
            return  # no command character to dispatch on
        code = line[0]
        if code == 'L':  # Log
            line = line[1:].rstrip()
            self.mgr.log.debug("log: %s" % line)
            try:
                self.squid_stats.add(line)
            except ValueError:
                self.mgr.log.error("Misformatted Squid log line received!")
        elif code == 'R':  # Rotate
            self.mgr.log.info("Rotating logs...")
            # Only rotating handlers have doRollover(); skip the others.
            for hdlr in self.mgr.log.handlers:
                try:
                    hdlr.doRollover()
                except AttributeError:
                    pass
        # Other commands are ignored.

    def connectionLost(self, reason=connectionDone):
        "STDIN closed (Squid closes it to stop us): shut Chatterbox down."
        self.mgr.shutdown()
class SquidStats:
    """
    Analyses incoming Squid log lines, keeps stats on them and
    alerts the user when interesting things happen.

    Stats live in a list of "buckets" (dicts). gc() starts a new bucket every
    bucket_width seconds and keeps at most max_buckets of them. A bucket
    holds a 'hits' counter plus, for each examine_<name> method, a list
    '<name>' of the non-empty values that method returned.

    Squid config:
        logfile_daemon /path/to/chatterbox.py
        logformat chatterbox %ts %tr %>a %Ss %Hs %<st %rm %ru %mt %{Referer}>h
        access_log daemon:/path/to/config.txt chatterbox
    """

    # One name per whitespace-separated field of the logformat above; the
    # last field (referer) receives any remaining text.
    FIELD_NAMES = ('ts', 'latency', 'client_ip', 'squid_code', 'status',
                   'bytes', 'method', 'url', 'media_type', 'referer')

    # Responses whose referers aren't worth reporting (errors and redirects).
    IGNORED_REFERER_STATUSES = ('403', '404', '410', '301', '302', '303', '307')

    def __init__(self, mgr, max_buckets=120, bucket_width=30):
        self.mgr = mgr
        self.state = mgr.state
        self.max_buckets = max_buckets  # how many buckets to keep
        self.bucket_width = bucket_width  # how many seconds wide a bucket is
        self.view_span = "%s minutes" % int(max_buckets * bucket_width / 60.0)
        self.buckets = [{}]
        self.quiet_alerted = False  # already alerted about the current quiet spell?
        self.state.seen_referers = set()
        self.site_names = self.mgr.load_config_list("site names", "main", "site-names")
        self.ignore_sites = self.mgr.load_config_list("sites to ignore", "main", "ignore-sites")
        self.mgr.reactor.callLater(bucket_width, self.gc)
        mgr.log.debug("Squid stats monitor installed.")

    def add(self, line):
        """
        Add a log line to the stats collection.

        Raises ValueError if the line has fewer fields than the logformat
        produces, so the caller can report it as misformatted.
        """
        wanted = len(self.FIELD_NAMES)
        fields = line.split(None, wanted - 1)
        if len(fields) < wanted:
            raise ValueError("expected %d fields, got %d" % (wanted, len(fields)))
        entry = dict(zip(self.FIELD_NAMES, fields))
        bucket = self.buckets[-1]
        bucket['hits'] = bucket.get('hits', 0) + 1
        self.examine(entry)

    def examine(self, entry):
        """
        Examine a new log entry for interesting things; usually, to extract
        the bits we need to check later.

        Calls each examine_<name> method with the entry; any truthy result is
        appended to the current bucket's '<name>' list.
        """
        bucket = self.buckets[-1]
        for name, method in self.get_methods('examine_'):
            result = method(entry)
            if result:
                bucket.setdefault(name, []).append(result)

    def check(self):
        """
        Check the stats collection for any interesting conditions.

        Calls each check_<name> method with the '<name>' values gathered from
        all buckets, oldest first.
        NOTE(review): nothing visible in this file calls check() yet.
        """
        for name, method in self.get_methods('check_'):
            entries = []
            for bucket in self.buckets:
                entries.extend(bucket.get(name, []))
            method(entries)

    def examine_referers(self, entry):
        """
        Look for new, high-traffic referers.

        Returns (referer, url) for successful hits referred from a site that
        is neither one of ours nor ignored; otherwise None.
        """
        referer = entry['referer']
        if referer in ('-', ''):
            return None
        if entry['status'] in self.IGNORED_REFERER_STATUSES:
            return None
        referer_site = urlsplit(referer).hostname
        if referer_site in self.site_names or referer_site in self.ignore_sites:
            return None
        # TODO: canonicalise
        return (referer, entry['url'])

    def check_referers(self, stashed_entries):
        """
        Announce (referer, url) pairs seen more than twice.

        stashed_entries: list of (referer, url) tuples from examine_referers.
        TODO: remember announced pairs in self.state.seen_referers so they
        aren't announced on every check.
        """
        for referer_pair, count in self.count(stashed_entries):
            if count > 2:
                self.mgr.channels.say("Hey, %s is giving %s some love." % referer_pair)

    def check_status(self, stashed_entries):
        """
        Look for unusual (5xx) status codes.

        stashed_entries: list of log-entry dicts with 'url' and 'status'.
        NOTE(review): this class defines no examine_status, so check()
        currently always passes an empty list here.
        """
        server_err_urls = [e['url'] for e in stashed_entries
                           if e['status'].startswith('5')]
        total = len(stashed_entries)
        if total > 12 and len(server_err_urls) > total * .25:
            self.mgr.channels.say("Uh, oh; %s of the last %s hits were 5xx errors!" % (
                len(server_err_urls), total))
        else:
            for url, count in self.count(server_err_urls):
                if count > 2:
                    self.mgr.channels.say("%s seems to be getting some 5xx errors recently." % url)

    def gc(self):
        """
        Start a new bucket, drop the oldest beyond max_buckets, and reschedule
        itself bucket_width seconds later on the reactor.

        When a full window of buckets has seen no hits, alert once; the alert
        re-arms as soon as traffic is seen again.
        """
        if len(self.buckets) >= self.max_buckets:
            quiet = not sum(b.get('hits', 0) for b in self.buckets)
            if quiet and not self.quiet_alerted:
                self.mgr.channels.alert(
                    "Hmm, not seeing any traffic in a while.")
            self.quiet_alerted = quiet
        self.buckets.append({})
        if len(self.buckets) > self.max_buckets:
            self.buckets.pop(0)
        self.mgr.reactor.callLater(self.bucket_width, self.gc)

    def get_methods(self, prefix):
        """
        Return (suffix, bound callable) pairs for methods defined directly on
        this class whose names start with prefix.
        """
        return [(name[len(prefix):], partial(func, self))
                for name, func in self.__class__.__dict__.items()
                if name.startswith(prefix)]

    @staticmethod
    def count(lyst):
        "Given a list of items, return a list of (item, count)"
        c = defaultdict(int)
        for i in lyst:
            c[i] += 1
        return c.items()
##############################################################################
def main(configfile):
    """
    Load configfile, set up logging to stderr and run Chatterbox until the
    reactor stops.

    Fatal startup problems are reported via error(), which exits with
    status 1. With log-level DEBUG, unexpected exceptions propagate instead.
    """
    # load config
    try:
        config = ConfigParser.SafeConfigParser(
            {
                'log-level': 'INFO',
                'use-gzip': 'True',
                'site-names': '',
                'ignore-sites': '',
                'admins': '',
            }
        )
        config.add_section('main')
        config.read(configfile)
        log_level = config.get("main", "log-level").strip().upper()
    except ConfigParser.Error as why:
        error("Configuration file: %s" % why)

    # start logging
    log = logging.getLogger()
    try:
        hdlr = StreamHandler(sys.stderr)
    except IOError as why:
        error("Can't open log file (%s)" % why)
    formatter = logging.Formatter('%(asctime)s| Chatterbox %(levelname)s: %(message)s', '%Y/%m/%d %H:%M:%S')
    hdlr.setFormatter(formatter)
    log.addHandler(hdlr)
    # getLevelName() maps a known level name to its number; unknown names
    # come back as a "Level X" string, in which case fall back to INFO.
    level = logging.getLevelName(log_level)
    if not isinstance(level, int):
        level = logging.INFO
    log.setLevel(level)
    observer = twisted_log.PythonLoggingObserver()
    observer.start()

    # run
    try:
        mgr = Chatterbox(reactor, config, log)
        # we ignore SIGTERM because Squid will close the log FH, which gives
        # us a much cleaner signal that we're to shut down.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        mgr.start()
    except ConfigParser.Error as why:
        error("Configuration file: %s" % why)
    except Exception as why:
        if log_level != 'DEBUG':
            error(why)
        else:
            raise
    except:
        # Anything not derived from Exception (e.g. KeyboardInterrupt).
        if log_level != 'DEBUG':
            error("Unknown error.")
        else:
            raise

    # clean up logging
    hdlr.flush()
    hdlr.close()
    logging.shutdown()
def error(msg):
    """
    Report a fatal problem on stderr and exit with status 1.

    Only meant for startup failures.
    """
    line = "CHAT FATAL: %s\n" % msg
    sys.stderr.write(line)
    raise SystemExit(1)
class AttrDict(dict):
    """
    A dict whose keys can also be read and written as attributes.

    The instance is installed as its own __dict__, so `d.key` and
    `d['key']` share the same storage.
    """

    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self
if __name__ == '__main__':
    # Squid passes the access_log daemon path as our only argument; it is
    # used as the Chatterbox config file.
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s config_filename\n" % sys.argv[0])
        sys.exit(1)
    conf = sys.argv[1]
    if not os.path.exists(conf):
        error("Can't find config file %s." % conf)
    main(conf)
[main]
log-level = DEBUG
statefile = /tmp/foo
site-names = www.example.com
ignore-sites = www.google.com www.google.it www.google.de www.google.com.au www.google.fr
    www.google.co.in www.google.com.tr www.google.at www.google.be www.google.ae www.google.ca
    www.google.ch www.google.co www.google.cz www.google.dk www.google.ie www.google.nl
    www.google.pl www.google.pt
    bing.com bing.com.au bing.co.uk
    translate.googleusercontent.com webcache.googleusercontent.com
    bit.ly stumbleupon.com del.icio.us delicious.com
    validator.w3.org
[twitter]
username = user
password = pass
admins = @example
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment