Skip to content

Instantly share code, notes, and snippets.

@jabb
Last active May 12, 2022 22:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jabb/5790939 to your computer and use it in GitHub Desktop.
Save jabb/5790939 to your computer and use it in GitHub Desktop.
GrueBot: A Very Simple IRC Bot Written in Python
from gruebot import Plugin
import hashlib
class auth(Plugin):
    """ Password-based authorization plugin.

    Loaded with a ``password`` keyword argument. Users who send
    ``auth <password>`` with the matching password are remembered and
    reported as authorized by ``is_authed()``.

    NOTE(review): authorization is keyed only by the value returned from
    ``get_user()``; confirm that is a strong enough identity for your
    network before relying on it.
    """

    def is_authed(self):
        """ Return True when the current user has authenticated. """
        return self.get_user() in self.authed

    def init(self, *args, **kwargs):
        """ Store the hashed password and register the ``auth`` command.

        Raises KeyError when no ``password`` keyword argument is given.
        """
        # Only the digest is kept. Note that this attribute shadows the
        # Plugin.password() method on this instance.
        self.password = hashlib.md5(kwargs['password'].encode()).hexdigest()
        self.authed = {}
        self.commands = {
            'auth': self.cmd_auth,
        }

    def cmd_auth(self, args):
        """ Handle ``auth <password>``; ``args`` is the list of words after the command. """
        # A bare "auth" used to raise IndexError here, and that exception
        # propagated out of the command dispatcher.
        if not args:
            self.reply('usage: auth <password>')
            return
        if self.password == hashlib.md5(args[0].encode()).hexdigest():
            self.authed[self.get_user()] = True
            self.reply('you are now authorized')
        else:
            self.reply('incorrect password')
from gruebot import Plugin
import auth
class commands(Plugin):
    """ Administrative commands, available only to authorized users.

    Relies on the ``auth`` plugin being loaded first; when it is missing
    every command is refused.
    """

    def init(self, *args, **kwargs):
        """ Look up the auth plugin and register the command table. """
        self.auth = self.get_plugin(auth.auth)
        if not self.auth:
            print('Authorization plugin not found.')
        self.commands = {
            '_join': self.cmd_join,
            '_leave': self.cmd_leave,
            '_load': self.cmd_load,
            '_unload': self.cmd_unload,
            '_disconnect': self.cmd_disconnect,
        }

    def _check_auth(self):
        """ Return True if the current user is authorized, else reply with a refusal. """
        if self.auth and self.auth.is_authed():
            return True
        self.reply('you are not authorized to do that')
        return False

    def _first_arg(self, args, usage):
        """ Return args[0], or reply with ``usage`` and return None when it is missing. """
        # Guarding here keeps a command sent without its argument from
        # raising IndexError inside the handler.
        if args:
            return args[0]
        self.reply('usage: %s' % usage)
        return None

    def cmd_join(self, args):
        """ ``_join <channel>``: join a channel. """
        if not self._check_auth():
            return
        chan = self._first_arg(args, '_join <channel>')
        if chan is not None:
            self.join_channel(chan)

    def cmd_leave(self, args):
        """ ``_leave <channel>``: leave a channel. """
        if not self._check_auth():
            return
        chan = self._first_arg(args, '_leave <channel>')
        if chan is not None:
            self.leave_channel(chan)

    def cmd_load(self, args):
        """ ``_load <module>``: load (or reload) a plugin module and report the result. """
        if not self._check_auth():
            return
        name = self._first_arg(args, '_load <module>')
        if name is None:
            return
        if self.load_module(name):
            self.reply('loaded %s' % name)
        else:
            self.reply('failed to load %s' % name)

    def cmd_unload(self, args):
        """ ``_unload <module>``: unload a plugin module. """
        if not self._check_auth():
            return
        name = self._first_arg(args, '_unload <module>')
        if name is None:
            return
        self.unload_module(name)
        self.reply('unloaded %s' % name)

    def cmd_disconnect(self, args):
        """ ``_disconnect``: close the current connection. """
        if self._check_auth():
            self.disconnect()
# Copyright (c) 2013, Michael Patraw
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import socket, re, traceback, time, shlex, inspect
class Plugin():
    """ Wraps a plugin.

    Base class for bot plugins. It stores the owning bot, forwards the
    remaining constructor arguments to ``init()``, offers thin helpers that
    send IRC commands over the bot's current connection, and declares no-op
    event hooks for subclasses to override.
    """

    def __init__(self, bot, *args, **kwargs):
        self.__bot = bot
        self.init(*args, **kwargs)

    def init(self, *args, **kwargs):
        """ Subclass hook; receives the extra arguments given at load time. """
        pass

    # -- bot / connection access ------------------------------------------

    def disconnect(self):
        """ Disconnect the current connection. """
        self.get_connection().disconnect()

    def load_module(self, name):
        """ Ask the bot to load module ``name``; returns the bot's result. """
        return self.__bot.load_module(name)

    def unload_module(self, name):
        """ Ask the bot to unload module ``name``. """
        self.__bot.unload_module(name)

    def get_plugin(self, plugin):
        """ Return the bot's lookup result for plugin class ``plugin``. """
        return self.__bot.get_plugin(plugin)

    def unload(self):
        """ Unload this plugin from the bot. """
        self.__bot.unload_plugin(self.__class__)

    def get_connection(self):
        """ Return the connection the bot is currently processing. """
        return self.__bot.get_current_connection()

    def get_source(self):
        """ Return the current connection's current source. """
        return self.get_connection().get_current_source()

    def get_user(self):
        """ Return the current connection's current user. """
        return self.get_connection().get_current_user()

    def get_channel(self):
        """ Return the current connection's current channel. """
        return self.get_connection().get_current_channel()

    # -- outgoing IRC commands --------------------------------------------

    def pong(self, arg):
        """ Send PONG with ``arg``. """
        self.get_connection().send_command('PONG', arg)

    def reply(self, msg):
        """ Send ``msg`` as a PRIVMSG to the current source. """
        self.reply_to(self.get_source(), msg)

    def reply_to(self, targ, msg):
        """ Send ``msg`` as a PRIVMSG to ``targ``. """
        self.get_connection().send_command('PRIVMSG', targ, msg)

    def notice(self, msg):
        """ Send ``msg`` as a NOTICE to the current source. """
        self.notice_to(self.get_source(), msg)

    def notice_to(self, targ, msg):
        """ Send ``msg`` as a NOTICE to ``targ``. """
        self.get_connection().send_command('NOTICE', targ, msg)

    def join_channel(self, chan, key = None):
        """ Send JOIN for ``chan``, with the channel ``key`` when given. """
        if key:
            # The key is a second space-separated parameter. The previous
            # form appended '; ' to the channel, which became part of the
            # channel name on the wire.
            self.get_connection().send_command('JOIN', '%s %s' % (chan, key))
        else:
            self.get_connection().send_command('JOIN', chan)

    def leave_channel(self, chan):
        """ Send PART for ``chan``. """
        self.get_connection().send_command('PART', chan)

    def change_topic(self, chan, msg):
        """ Send TOPIC for ``chan`` with ``msg`` as the new topic. """
        self.get_connection().send_command('TOPIC', chan, msg)

    def password(self, pw):
        """ Send PASS with ``pw``. """
        self.get_connection().send_command('PASS', pw)

    def oper(self, name, pw):
        """ Send OPER with ``name`` and ``pw``. """
        self.get_connection().send_command('OPER', '%s %s' % (name, pw))

    def quit(self):
        """ Send QUIT. """
        self.get_connection().send_command('QUIT')

    def set_mode(self, targ, m, user = None):
        '''
        Send MODE for ``targ`` with mode string ``m`` and, when given,
        ``user`` as the mode argument.

        Channel modes: [+|-]|o|p|s|i|t|n|b|v
        User modes: [+|-]|i|w|s|o
        '''
        targ += ' %s' % m
        # The body previously tested an undefined name ``arg`` and raised
        # NameError on every call; the parameter is ``user``.
        if user:
            targ += ' %s' % user
        self.get_connection().send_command('MODE', targ)

    def list_names(self, chan = None):
        """ Send NAMES, optionally restricted to ``chan``. """
        self.get_connection().send_command('NAMES', chan)

    def list_channels(self, chan = None):
        """ Send LIST, optionally restricted to ``chan``. """
        self.get_connection().send_command('LIST', chan)

    def invite(self, chan, user):
        """ Send INVITE for ``user`` to ``chan``. """
        self.get_connection().send_command('INVITE', '%s %s' % (user, chan))

    def kick(self, chan, user, reason = None):
        """ Send KICK for ``user`` on ``chan`` with an optional ``reason``. """
        self.get_connection().send_command('KICK', '%s %s' % (chan, user), reason)

    def version(self, on):
        """ Send VERSION with ``on``. """
        self.get_connection().send_command('VERSION', on)

    def stats(self, arg = None):
        ''' Send STATS. Possible args: chiklmoyu '''
        self.get_connection().send_command('STATS', arg)

    def server_links(self, on):
        """ Send LINKS with ``on``. """
        self.get_connection().send_command('LINKS', on)

    def time(self, on):
        """ Send TIME with ``on``. """
        self.get_connection().send_command('TIME', on)

    def traceroute(self, on):
        """ Send TRACE with ``on``. """
        self.get_connection().send_command('TRACE', on)

    def find_admin(self, on):
        """ Send ADMIN with ``on``. """
        self.get_connection().send_command('ADMIN', on)

    def server_info(self, on):
        """ Send INFO with ``on``. """
        self.get_connection().send_command('INFO', on)

    def who(self, match, mode = None):
        """ Send WHO for ``match``, appending ``mode`` when given. """
        if mode:
            match += ' %s' % mode
        self.get_connection().send_command('WHO', match)

    def whois(self, user, server = None):
        """ Send WHOIS for ``user``, prefixed by ``server`` when given. """
        if server:
            user = '%s ' % server + user
        self.get_connection().send_command('WHOIS', user)

    def whowas(self, user, count = None, server = None):
        """ Send WHOWAS for ``user``, appending ``count`` and ``server`` when given. """
        if count:
            user += ' %s' % count
        if server:
            user += ' %s' % server
        self.get_connection().send_command('WHOWAS', user)

    # -- event hooks (no-ops; override in subclasses) ---------------------

    def on_tick(self): pass
    def on_ping(self, arg): pass
    def on_message(self, msg): pass
    def on_join(self): pass
    def on_notice(self, msg): pass
    def on_channel_notice(self, msg): pass
    def on_mode_change(self, m): pass
    def on_invite(self): pass
    def on_kick(self): pass
    def rpl(self, kind, msg): pass
    def err(self, kind, msg): pass
    def on_raw_command(self, prefix, command, params): pass
class CorePlugin(Plugin):
    """ Answers every ping event with a PONG carrying the same argument. """

    def on_ping(self, arg):
        # Log first, then echo the argument back via pong().
        notice = 'PONGING %s' % arg
        print(notice)
        self.pong(arg)
class DebugPlugin(Plugin):
    '''
    Prints selected events to stdout. Initialize with a string of flags
    selecting what gets printed:
    * - print everything
    c - print raw commands
    e - print error replies
    r - print numeric replies
    '''

    def init(self, what):
        self.debug_what = what

    def _wants(self, flag):
        ''' True when ``flag`` or the catch-all '*' was requested. '''
        selected = self.debug_what
        return '*' in selected or flag in selected

    def on_raw_command(self, prefix, command, params):
        if not self._wants('c'):
            return
        print('%s: %s %s' % (prefix or '', command, params))

    def err(self, kind, msg):
        if not self._wants('e'):
            return
        print('%s: %s' % (kind, msg))

    def rpl(self, kind, msg):
        if not self._wants('r'):
            return
        print('%s: %s' % (kind, msg))
class JoinPlugin(Plugin):
    """ Joins the channels given at load time once the connection reports
    ready, then unloads itself. """

    def init(self, *args, **kwargs):
        # Every positional argument is a channel name.
        self.channels = args

    def on_tick(self):
        # Nothing to do until the connection says it is ready.
        if not self.get_connection().is_ready():
            return
        for channel in self.channels:
            self.join_channel(channel)
        # One-shot plugin: remove ourselves after issuing the joins.
        self.unload()
class PluginManager():
    """ Keeps the loaded plugin instances and the command table.

    Plugins are indexed by class name; commands map a word to the callable
    that handles it.
    """

    def __init__(self):
        self.__plugins = {}
        self.__commands = {}

    def dispatch(self, event, *args, **kwargs):
        """ Call method ``event`` on every plugin with the given arguments.

        A failure in one plugin is printed and does not prevent the
        remaining plugins from receiving the event.
        """
        # Iterate over a snapshot: handlers may load or unload plugins.
        for plugin in list(self.__plugins.values()):
            try:
                getattr(plugin, event)(*args, **kwargs)
            except Exception:
                # ``Exception`` rather than a bare except, so that
                # KeyboardInterrupt and SystemExit still stop the bot.
                traceback.print_exc()

    def register_command(self, cmd, f):
        """ Bind ``cmd`` to ``f``; an already-bound command is kept and reported. """
        if cmd in self.__commands:
            print('conflicting commands: %s' % cmd)
        else:
            self.__commands[cmd] = f

    def unregister_command(self, cmd, f = None):
        """ Remove ``cmd`` from the command table.

        When ``f`` is given the command is removed only if it is currently
        bound to ``f``, so a plugin whose registration lost a name conflict
        cannot remove the winner's command. Unknown commands are ignored.
        """
        if cmd not in self.__commands:
            return
        if f is not None and self.__commands[cmd] != f:
            return
        del self.__commands[cmd]

    def on_command(self, args):
        """ Run the command named by ``args[0]`` with the remaining words.

        Unknown commands and empty input are ignored; an exception raised
        by the handler is printed instead of propagating to the caller.
        """
        if len(args) > 0:
            cmd = args[0]
            args = args[1:]
            if cmd in self.__commands:
                try:
                    self.__commands[cmd](args)
                except Exception:
                    traceback.print_exc()

    def register_plugin(self, inst):
        """ Store ``inst`` under its class name and register its ``commands``, if any. """
        self.__plugins[inst.__class__.__name__] = inst
        for cmd, f in getattr(inst, 'commands', {}).items():
            self.register_command(cmd, f)

    def unregister_plugin(self, clsinst):
        """ Remove a plugin, given either its class or an instance.

        Unregisters the plugin's commands as well. A plugin that is not
        loaded is ignored.
        """
        if hasattr(clsinst, '__name__'):
            index = clsinst.__name__
        else:
            index = clsinst.__class__.__name__
        inst = self.__plugins.pop(index, None)
        if inst is None:
            return
        for cmd, f in getattr(inst, 'commands', {}).items():
            self.unregister_command(cmd, f)

    def get_plugin(self, cls):
        """ Return the loaded instance registered under ``cls.__name__``, or None. """
        return self.__plugins.get(cls.__name__)
# Numeric reply code -> event name passed to Plugin.rpl().
# Keys are the numeric exactly as it appears in the command field of a
# server message. Numerics are three digits on the wire, so the welcome
# block is keyed '001'..'005'; keyed as '1'..'5' those entries never
# matched an incoming message.
RFC_RPL = {
    # registration / welcome
    '001': 'rpl_welcome',
    '002': 'rpl_yourhost',
    '003': 'rpl_created',
    '004': 'rpl_myinfo',
    '005': 'rpl_bounce',
    # command replies
    '302': 'rpl_userhost',
    '303': 'rpl_ison',
    '301': 'rpl_away',
    '305': 'rpl_unaway',
    '306': 'rpl_nowaway',
    '311': 'rpl_whoisuser',
    '312': 'rpl_whoisserver',
    '313': 'rpl_whoisoperator',
    '317': 'rpl_whoisidle',
    '318': 'rpl_endofwhois',
    '319': 'rpl_whoischannels',
    '314': 'rpl_whowasuser',
    '369': 'rpl_endofwhowas',
    '322': 'rpl_list',
    '323': 'rpl_listend',
    '325': 'rpl_uniqopis',
    '324': 'rpl_channelmodeis',
    '331': 'rpl_notopic',
    '332': 'rpl_topic',
    '341': 'rpl_inviting',
    '342': 'rpl_summoning',
    '346': 'rpl_invitelist',
    '347': 'rpl_endofinvitelist',
    '348': 'rpl_exceptlist',
    '349': 'rpl_endofexceptlist',
    '351': 'rpl_version',
    '352': 'rpl_whoreply',
    '315': 'rpl_endofwho',
    '353': 'rpl_namreply',
    '366': 'rpl_endofnames',
    '364': 'rpl_links',
    '365': 'rpl_endoflinks',
    '367': 'rpl_banlist',
    '368': 'rpl_endofbanlist',
    '371': 'rpl_info',
    '374': 'rpl_endofinfo',
    '375': 'rpl_motdstart',
    '372': 'rpl_motd',
    '376': 'rpl_endofmotd',
    '381': 'rpl_youreoper',
    '382': 'rpl_rehashing',
    '383': 'rpl_youreservice',
    '391': 'rpl_time',
    '392': 'rpl_usersstart',
    '393': 'rpl_users',
    '394': 'rpl_endofusers',
    '395': 'rpl_nousers',
    # trace
    '200': 'rpl_tracelink',
    '201': 'rpl_traceconnecting',
    '202': 'rpl_tracehandshake',
    '203': 'rpl_traceunknown',
    '204': 'rpl_traceoperator',
    '205': 'rpl_traceuser',
    '206': 'rpl_traceserver',
    '207': 'rpl_traceservice',
    '208': 'rpl_tracenewtype',
    '209': 'rpl_traceclass',
    '261': 'rpl_tracelog',
    '262': 'rpl_traceend',
    # stats
    '211': 'rpl_statslinkinfo',
    '212': 'rpl_statscommands',
    '219': 'rpl_endofstats',
    '242': 'rpl_statsuptime',
    '243': 'rpl_statsoline',
    '221': 'rpl_umodeis',
    '234': 'rpl_servlist',
    '235': 'rpl_servlistend',
    # lusers / admin
    '251': 'rpl_luserclient',
    '252': 'rpl_luserop',
    '253': 'rpl_luserunknown',
    '254': 'rpl_luserchannels',
    '255': 'rpl_luserme',
    '256': 'rpl_adminme',
    '257': 'rpl_adminloc1',
    '258': 'rpl_adminloc2',
    '259': 'rpl_adminemail',
    '263': 'rpl_tryagain',
}
# Numeric error code -> event name passed to Plugin.err().
# Keys are the three-digit numeric as a string.
RFC_ERR = {
    # 401-409
    '401': 'err_nosuchnick',
    '402': 'err_nosuchserver',
    '403': 'err_nosuchchannel',
    '404': 'err_cannotsendtochan',
    '405': 'err_toomanychannels',
    '406': 'err_wasnosuchnick',
    '407': 'err_toomanytargets',
    '408': 'err_nosuchservice',
    '409': 'err_noorigin',

    # 411-415
    '411': 'err_norecipient',
    '412': 'err_notexttosend',
    '413': 'err_notoplevel',
    '414': 'err_wildtoplevel',
    '415': 'err_badmask',

    # 421-424
    '421': 'err_unknowncommand',
    '422': 'err_nomotd',
    '423': 'err_noadmininfo',
    '424': 'err_fileerror',

    # 431-437
    '431': 'err_nonicknamegiven',
    '432': 'err_erroneusnickname',
    '433': 'err_nicknameinuse',
    '436': 'err_nickcollision',
    '437': 'err_unavailresource',

    # 441-446
    '441': 'err_usernotinchannel',
    '442': 'err_notonchannel',
    '443': 'err_useronchannel',
    '444': 'err_nologin',
    '445': 'err_summondisabled',
    '446': 'err_usersdisabled',

    # 451
    '451': 'err_notregistered',

    # 461-467
    '461': 'err_needmoreparams',
    '462': 'err_alreadyregistred',
    '463': 'err_nopermforhost',
    '464': 'err_passwdmismatch',
    '465': 'err_yourebannedcreep',
    '466': 'err_youwillbebanned',
    '467': 'err_keyset',

    # 471-478
    '471': 'err_channelisfull',
    '472': 'err_unknownmode',
    '473': 'err_inviteonlychan',
    '474': 'err_bannedfromchan',
    '475': 'err_badchannelkey',
    '476': 'err_badchanmask',
    '477': 'err_nochanmodes',
    '478': 'err_banlistfull',

    # 481-491
    '481': 'err_noprivileges',
    '482': 'err_chanoprivsneeded',
    '483': 'err_cantkillserver',
    '484': 'err_restricted',
    '485': 'err_uniqopprivsneeded',
    '491': 'err_nooperhost',

    # 501-502
    '501': 'err_umodeunknownflag',
    '502': 'err_usersdontmatch',
}
class Connection():
    """ Wraps up a single connection to a server.

    Opens a TCP socket, registers with NICK/USER, parses incoming lines
    and forwards them as events to the plugin manager. While an event is
    being dispatched, the "current" source/user/channel describe the
    message that triggered it.
    """

    # Raw strings keep the regex escapes (\s) from being read as string
    # escapes; the resulting patterns are unchanged.
    expr_space = r"\s+"
    expr_prefix = r"(?P<prefix>[^ ]+)"
    expr_command = r"(?P<command>([A-Z]+|[0-9]+))"
    expr_middle = r"(?P<middle>(?!( :)|:).+?(?=(?: :)|(?:\s*$)))"
    expr_trailing = r"(?P<trailing>.*)"
    expr_params = "((" + expr_space + ":" + expr_trailing + ")|(" + expr_space + expr_middle + "))+"
    expr_message = "^(:" + expr_prefix + expr_space + ")?" + expr_command + expr_params + "$"
    irc_parse_prog = re.compile(expr_message)

    # Fewest parameters each handled command needs before its handler may
    # index into ``params``; shorter messages only reach on_raw_command.
    _min_params = {
        'PING': 1,
        'PRIVMSG': 2,
        'NOTICE': 2,
        'MODE': 2,
        'JOIN': 1,
        'KICK': 2,
        'INVITE': 2,
    }

    def __init__(self, plugin_manager, network, nick, port = 6667):
        # Set every attribute before touching the network so the object is
        # consistent even when the connection attempt fails.
        self.plugin_manager = plugin_manager
        self.network = network
        self.nick = nick
        self.__connected = False
        self.__waiting_for_motd = True
        self.__recv_buffer = b''
        self.__current_source = None
        self.__current_user = None
        self.__current_channel = None
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((network, port))
        except socket.error as e:
            print(e)
            return
        # Mark connected before registering: send_command clears the flag
        # again if a send fails.
        self.__connected = True
        self.send_command('NICK', nick)
        # compose_irc_message adds the ':' before the trailing parameter.
        self.send_command('USER', '%s %s %s' % (nick, nick, nick), 'My bot')

    def disconnect(self):
        """ Close the socket and mark the connection as closed. """
        self.sock.close()
        self.__connected = False

    def is_connected(self):
        """ Return True while the connection is considered open. """
        return self.__connected

    def is_ready(self):
        """ Return True once the end-of-MOTD reply has been seen. """
        return not self.__waiting_for_motd

    def get_current_source(self):
        return self.__current_source

    def set_current_source(self, src):
        self.__current_source = src

    def get_current_user(self):
        return self.__current_user

    def set_current_user(self, user):
        self.__current_user = user

    def get_current_channel(self):
        return self.__current_channel

    def set_current_channel(self, channel):
        self.__current_channel = channel

    def __set_context(self, source, user, channel):
        """ Set the current source, user and channel in one step. """
        self.set_current_source(source)
        self.set_current_user(user)
        self.set_current_channel(channel)

    def process(self):
        """ Receive pending messages, dispatch their events, then dispatch on_tick. """
        for msg in self.recv_commands():
            prefix = msg['prefix']
            if prefix and '!' in prefix:
                # Keep only the part of the prefix before '!'.
                prefix = prefix.partition('!')[0]
            command = msg['command']
            params = msg['params']
            self.__set_context(None, None, None)
            if command in RFC_RPL:
                self.set_current_source(prefix)
                if RFC_RPL[command] == 'rpl_endofmotd':
                    self.__waiting_for_motd = False
                param = params[1] if len(params) > 1 else None
                self.plugin_manager.dispatch('rpl', RFC_RPL[command], param)
            elif command in RFC_ERR:
                self.set_current_source(prefix)
                param = params[1] if len(params) > 1 else None
                self.plugin_manager.dispatch('err', RFC_ERR[command], param)
            else:
                self.plugin_manager.dispatch('on_raw_command', prefix, command, params)
                # Skip the typed handler for messages with too few
                # parameters instead of failing on an index.
                if len(params) >= self._min_params.get(command, 0):
                    self.__dispatch_command(prefix, command, params)
            self.__set_context(None, None, None)
        self.plugin_manager.dispatch('on_tick')

    def __dispatch_command(self, prefix, command, params):
        """ Set the context for one non-numeric command and dispatch its event. """
        dispatch = self.plugin_manager.dispatch
        if command == 'PING':
            dispatch('on_ping', params[0])
        elif command in ('PRIVMSG', 'NOTICE'):
            if params[0] == self.nick:
                # Addressed to the bot itself: the sender is the source.
                self.__set_context(prefix, prefix, None)
            else:
                self.__set_context(params[0], prefix, params[0])
            if command == 'NOTICE':
                dispatch('on_notice', params[1])
                return
            text = params[1]
            dispatch('on_message', text)
            try:
                words = shlex.split(text)
            except ValueError:
                # shlex rejects unbalanced quotes (any message containing
                # a lone apostrophe); fall back to whitespace splitting.
                words = text.split()
            self.plugin_manager.on_command(words)
        elif command == 'MODE':
            self.set_current_source(prefix)
            if len(params) >= 3:
                self.set_current_user(params[2])
            self.set_current_channel(params[0])
            dispatch('on_mode_change', params[1])
        elif command == 'JOIN':
            self.__set_context(prefix, prefix, params[0])
            dispatch('on_join')
        elif command == 'KICK':
            self.__set_context(prefix, params[1], params[0])
            dispatch('on_kick')
        elif command == 'INVITE':
            self.__set_context(prefix, params[0], params[1])
            dispatch('on_invite')

    def send_command(self, cmd, middle = None, args = None):
        """ Compose and send one IRC message; a failure marks the connection closed. """
        try:
            # sendall keeps writing until the whole message is out;
            # send() may stop after a partial write.
            self.sock.sendall(Connection.compose_irc_message(cmd, middle, args))
        except Exception:
            traceback.print_exc()
            self.__connected = False

    def recv_commands(self):
        """ Read from the socket and return an iterable of parsed messages.

        Only complete CRLF-terminated lines are parsed; an unfinished tail
        is kept for the next call. Returns an empty list on error, on end
        of stream, or when no complete line is available yet.
        """
        try:
            data = self.sock.recv(4096)
        except Exception:
            traceback.print_exc()
            self.__connected = False
            return []
        if not data:
            # recv returned no bytes, i.e. the peer closed the stream.
            self.disconnect()
            return []
        self.__recv_buffer += data
        complete, sep, rest = self.__recv_buffer.rpartition(b'\r\n')
        self.__recv_buffer = rest
        if not sep:
            return []
        return Connection.parse_irc_messages(complete)

    @classmethod
    def parse_irc_messages(cls, msgs):
        """ Yield a dict (prefix, command, params) for each parsable line in ``msgs``.

        ``msgs`` is bytes holding CRLF-separated lines. Lines that do not
        match the message pattern are skipped.
        """
        # Undecodable bytes are replaced so that one non-UTF-8 line cannot
        # raise while the caller is iterating.
        text = msgs.decode('utf-8', errors='replace')
        for line in text.split('\r\n'):
            m = cls.irc_parse_prog.search(line)
            if not m:
                continue
            d = {}
            d['prefix'] = m.group('prefix')
            d['command'] = m.group('command')
            if m.group('middle'):
                d['params'] = m.group('middle').split(' ')
            else:
                d['params'] = []
            if m.group('trailing'):
                d['params'].append(m.group('trailing'))
            yield d

    @classmethod
    def compose_irc_message(cls, command, middle = None, trailing = None):
        """ Does not include source, since clients shouldn't.

        Returns the encoded line ``command [middle] [:trailing]`` followed
        by CRLF.
        """
        s = '%s' % command
        if middle:
            s += ' %s' % middle
        if trailing:
            s += ' :%s' % trailing
        s += '\r\n'
        return s.encode()
class GrueBot():
    """ A super simple IRC bot.

    Owns the server connections, the imported plugin modules and the
    plugin manager, and drives everything from ``run()``.
    """

    def __init__(self):
        self.connections = []
        self.modules = {}
        self.plugin_manager = PluginManager()
        # Defined up front so get_current_connection() works before run().
        self.__current_connection = None

    def connect(self, network, nick, port = 6667):
        """ Open a connection to ``network`` as ``nick`` and track it. """
        conn = Connection(self.plugin_manager, network, nick, port)
        self.connections.append(conn)

    def load_plugin(self, plugin, *args, **kwargs):
        """ Instantiate plugin class ``plugin`` with the extra arguments and register it. """
        self.plugin_manager.register_plugin(plugin(self, *args, **kwargs))

    def load_module(self, name, *args, **kwargs):
        """ Import (or reload) module ``name`` and register its plugin.

        The module must define a plugin class with the same name as the
        module. Returns True on success, False on any failure (the
        traceback is printed).
        """
        # importlib.reload replaces imp.reload: the imp module no longer
        # exists on current Python versions, and importing it here raised
        # before the try block was even entered.
        import importlib
        try:
            if name not in self.modules:
                module = importlib.import_module(name)
            else:
                module = importlib.reload(self.modules[name])
            self.modules[name] = module
            plugin_cls = getattr(module, name)
            self.plugin_manager.register_plugin(plugin_cls(self, *args, **kwargs))
            return True
        except Exception:
            traceback.print_exc()
            return False

    def unload_plugin(self, plugin):
        """ Unregister ``plugin`` (a class or an instance). """
        self.plugin_manager.unregister_plugin(plugin)

    def unload_module(self, name):
        """ Unregister the plugin from module ``name`` and forget the module.

        Unknown names are ignored; failures are printed.
        """
        try:
            if name in self.modules:
                self.plugin_manager.unregister_plugin(getattr(self.modules[name], name))
                del self.modules[name]
        except Exception:
            traceback.print_exc()

    def get_plugin(self, plugin):
        """ Return the loaded instance of plugin class ``plugin``, if any. """
        return self.plugin_manager.get_plugin(plugin)

    def get_current_connection(self):
        """ Return the connection being processed, or None before run() starts. """
        return self.__current_connection

    def run(self):
        """ Process every connection in turn until none remain connected. """
        # Nothing can happen without a connection (plugin events are
        # dispatched from Connection.process), so stop instead of looping
        # forever.
        while self.connections:
            for conn in self.connections:
                self.__current_connection = conn
                conn.process()
            time.sleep(1)
            # Drop connections that closed during this pass.
            self.connections[:] = [c for c in self.connections if c.is_connected()]
if __name__ == '__main__':
    # Example setup: connect, load the built-in plugins, then the auth and
    # commands modules, and run until the connection is gone.
    import os

    bot = GrueBot()
    bot.connect('irc.quakenet.org', 'gruebot')
    bot.load_plugin(CorePlugin)
    bot.load_plugin(DebugPlugin, '*')
    bot.load_plugin(JoinPlugin, '#jabsplace')
    # The password can be supplied through GRUEBOT_PASSWORD so it does not
    # have to live in the source; the previous literal remains the default.
    bot.load_module('auth', password = os.environ.get('GRUEBOT_PASSWORD', 'mypass'))
    bot.load_module('commands')
    bot.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment