Created
October 18, 2012 22:33
-
-
Save DivinityArcane/3915172 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
####################################################
# Ouroboros, the simple IRC bot!                   #
#                                                  #
# Author: Justin Eittreim                          #
# Contact: eittreim.justin@live.com                #
# Web: http://DivinityArcane.deviantart.com/       #
#                                                  #
# Python version: 3.2 (May work in lower versions.)#
####################################################
### TODO: Handle other misc. packets, like NAMES.
from socket import socket, AF_INET, SOCK_STREAM | |
import sys | |
from time import sleep, time | |
from datetime import datetime | |
class Bot:
    """Ouroboros, a small single-connection IRC bot.

    ``run()`` holds the configuration, opens the connection and blocks while
    it reads and answers server lines.  The other public methods (``send``,
    ``join``, ``part``, ``say``, ``notice``, ``me``, ``quit``) write to the
    socket that ``run()`` created.
    """

    # Leading characters that mark a message target as a channel, not a nick.
    CHANNEL_PREFIXES = ('#', '&', '+', '!')

    def run(self):
        """Set up state and config, connect, register and serve until stopped.

        Exceptions raised while connecting propagate to the caller; the
        socket is closed in every case before this method returns.
        """
        self.running = True
        self.autojoined = False
        self.authenticated = False
        self.debug = False
        self.pingSent = 0
        # Where the answer to the pending `ping` command has to be sent.
        self.pingChan = None
        self.started = time()
        self.buffer = ''

        # CONFIG STARTS HERE
        self.serverAddress = 'irc.shadowkitsune.net'
        self.serverPort = 6667
        self.config = {
            'nick': 'Ouroboros',
            'user': 'Ouroboros',
            'realname': 'Ouroboros IRC Bot',
            'pass': None}
        self.owner = {
            'nick': 'DivinityArcane',
            'host': 'NeverWinterNights.jp'}
        self.trigger = '`'
        self.autojoin = ['#Botdom', '#Bots']
        self.autoReJoin = True
        # CONFIG ENDS HERE

        self.output('Firing up! Welcome to Ouroboros, mate!')
        self.output('Let\'s go ahead and try to connect.')
        self.output('We\'re trying to connect to {0}:{1}'.format(self.serverAddress, self.serverPort))
        self.output('Using nickname: {0}'.format(self.config['nick']))
        self.output('Our owner is {0}, apparently! And our trigger is: {1}'.format(self.owner['nick'], self.trigger))

        self.socket = socket(AF_INET, SOCK_STREAM)
        try:
            self.socket.connect((self.serverAddress, self.serverPort))
            # Register right away.  Waiting for (and throwing away) a first
            # chunk of server data can hang on a silent server and can drop
            # a line the bot needed to see.
            self.send('NICK {0}'.format(self.config['nick']))
            self.send('USER {0} {0} {0} :{1}'.format(self.config['user'], self.config['realname']))
            self._readLoop()
        finally:
            self.socket.close()

    def _readLoop(self):
        """Read from the socket and dispatch complete lines until stopped."""
        pending = b''
        while self.running:
            try:
                chunk = self.socket.recv(4096)
            except OSError as err:
                self.output('Error while receiving: {0}'.format(err))
                self.running = False
                break
            # recv() hands back an empty bytes object once the peer closed.
            if not chunk:
                self.output('Got disconnected!')
                self.running = False
                break
            pending += chunk
            lines = pending.split(b'\n')
            # The last piece is an unfinished line (or empty); keep it.
            pending = lines.pop()
            for raw in lines:
                if not self.running:
                    break
                # Decode whole lines only, so multi-byte characters are never
                # cut in half; undecodable bytes are replaced, not fatal.
                self.buffer = raw.decode('utf-8', 'replace').rstrip('\r')
                self._handleLine(self.buffer)
                # Are we in debug mode?  Print the raw IRC packet.
                if self.debug:
                    self.output(self.buffer)
                # Clear the buffer for the next run.
                self.buffer = ''

    @staticmethod
    def _parseLine(line):
        """Split a raw IRC line into ``(prefix, command, args)``.

        ``prefix`` is the sender without its leading colon ('' when absent),
        ``command`` is upper-cased, and ``args`` lists the space separated
        parameters with the ``:``-introduced trailing text as its last item.
        """
        prefix = ''
        if line.startswith(':'):
            prefix, _, line = line[1:].partition(' ')
        head, colon, trailing = line.partition(' :')
        args = head.split()
        command = args.pop(0).upper() if args else ''
        if colon:
            args.append(trailing)
        return (prefix, command, args)

    def _handleLine(self, line):
        """Parse one server line and pass it to the matching handler."""
        prefix, command, args = self._parseLine(line)
        handlers = {
            'PING': self._onPing,
            'ERROR': self._onError,
            '001': self._onWelcome,
            'PONG': self._onPong,
            'JOIN': self._onJoin,
            'PART': self._onPart,
            'QUIT': self._onQuit,
            'MODE': self._onMode,
            'KICK': self._onKick,
            'PRIVMSG': self._onMessage,
            'NOTICE': self._onMessage,
        }
        handler = handlers.get(command)
        if handler is not None:
            handler(prefix, command, args)

    def _autoJoin(self):
        """Join every configured channel and greet it."""
        self.autojoined = True
        for chan in self.autojoin:
            self.join(chan)
            self.say(chan, 'Hi there!')

    def _onPing(self, prefix, command, args):
        """Answer a server PING by echoing its parameters in a PONG."""
        if args:
            self.send('PONG {0}'.format(' '.join(args[:-1] + [':' + args[-1]])))
        else:
            self.send('PONG')

    def _onError(self, prefix, command, args):
        """Report an IRC ERROR line on the console."""
        self.output('IRC Error: {0}'.format(' '.join(args)))

    def _onWelcome(self, prefix, command, args):
        """Autojoin on numeric 001, but only if we don't need to auth first."""
        if not self.autojoined and self.config['pass'] is None:
            self._autoJoin()

    def _onPong(self, prefix, command, args):
        """Report the round trip time of a `ping` command, if one is pending."""
        if self.pingSent > 0:
            elapsed = round(time() - self.pingSent, 3)
            if self.pingChan is not None:
                self.say(self.pingChan, 'Pong! Took {0} seconds.'.format(elapsed))
            self.pingSent = 0
            self.pingChan = None

    def _onJoin(self, prefix, command, args):
        """Log someone (possibly the bot) joining a room."""
        if not args:
            return
        nick = self.userData(prefix)[0]
        chan = args[0].strip()
        if nick == self.config['nick']:
            self.output('*** Joined {0}'.format(chan))
        else:
            self.output('** {0} has joined {1}'.format(nick, chan))

    def _onPart(self, prefix, command, args):
        """Log someone (possibly the bot) leaving a room."""
        if not args:
            return
        nick = self.userData(prefix)[0]
        chan = args[0]
        reason = args[1].strip() if len(args) > 1 else '(No reason)'
        if nick == self.config['nick']:
            self.output('*** Left {0} ({1})'.format(chan, reason))
        else:
            self.output('** {0} has left {1} ({2})'.format(nick, chan, reason))

    def _onQuit(self, prefix, command, args):
        """Log someone quitting the server."""
        nick = self.userData(prefix)[0]
        reason = args[0].strip() if args else '(No reason)'
        self.output('** {0} has quit ({1})'.format(nick, reason))

    def _onMode(self, prefix, command, args):
        """Log a mode change."""
        if len(args) < 2:
            return
        if '!' not in prefix:
            # Sender is not a nick!user@host mask (e.g. the server itself).
            self.output('** {0} sets modes {1} on {2}'.format(prefix, args[1], args[0]))
        elif args[0].startswith('#'):
            nick = self.userData(prefix)[0]
            chan = args[0]
            # Modes such as +m carry no argument; they apply to the channel.
            target = ' '.join(args[2:]) or chan
            self.output('[{3}] ** {0} sets modes {1} on {2}'.format(nick, args[1], target, chan))

    def _onKick(self, prefix, command, args):
        """Log a kick and rejoin when the bot itself was kicked."""
        if len(args) < 2:
            return
        nick = self.userData(prefix)[0]
        chan = args[0]
        target = args[1]
        reason = args[2] if len(args) > 2 else ''
        # A reason equal to the kicker's nick is treated as "none given".
        if not reason or reason == nick:
            reason = '(No reason)'
        if target == self.config['nick'] and self.autoReJoin:
            self.output('*** Kicked from {1} by {2}: {3}'.format(target, chan, nick, reason))
            self.join(chan)
        else:
            self.output('*** {0} was kicked from {1} by {2}: {3}'.format(target, chan, nick, reason))

    def _onMessage(self, prefix, command, args):
        """Handle a PRIVMSG or NOTICE: log it, authenticate, run commands."""
        if len(args) < 2:
            return
        (nick, user, host) = self.userData(prefix)
        chan = args[0].strip()
        msg = args[1].strip()
        isOwner = nick == self.owner['nick'] and host == self.owner['host']

        # Output, ignoring server messages.
        if '.' not in nick:
            self._logMessage(command, chan, nick, msg)

        if nick == 'NickServ':
            if self.config['pass'] is not None and not self.authenticated:
                # Authenticate.  Sent directly so the password is not echoed
                # to the console the way say() would echo it.
                self.authenticated = True
                self.send('PRIVMSG NickServ :IDENTIFY {0}'.format(self.config['pass']))
                self.output('[NickServ] <{0}> IDENTIFY ********'.format(self.config['nick']))
            elif self.authenticated and not self.autojoined:
                # Autojoin after auth.
                self._autoJoin()

        # Never answer a NOTICE automatically: two bots replying to each
        # other's notices (e.g. CTCP PING replies) would loop forever.
        if command != 'PRIVMSG':
            return

        # A private message is addressed to our nick; answer the sender.
        replyTo = chan if chan.startswith(self.CHANNEL_PREFIXES) else nick
        addressed = '{0}: '.format(self.config['nick'])
        if msg.startswith(chr(1)) and msg.endswith(chr(1)):
            self._replyCtcp(nick, msg[1:-1])
        elif msg.startswith(addressed):
            self._handleAddressed(replyTo, nick, msg[len(addressed):])
        elif msg.startswith(self.trigger):
            self._handleCommand(replyTo, nick, isOwner, msg[len(self.trigger):].split(' '))

    def _logMessage(self, command, chan, nick, msg):
        """Print an incoming PRIVMSG/NOTICE in a format matching its kind."""
        marker = chr(1)
        if msg.startswith(marker + 'ACTION') and msg.endswith(marker):
            self.output('[{0}] * {1} {2}'.format(chan, nick, msg[8:-1]))
        elif msg.startswith(marker) and msg.endswith(marker):
            ctcp = msg[1:-1].split(' ', 1)[0]
            self.output('*** CTCP {0} request from {1}'.format(ctcp, nick))
        elif command == 'NOTICE':
            self.output('[{0}] -{1}- {2}'.format(chan, nick, msg))
        else:
            self.output('[{0}] <{1}> {2}'.format(chan, nick, msg))

    def _replyCtcp(self, nick, chunk):
        """Answer the CTCP VERSION and PING requests."""
        if chunk == 'VERSION':
            self.notice(nick, '{0}VERSION Ouroboros IRC bot version 0.1{0}'.format(chr(1)))
        elif chunk.startswith('PING'):
            pong = chunk[5:]
            self.notice(nick, '{0}PING {1}{0}'.format(chr(1), pong))

    def _handleAddressed(self, replyTo, nick, subMsg):
        """Handle a message of the form "<our nick>: <subMsg>"."""
        if subMsg == 'trigcheck':
            self.say(replyTo, '{0}: {1}'.format(nick, self.trigger))
        elif subMsg == 'stats':
            # Report uptime instead of sending an empty PRIVMSG.
            uptime = int(time() - self.started)
            self.say(replyTo, '{0}: I have been up for {1} seconds.'.format(nick, uptime))

    def _handleCommand(self, replyTo, nick, isOwner, mBits):
        """Run a trigger command; ``mBits`` is the command line split on spaces."""
        cmd = mBits[0]
        if cmd == 'about':
            self.say(replyTo, 'I\'m an Ouroboros IRC bot!')
        elif cmd == 'ping':
            self.pingSent = time()
            self.pingChan = replyTo
            self.say(replyTo, 'Pinging the server...')
            self.send('PING :{0}'.format(self.pingSent))
        elif cmd == 'quit':
            if isOwner:
                self.say(replyTo, 'Alrighty! Good bye!')
                self.quit('Bye bye!')
            else:
                self.denyAccess(replyTo, nick)
        elif cmd == 'join' or cmd == 'part':
            if not isOwner:
                self.denyAccess(replyTo, nick)
            elif len(mBits) != 2:
                self.say(replyTo, '{0}: usage: {1} #channel'.format(nick, cmd))
            elif cmd == 'join':
                self.join(mBits[1])
            else:
                self.part(mBits[1])

    # Send a packet to the server
    def send(self, pkt):
        """Send one IRC line to the server.

        Any line ending already on ``pkt`` is replaced by a single CRLF.  On
        a socket error the problem is reported and the bot is flagged to stop.
        """
        try:
            # sendall() keeps writing until the whole line has gone out.
            self.socket.sendall(bytes(pkt.rstrip('\r\n') + '\r\n', 'utf-8'))
        except OSError as err:
            self.output('Error while sending: {0}'.format(err))
            self.running = False

    # Attempt to join a room
    def join(self, chan):
        """Ask the server to join ``chan``."""
        self.send('JOIN {0}'.format(chan))

    # Attempt to leave a room
    def part(self, chan):
        """Ask the server to leave ``chan``."""
        self.send('PART {0}'.format(chan))

    # Attempt to say something to a room
    def say(self, chan, msg):
        """Send ``msg`` to ``chan`` as a PRIVMSG and echo it on the console."""
        self.send('PRIVMSG {0} :{1}'.format(chan, msg))
        self.output('[{0}] <{1}> {2}'.format(chan, self.config['nick'], msg))

    # Attempt to say something to a room (Notice)
    def notice(self, chan, msg):
        """Send ``msg`` to ``chan`` as a NOTICE and echo it on the console."""
        self.send('NOTICE {0} :{1}'.format(chan, msg))
        self.output('[{0}] -{1}- {2}'.format(chan, self.config['nick'], msg))

    # Attempt to say something to a room, in action format.
    def me(self, chan, msg):
        """Send ``msg`` to ``chan`` as a CTCP ACTION and echo it on the console."""
        self.send('PRIVMSG {0} :{2}ACTION {1}{2}'.format(chan, msg, chr(1)))
        self.output('[{0}] * {1} {2}'.format(chan, self.config['nick'], msg))

    # Attempt to quit the server
    def quit(self, msg):
        """Send QUIT with ``msg`` (a default if empty) and stop the read loop."""
        # No reason? Default it
        if not msg:
            msg = 'Closing bot.'
        self.send('QUIT :{0}'.format(msg))
        self.running = False

    # Parse out nick/user/host from :nick!user@host format.
    def userData(self, chunk):
        """Return ``(nick, user, host)`` parsed from ``chunk``.

        ``chunk`` is ``nick!user@host`` with an optional leading colon.  Parts
        that are missing come back as empty strings, so a bare server name
        yields ``(name, '', '')``.
        """
        mask = chunk[1:] if chunk.startswith(':') else chunk
        nick, bang, rest = mask.partition('!')
        if bang:
            user, _, host = rest.partition('@')
        else:
            user = ''
            nick, _, host = mask.partition('@')
        # Return a tuple
        return (nick, user, host)

    # User doesn't have access to that command.
    def denyAccess(self, chan, nick):
        """Tell ``nick`` in ``chan`` that the command is not allowed."""
        self.say(chan, '{0}: You don\'t have access to that!'.format(nick))

    # Output to console
    def output(self, msg):
        """Print ``msg`` prefixed with the current [HH:MM:SS] time."""
        timestamp = datetime.now().strftime('[%H:%M:%S]')
        print('{0} | {1}'.format(timestamp, msg))
# Let's run the bot!  Guarded so that importing this module does not open a
# connection; running the file as a script behaves as before.
if __name__ == '__main__':
    nub = Bot()
    nub.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment