Skip to content

Instantly share code, notes, and snippets.

@bnrubin
Created February 6, 2014 04:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bnrubin/8838441 to your computer and use it in GitHub Desktop.
Save bnrubin/8838441 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
activate_this = '/home/bnrubin/.virtualenvs/celery/bin/activate_this.py'
execfile(activate_this, dict(__file__=activate_this))
import irssi
#import logging
import re
import redis
from datetime import datetime
import math
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#handler = graypy.GELFHandler('localhost', 12201, debugging_fields=False)
#handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
#log.addHandler(handler)
r = redis.StrictRedis(host='localhost', port=6379, db=0)
def stripColors(msg):
pattern = re.compile("\x1f|\x02|\x12|\x0f|\x16|\x03(?:\d{1,2}(?:,\d{1,2})?)?",
re.UNICODE)
return re.sub(pattern,'',msg)
def unix_time(dt):
epoch = datetime.utcfromtimestamp(0)
delta = dt - epoch
return math.floor(delta.total_seconds()/10.0)
def testlog(server, data, nick, mask, target):
record = {'datetime': datetime.now(),
'server': server.tag,
'target': target,
'nick': nick,
'mask': mask,
'data': stripColors(data)}
r.publish('irssi_log', record)
def announce(message):
w = irssi.window_find_name('logtest')
w.prnt(message)
def detect_basic_flood(server, data, nick, mask, target):
"""Basic flood detection"""
keyname = 'irssi_basic_flood %s:%s:%s@%s|%s' % (server.tag, target, nick, mask,
unix_time(datetime.now()))
pipe = r.pipeline()
pipe.incr(keyname,1)
pipe.expire(keyname,10)
result = pipe.execute()[0]
if result > 3:
announce('Flood (%s) detected by %s@%s in %s' % (result, nick, mask,
target))
if target == '#ubuntu-unregged':
server.send_raw('KICK #ubuntu-unregged %s' % nick)
def detect_repeat(server, data, nick, mask, target):
""" Detect messages repeated `repeat_len` times over `repeat_expiry`
seconds"""
repeat_len = 3
repeat_expiry = 10
keyname = 'irssi_repeat %s:%s:%s@%s|%s' % (server.tag, target, nick, mask, data)
pipe = r.pipeline()
pipe.incr(keyname, 1)
pipe.expire(keyname, repeat_expiry)
result = pipe.execute()[0]
if result >= repeat_len:
announce('Repeat (%s) detected by %s@%s in %s' % (result, nick, mask,
target))
def detect_lev(server, data, nick, mask, target):
keyname = 'irssi_lev %s:%s:%s@%s|%s' % (server.tag, target, nick, mask,
data)
irssi.signal_add('message public',detect_basic_flood)
irssi.signal_add('message public',detect_repeat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment