Skip to content

Instantly share code, notes, and snippets.

@pfote
Created November 23, 2012 08:41
Show Gist options
  • Save pfote/4134562 to your computer and use it in GitHub Desktop.
Save pfote/4134562 to your computer and use it in GitHub Desktop.
my old bot.py
#!/usr/bin/env python
##################################################
# little programming exercise using twisted irc
# ibot is an infobot whose whole purpose is to
# autorespond to a defined set of keywords with
# a stored phrase
# $Id: bot.py,v 1.4 2003/12/14 13:38:29 pfote Exp $
##################################################
# Revision number ("1.4") sliced out of the CVS "$Revision: ... $" keyword:
# [11:-2] drops the leading "$Revision: " and the trailing " $".
__version__ = "$Revision: 1.4 $"[11:-2]
# Mode letter given to the default nick by IrcMode.__init__; the addmode and
# delmode commands check for the same letter ('a') before acting.
HAS_BOTADMIN = 'a'
import ConfigParser,sys,time,string
from twisted.protocols import irc
from twisted.internet import reactor, protocol
from twisted.persisted.dirdbm import Shelf
from botstrings import *
from pprint import pformat # for debugging
import re
import time
class IrcAuth:
    """Persistent list of trusted nicks and the host masks they must match.

    Entries live in ``self.auth`` (a ``Shelf`` keyed by nick).  The value is
    a regular expression applied to the caller's host string, or ``None`` /
    ``''`` meaning "any host is accepted for this nick".
    """

    def __init__(self, dir='auth'):
        """Open the shelf in directory *dir*; seed a default entry if empty."""
        self.auth = Shelf(dir)
        if len(self.auth) == 0:
            self.add('pfote', '.*infosys.de')

    def add(self, nick, hostmask=None):
        """Store (or overwrite) the host mask pattern for *nick*."""
        self.auth[nick] = hostmask
        print(self.list())

    def delete(self, nick):
        """Remove *nick*; an unknown nick is silently ignored."""
        try:
            del self.auth[nick]
        except KeyError:
            # Same tolerance as InfoStorage.delete: deleting twice is not
            # an error worth raising out of an IRC callback.
            pass

    def count(self):
        """Return the number of stored nicks."""
        return len(self.auth)

    def list(self):
        """Return the stored nicks formatted as a list literal string."""
        return str(list(self.auth.keys()))

    def __repr__(self):
        """Return ``"nick (mask),nick (mask),..."`` for all entries."""
        entries = ["%s (%s)" % (k, self.auth[k]) for k in self.auth.keys()]
        return ",".join(entries)

    def verify(self, nick, hostmask):
        """Return True when *nick* is trusted and *hostmask* matches its mask.

        The stored pattern is matched from the start of *hostmask*
        (``re.match`` semantics).  Unknown nicks and stored patterns that
        are not valid regular expressions yield False.
        """
        try:
            pattern = self.auth[nick]
        except KeyError:
            return False
        print("verify %s with %s " % (nick, hostmask))
        if pattern is None or pattern == '':
            # no mask stored: the nick alone is sufficient
            return True
        try:
            return re.match(pattern, hostmask) is not None
        except re.error:
            # A malformed stored mask must not raise on every later check.
            return False
class IrcMode:
    """Persistent per-nick mode flags.

    ``self.mode`` is a ``Shelf`` mapping a nick to a string of mode
    letters; a nick without any mode has no entry at all.
    """

    def __init__(self, dir='modes'):
        """Open the shelf in directory *dir*; seed a default admin if empty."""
        self.mode = Shelf(dir)
        if len(self.mode) == 0:
            self.add('pfote', HAS_BOTADMIN)

    def _modes_of(self, nick):
        """Return the mode string stored for *nick* ('' when there is none)."""
        try:
            return self.mode[nick] or ''
        except KeyError:
            return ''

    def add(self, nick, mode=None):
        """Extend the modes of *nick* by *mode*.

        A falsy *mode* (the default ``None`` or ``''``) adds nothing.  The
        resulting table is printed for debugging in every case.
        """
        if mode:
            current = self._modes_of(nick)
            if not current:
                self.mode[nick] = mode
            else:
                print("%s exists in mode hash" % nick)
                if current.find(mode) == -1:
                    self.mode[nick] = current + mode
                    print("adding mode %s to %s" % (mode, nick))
        # list() prints by itself and returns None, so it must not be
        # wrapped in another print.
        self.list()

    def remove(self, nick, mode):
        """Strip *mode* from *nick*; drop the entry once no mode is left.

        Unknown nicks and modes the nick does not have are ignored.
        """
        current = self._modes_of(nick)
        if not current or current.find(mode) == -1:
            return
        remaining = current.replace(mode, '')
        if remaining == '':
            del self.mode[nick]
        else:
            self.mode[nick] = remaining

    def has_mode(self, nick, mode):
        "see if a nick has a special mode"
        # Unknown nicks simply have no modes instead of raising KeyError.
        return self._modes_of(nick).find(mode) != -1

    def list(self):
        """Print one ``nick => modes`` line per stored nick."""
        for k in self.mode.keys():
            print("%s => %s" % (k, self.mode[k]))
class InfoStorage:
    """container class for the data the infobot should handle

    ``self.dict`` is a ``Shelf`` mapping a keyword to its stored phrase.
    Keys and values are stored with surrounding whitespace stripped, and
    every lookup strips the key the same way.
    """

    def __init__(self, dir='dict'):
        """Open the shelf in directory *dir*."""
        self.dict = Shelf(dir)

    def add(self, key, data):
        """Add an entry; an existing key is left untouched."""
        key = key.strip()
        # Test the stripped key, i.e. the one that is actually written,
        # so "foo " cannot overwrite an existing "foo".
        if not self.haskey(key):
            self.dict[key] = data.strip()

    def haskey(self, key):
        """Return True when the stripped *key* is stored."""
        try:
            self.dict[key.strip()]
        except KeyError:
            return False
        return True

    def get(self, key):
        """Return the phrase stored for *key*; raises KeyError if missing."""
        return self.dict[key.strip()]

    def count(self):
        """Return the number of stored keywords."""
        return len(self.dict)

    def delete(self, key):
        """Remove *key* if present; a missing key is ignored."""
        try:
            del self.dict[key.strip()]
        except KeyError:
            pass

    def searchkeys(self, subkey):
        """substring search on all stored keys,
        returns a array with matching keys"""
        keys = self.dict.keys()
        print("searching for %s in %s" % (subkey, " ".join(keys)))
        return [k for k in keys if k.find(subkey) != -1]

    def searchdata(self, subkey):
        """substring search on all stored key values,
        returns a array with matching keys"""
        keys = self.dict.keys()
        print("searching data for %s in %s" % (subkey, " ".join(keys)))
        return [k for k in keys if self.dict[k].find(subkey) != -1]
class MessageLogger:
    """
    An independent logger class (because separation of application
    and protocol logic is a good thing).

    Wraps an already opened, writable file-like object.
    """

    def __init__(self, file):
        """Remember *file*; the logger takes over closing it."""
        self.file = file

    def log(self, message):
        """Write *message* prefixed with a local ``[HH:MM:SS]`` timestamp."""
        # strftime() without a time tuple formats the current local time
        timestamp = time.strftime("[%H:%M:%S]")
        self.file.write('%s %s\n' % (timestamp, message))
        # flush per line so the log can be followed while the bot runs
        self.file.flush()

    def close(self):
        """Close the underlying file."""
        self.file.close()
class BotProtocol(irc.IRCClient):
    """IRC side of the infobot.

    Logs traffic, remembers when each nick was last seen, dispatches
    prefixed commands (``!name`` by default) to the ``cmd_<name>`` methods,
    stores ``key = value`` lines from trusted users and answers ``key?``
    lookups.

    NOTE: the docstrings of the ``cmd_*`` methods are sent to the channel by
    the help commands, so they are user-facing text; maintainer notes for
    those methods are kept in ``#`` comments instead.
    """

    def __init__(self):
        self.logger = None
        self.nickname = "ibot"
        self.is_joined = 0
        # nick -> {'time': epoch seconds, 'said': last channel line or ''}
        self.seen = {}
        # handle of the pending rejoin timer (reactor.callLater), if any
        self._rejoin_call = None

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    def _ensure_logger(self):
        """Open the configured log file on first use and bind self.log."""
        if self.logger is None:
            self.logger = MessageLogger(open(self.factory.config['logfile'], "a"))
            self.log = self.logger.log

    def _nick_of(self, user):
        """Return the nick part of ``nick!user@host`` (or *user* unchanged)."""
        return user.split('!', 1)[0]

    def _bare_channel(self, name):
        """Lower-case *name* and drop one leading channel prefix character."""
        if name[:1] in ('&', '#', '!', '+'):
            name = name[1:]
        return name.lower()

    def _is_home_channel(self, name):
        """Tell whether *name* is the configured channel.

        The comparison ignores case and a leading channel prefix on either
        side, so it works whether or not the config value and the name
        handed to the callback carry the '#'.
        """
        return self._bare_channel(name) == self._bare_channel(self.factory.channel)

    def _remember(self, nick, said):
        """Record that *nick* was just seen, optionally with what was said."""
        entry = self.seen.setdefault(nick, {})
        entry['time'] = time.time()
        entry['said'] = said

    def _is_authorised(self, user):
        """Check the full ``nick!user@host`` string against the auth list."""
        try:
            return self.factory.auth.verify(self._nick_of(user), user)
        except re.error:
            # a malformed stored host mask authorises nobody
            return 0

    def _is_botadmin(self, nick):
        """Tell whether *nick* carries the bot-admin mode.

        NOTE(review): this looks at the nick only, not the host mask;
        confirm that is the intended trust level for mode changes.
        """
        try:
            return self.factory.mode.has_mode(nick, HAS_BOTADMIN)
        except KeyError:
            # nick without any stored mode
            return 0

    def _command_names(self):
        """Return the names of all ``cmd_*`` methods without the prefix."""
        return [name[4:] for name in dir(self) if name[:4] == 'cmd_']

    def _describe(self, name):
        """Return the help line for command *name*, or None if unknown.

        A text in ``docstrings`` (presumably a mapping provided by
        botstrings - verify) takes precedence over the method docstring.
        """
        handler = getattr(self, 'cmd_' + name, None)
        if handler is None:
            return None
        try:
            text = docstrings['cmd_' + name]
        except KeyError:
            text = handler.__doc__
        return "%s: %s" % (name, text)

    def _parse_mode_request(self, text):
        """Split ``"<nick> <mode>"``; return (nick, mode) or None."""
        words = text.split()
        if len(words) < 2:
            return None
        return words[0], words[1]

    # ------------------------------------------------------------------
    # connection handling
    # ------------------------------------------------------------------
    def connectionMade(self):
        self._ensure_logger()
        irc.IRCClient.connectionMade(self)
        if self.nickname != self.factory.config['botname']:
            self.nickname = self.factory.config['botname']
            self.setNick(self.nickname)
        self.log("[connected at %s]" % time.asctime())

    def nickservIdent(self, sender, passwd):
        """sends IDENT request to NickServ"""
        self.sendLine(":%s PRIVMSG NickServ :IDENTIFY %s" % (sender, passwd))
        # never write the real password into the log file
        self.log(":%s PRIVMSG NickServ :IDENTIFY %s" % (sender, "*" * 8))

    def signedOn(self):
        """gets called when we signed in .. it joins into the chan and retrieves
        a list of users"""
        self.join(self.factory.channel)
        reactor.callLater(2, self.callUserlist)

    def callUserlist(self):
        """Ask the server for the user list of the configured channel."""
        self.sendLine("WHO #%s" % self._bare_channel(self.factory.channel))

    def connectionLost(self, reason):
        self._ensure_logger()
        # stop a pending rejoin timer: it would write to a closed log file
        pending = self._rejoin_call
        self._rejoin_call = None
        if pending is not None and pending.active():
            pending.cancel()
        irc.IRCClient.connectionLost(self, reason)
        self.log("[disconnected at %s] %s" % (time.asctime(), reason))
        self.logger.close()

    # ------------------------------------------------------------------
    # callbacks for events
    # ------------------------------------------------------------------
    def kickedFrom(self, channel, kicker, message):
        """gets called when the bot itself was kicked from the chan"""
        self.log("kickedFrom(%s,%s,%s) called" % (channel, kicker, message))
        self.is_joined = 0
        # try to reconnect
        self.rejoin(channel)

    def userKicked(self, kickee, channel, kicker, message):
        self.log("userKicked(%s,%s,%s,%s) called" % (kickee, channel, kicker, message))

    def rejoin(self, channel):
        """Send JOIN for *channel* and retry every 2 seconds until joined.

        JOIN is asynchronous: ``joined()`` sets ``is_joined`` once the
        server confirms, so success is checked when the timer fires rather
        than right after sending the request.
        """
        pending = self._rejoin_call
        self._rejoin_call = None
        if pending is not None and pending.active():
            # called directly while a retry was scheduled: replace the timer
            pending.cancel()
        if self.is_joined:
            return
        self.log("attempt to join the chan %s" % (channel))
        self.join(channel)
        self._rejoin_call = reactor.callLater(2, self.rejoin, channel)

    def joined(self, channel):
        self.log("[I have joined %s]" % channel)
        self.is_joined = 1

    def privmsg(self, user, channel, msg):
        """Handle a channel or private message.

        In order: log it, update the "seen" table (channel messages only),
        run a prefixed command, store a ``key = value`` request from a
        trusted user, and answer ``key?`` / ``<botnick>: key?`` lookups.
        """
        nick = self._nick_of(user)
        self.log("<%s> %s" % (nick, msg))
        if channel == self.nickname:
            is_private = 1
        else:
            is_private = 0
            self._remember(nick, msg)
        if not msg:
            return
        self._dispatch_command(user, nick, channel, msg, is_private)
        self._store_request(user, nick, msg)
        self._answer_lookup(msg)

    def _dispatch_command(self, user, nick, channel, msg, is_private):
        """Call ``cmd_<name>`` when *msg* starts with the command prefix."""
        prefix = self.factory.prefix
        if not prefix or not msg.startswith(prefix):
            return
        words = msg.split(" ")
        handler = getattr(self, "cmd_" + words[0][len(prefix):], None)
        if handler is not None:
            handler(msg=" ".join(words[1:]), user=user, channel=channel,
                    nick=nick, privmesg=is_private)

    def _store_request(self, user, nick, msg):
        """Store ``key = value`` when the sender is a trusted user."""
        m = re.match(r"^([\w\s]+)=(.*)$", msg)
        if not m or not m.group(1).strip():
            # not a storage request, or the key is nothing but whitespace
            return
        if not self._is_authorised(user):
            return
        if not self.factory.info.haskey(m.group(1)):
            self.factory.info.add(m.group(1), m.group(2))
            self.notice(nick, "%s stored" % m.group(1))
        else:
            self.notice(nick, "%s exists, use !del to delete first" % m.group(1))

    def _answer_lookup(self, msg):
        """Answer ``key?`` and ``<botnick>[:, ]key?`` for stored keys."""
        info = self.factory.info
        # a single line with a keyword
        m = re.match(r"(.*)\?$", msg)
        if m and m.group(1).strip() and info.haskey(m.group(1)):
            self.say(self.factory.channel,
                     "%s: %s" % (m.group(1), info.get(m.group(1))))
        # have we been addressed?  Nicks may contain regex metacharacters,
        # so the bot name is escaped before it goes into the pattern.
        ms = r"^%s[\:\,\s](.*)\?$" % re.escape(self.nickname)
        m = re.match(ms, msg)
        if m and m.group(1).strip() and info.haskey(m.group(1)):
            self.say(self.factory.channel,
                     "%s: %s" % (m.group(1).strip(), info.get(m.group(1))))

    def action(self, user, channel, msg):
        self.log("* %s %s" % (self._nick_of(user), msg))

    # ------------------------------------------------------------------
    # irc callbacks
    # ------------------------------------------------------------------
    def noticed(self, user, channel, msg):
        """Log a NOTICE; answer the first NickServ notice with IDENTIFY."""
        nick = self._nick_of(user)
        self.log("<%s> %s" % (nick, msg))
        # the factory may not define these when no password is configured
        password = getattr(self.factory, 'nickservpw', None)
        pending = getattr(self.factory, 'identifyPending', 0)
        if nick == 'NickServ' and pending == 1 and password:
            # could be a ident request
            self.nickservIdent(self.nickname, password)
            self.factory.identifyPending = 0
            self.log("[IDENTIFY to NICKSERV %s]" % time.asctime())

    def irc_NICK(self, prefix, params):
        """When an IRC user changes their nickname
        """
        old_nick = self._nick_of(prefix)
        new_nick = params[0]
        self.log("%s is now known as %s" % (old_nick, new_nick))

    def listCommands(self):
        """Say every command with its method docstring in the channel."""
        channel = self.factory.channel
        for name in self._command_names():
            handler = getattr(self, 'cmd_' + name)
            self.say(channel, "%s: %s" % (name, handler.__doc__))

    def display_seen(self, nick):
        """Report in the channel when *nick* was last seen (and what it said)."""
        channel = self.factory.channel
        entry = self.seen.get(nick)
        if entry is None:
            self.say(channel, "dont know anything about %s" % nick)
            return
        when = time.strftime(STRFTIME_FORMAT, time.localtime(entry['time']))
        if entry['said']:
            self.say(channel, BOT_LASTSEENSAYING % (nick, when, entry['said']))
        else:
            self.say(channel, BOT_LASTSEEN % (nick, when))

    def userJoined(self, nick, channel):
        if self._is_home_channel(channel):
            self._remember(nick, '')

    # ------------------------------------------------------------------
    # commands (docstrings below are shown to IRC users by !help)
    # ------------------------------------------------------------------
    def cmd_addmode(self, **params):
        """addmode <nick> <mode> adds a mode to a nick .... needs 'a' mode"""
        if not self._is_botadmin(params['nick']):
            return
        request = self._parse_mode_request(params['msg'])
        if request is None:
            return 0
        target, mode = request
        self.factory.mode.add(target, mode)
        self.say(self.factory.channel,
                 "%s has added mode %s to %s" % (params['nick'], mode, target))

    def cmd_delmode(self, **params):
        """delmode <nick> <mode> removes a mode from a nick"""
        if not self._is_botadmin(params['nick']):
            return
        request = self._parse_mode_request(params['msg'])
        if request is None:
            return 0
        target, mode = request
        try:
            self.factory.mode.remove(target, mode)
        except KeyError:
            # target has no modes at all: nothing was removed
            return 0
        self.say(self.factory.channel,
                 "%s has removed mode %s from %s" % (params['nick'], mode, target))

    def cmd_seen(self, **params):
        "!seen <nickname> lists last occurrance of that nick"
        self.display_seen(params['msg'].strip())

    def cmd_del(self, **params):
        """!del <keyword> deletes a key from the info database"""
        nick = self._nick_of(params['user'])
        key = params['msg']
        if self._is_authorised(params['user']):
            self.notice(nick, "deleting %s" % key)
            self.factory.info.delete(key)

    #def cmd_seenall(self,**params):
    #    "lists all users i have a idea of ..."
    #    channel = self.factory.channel
    #    for s in self.seen.keys(): self.display_seen(s)

    def cmd_addauth(self, **params):
        """<nick> <hostmask> (adding a trusted user)"""
        # Authorisation is checked first so strangers get no feedback.
        if not self._is_authorised(params['user']):
            return
        nick = self._nick_of(params['user'])
        fields = params['msg'].split(None, 1)
        if len(fields) != 2:
            self.notice(nick, "usage: %saddauth <nick> <hostmask>" % self.factory.prefix)
            return
        target, hostmask = fields[0], fields[1].strip()
        try:
            # the mask is later used as a regular expression; reject
            # patterns that do not compile instead of storing them
            re.compile(hostmask)
        except re.error:
            self.notice(nick, "invalid hostmask pattern %s" % hostmask)
            return
        self.factory.auth.add(target, hostmask)

    def cmd_listauth(self, **params):
        """list all known user nicks """
        nick = self._nick_of(params['user'])
        if self._is_authorised(params['user']):
            text = "trusted users are: %s" % (self.factory.auth,)
        else:
            text = "you are not authorised to ask this"
        if params['privmesg'] == 1:
            # answer privately: the target is the nick, not nick!user@host
            self.msg(nick, text)
        else:
            self.say(self.factory.channel, text)

    def cmd_delauth(self, **params):
        """deletes one from the auth list .. beware, nicknames are case sensitive"""
        if not self._is_authorised(params['user']):
            return
        words = params['msg'].split()
        if not words:
            return
        try:
            self.factory.auth.delete(words[0])
        except KeyError:
            self.notice(self._nick_of(params['user']),
                        "%s is not a trusted user" % words[0])

    def cmd_helpall(self, **params):
        """lists all available commands with descriptions"""
        channel = self.factory.channel
        for name in self._command_names():
            self.say(channel, self._describe(name))

    def cmd_help(self, **params):
        "!help gives list of commands, !help <cmd> gives details"
        channel = self.factory.channel
        topic = params.get('msg', '').strip()
        if not topic:
            # advertise the commands with the prefix that is actually in use
            cmds = "".join([" " + self.factory.prefix + name
                            for name in self._command_names()])
            self.say(channel, "commands implemented: %s" % cmds)
            self.say(channel, BOT_GENERICHELP)
            return
        text = self._describe(topic)
        if text is not None:
            self.say(channel, text)

    def cmd_version(self, **params):
        """prints version,copyright info and uptime"""
        channel = self.factory.channel
        t = time.strftime(STRFTIME_FORMAT, time.localtime(self.factory.started))
        self.say(channel, BOT_VERSION % (__version__, t))

    def cmd_search(self, **params):
        """does a substring search of all stored keywords"""
        keys = self.factory.info.searchkeys(params['msg'])
        print("searchkeys returned %s" % (keys,))
        if len(keys) == 0:
            self.say(self.factory.channel, BOT_KEYNOTFOUND)
        else:
            self.say(self.factory.channel, BOT_KEYSFOUND % ",".join(keys))

    def cmd_searchdata(self, **params):
        """does a substring search of all stored descriptions"""
        keys = self.factory.info.searchdata(params['msg'])
        print("searchdata returned %s" % (keys,))
        if len(keys) == 0:
            self.say(self.factory.channel, BOT_KEYNOTFOUND)
        else:
            self.say(self.factory.channel,
                     BOT_KEYSCONTAIN % (params['msg'], ",".join(keys)))

    def cmd_die(self, **params):
        "let the robot die (only auth users)"
        if self._is_authorised(params['user']):
            self.quit("bye bye")
            reactor.stop()

    def cmd_status(self, **params):
        """status report"""
        t = time.strftime(STRFTIME_FORMAT, time.localtime(self.factory.started))
        self.say(self.factory.channel,
                 BOT_STATUSMSG % (t, self.factory.info.count(), self.factory.auth.count()))

    def irc_RPL_WHOREPLY(self, prefix, params):
        """cb to a WHO REPLY """
        # params: requester, channel, ident, host, server, nick, flags, rest
        if len(params) < 6:
            return
        chan, nick = params[1], params[5]
        if self._is_home_channel(chan):
            self._remember(nick, '')
class ChatBot(protocol.ClientFactory):
    """main bot object

    Holds the configuration and the state that must survive reconnects:
    the auth list, the mode table, the info storage and the start time.
    """

    # the class of the protocol to build
    protocol = BotProtocol

    def __init__(self, config):
        """Set up the bot from *config*, a dict of the [IRC] options.

        Required keys: ``channel`` and ``botname``.  Optional keys:
        ``auth`` (directory of the auth shelf), ``nickservpw`` and
        ``prefix`` (command prefix, default ``'!'``).
        """
        self.config = config
        self.channel = config['channel']
        self.botname = config['botname']
        if 'auth' in config:
            self.auth = IrcAuth(config['auth'])
        else:
            self.auth = IrcAuth()
        self.info = InfoStorage()
        self.started = time.time()
        # Always define both attributes: the protocol reads them on every
        # NickServ notice, with or without a configured password.
        self.nickservpw = config.get('nickservpw')
        if self.nickservpw:
            self.identifyPending = 1
        else:
            self.identifyPending = 0
        self.prefix = config.get('prefix', '!')
        self.mode = IrcMode()

    def clientConnectionLost(self, connector, reason):
        """If we get disconnected, reconnect to server."""
        if self.nickservpw:
            # a new session has to identify to NickServ again
            self.identifyPending = 1
        connector.connect()

    def clientConnectionFailed(self, connector, reason):
        """Report the failure and stop the reactor (no retry)."""
        print("connection failed: %s" % (reason,))
        reactor.stop()
def printUsage():
    """Write the command line usage to stderr and exit with status 1."""
    sys.stderr.write("usage: %s <configfile>\n" % sys.argv[0])
    sys.exit(1)
def main():
    """Read the config file named on the command line and run the bot.

    The file must contain an ``[IRC]`` section providing at least
    ``server``, ``serverport``, ``channel``, ``botname`` and ``logfile``.
    Any problem with the arguments or the file ends in printUsage(),
    which exits with status 1.
    """
    if len(sys.argv) != 2:
        printUsage()
    configFile = sys.argv[1]
    config = ConfigParser.ConfigParser()
    mainConfig = None
    try:
        handle = open(configFile)
        try:
            config.readfp(handle)
        finally:
            handle.close()
        if config.has_section('IRC'):
            mainConfig = dict(config.items('IRC'))
    except (IOError, ConfigParser.Error):
        # Only I/O and parser errors are caught here, so the SystemExit
        # raised by printUsage() below is never swallowed.
        print("error parsing config file %s" % configFile)
        printUsage()
    if mainConfig is None:
        print("incomplete config file %s" % configFile)
        printUsage()
    required = ('server', 'serverport', 'channel', 'botname', 'logfile')
    missing = [name for name in required if name not in mainConfig]
    if missing:
        print("incomplete config file %s (missing: %s)"
              % (configFile, ", ".join(missing)))
        printUsage()
    try:
        port = int(mainConfig['serverport'])
    except ValueError:
        print("invalid serverport in config file %s" % configFile)
        printUsage()
    f = ChatBot(mainConfig)
    reactor.connectTCP(mainConfig['server'], port, f)
    reactor.run()
# Start the bot only when the file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment