Created
January 6, 2011 17:13
-
-
Save mdeous/768200 to your computer and use it in GitHub Desktop.
Asynchronous IRC library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
#TODO: implement commands: oper, mode, topic, names, list, invite | |
#TODO: '' '' : kick, who, whois, whowas, kill, away | |
#TODO: '' '' : rehash, restart, wallops, | |
import logging | |
from asynchat import async_chat | |
from datetime import datetime | |
from re import compile as re_compile | |
from socket import AF_INET, SOCK_STREAM | |
LOGFORMAT = '[%(name)s] %(levelname)s - %(message)s' | |
CTCP_RE = re_compile(r'^\x01(?P<type>[A-Z]+)(?: (?P<data>.+))?\x01$') | |
class IRCClient(async_chat): | |
""" | |
Asynchronous IRC client. | |
""" | |
terminator = '\r\n' | |
ctcp_version = None | |
def __init__(self, host, port, nickname, username, | |
channels=None, | |
loglevel=logging.INFO, | |
logformat=LOGFORMAT): | |
async_chat.__init__(self) | |
logging.basicConfig(format=logformat) | |
self.host = host | |
self.port = port | |
self.nickname = nickname | |
self.username = username | |
self.channels = [] if (channels is None) else channels | |
self.received_data = '' | |
self.logger = logging.getLogger('%(host)s:%(port)d' % {'host': host, 'port': port}) | |
self.logger.setLevel(loglevel) | |
self.logger.info('connecting to host...') | |
self.create_socket(AF_INET, SOCK_STREAM) | |
self.connect((host, port)) | |
def _connection_made(self): | |
self.logger.info('connection successful') | |
for channel in self.channels: | |
self.send_data('JOIN %(chan)s' % {'chan': channel}) | |
self.on_connection() | |
def split_netmask(self, netmask): | |
nick = netmask.split('!')[0].lstrip(':') | |
user = netmask.split('@')[0].split('!')[1] | |
host = netmask.split('@')[1] | |
return (nick, user, host) | |
def send_data(self, data): | |
self.logger.debug('SENT DATA: %(data)s' % {'data': repr(data)}) | |
data += '\r\n' | |
self.push(data) | |
def handle_connect(self): | |
self.send_data('NICK :%(nick)s' % {'nick': self.nickname}) | |
self.send_data('USER %(nick)s %(nick)s %(nick)s :%(user)s' % { | |
'nick': self.nickname, | |
'user': self.username, | |
}) | |
def handle_data(self, data): | |
self.logger.debug(str(data)) | |
token = data.split() | |
src = token[0] | |
if 'PRIVMSG' in token: | |
dst = token[2] | |
message = ' '.join(token[3:]).lstrip(':') | |
ctcp = CTCP_RE.match(message) | |
if ctcp is not None: | |
ctcp = ctcp.groupdict() | |
self.on_ctcp(src, dst, ctcp) | |
return | |
self.on_privmsg(src, dst, message) | |
elif 'NOTICE' in token: | |
dst = token[2] | |
message = ' '.join(token[3:]).lstrip(':') | |
self.on_notice(src, dst, message) | |
elif 'JOIN' in token: | |
dst = token[2] | |
self.on_join(src, dst) | |
elif 'PART' in token: | |
dst = token[2] | |
message = ' '.join(token[3:]).lstrip(':') if (len(token) > 2) else None | |
self.on_part(src, dst, message) | |
elif 'PING' in token: | |
self.on_ping(token[1]) | |
elif 376 in token: | |
self._connection_made() | |
else: | |
self.on_unknown_data(data) | |
def found_terminator(self): | |
self.handle_data(self.received_data) | |
self.received_data = '' | |
def collect_incoming_data(self, data): | |
self.received_data += data | |
########################## | |
## IRC COMMANDS METHODS ## | |
########################## | |
def set_nick(self, new_nick): | |
self.nickname = new_nick | |
self.send_data('NICK %(nick)s' % {'nick': new_nick}) | |
def privmsg(self, dst, msg, color=None): | |
self.send_data('PRIVMSG %(dst)s :%(msg)s' % { | |
'dst': dst, | |
'msg': msg, | |
}) | |
def notice(self, dst, msg): | |
self.send_data('NOTICE %(dst)s :%(msg)s' % { | |
'dst': dst, | |
'msg': msg, | |
}) | |
def join(self, *dst): | |
for channel in dst: | |
self.send_data('JOIN %(dst)s' % {'dst': channel}) | |
def part(self, dst, msg=None): | |
data = 'PART %(dst)s' % {'dst': dst} | |
if msg is not None: | |
data += ' :%(msg)s' % {'msg': msg} | |
self.send_data(data) | |
def quit(self, msg=None): | |
data = 'QUIT' | |
if msg is not None: | |
data += ' :%(msg)s' % {'msg': msg} | |
self.send_data(data) | |
def on_ping(self, ping_id): | |
self.send_data('PONG %(id)s' % {'id': ping_id}) | |
def on_ctcp(self, src, dst, ctcp): | |
if (ctcp['type'] == 'PING') and (ctcp['data'] is not None): | |
self.on_ctcp_ping(src, ctcp) | |
elif ctcp['type'] == 'TIME': | |
self.on_ctcp_time(src) | |
elif ctcp['type'] == 'VERSION': | |
self.on_ctcp_version(src) | |
elif ctcp['type'] == 'FINGER': | |
self.on_ctcp_finger(src) | |
elif ctcp['type'] == 'ACTION': | |
self.on_action(src, dst, ctcp['data']) | |
##################################################### | |
## EVENTS HANDLERS TO (RE-)IMPLEMENT IN A SUBCLASS ## | |
##################################################### | |
def on_unknown_data(self, data): | |
pass | |
def on_connection(self): | |
pass | |
def on_ctcp_ping(self, src, ctcp): | |
nick, _, _ = self.split_netmask(src) | |
self.notice(nick, '\x01PING %(timestamp)s\x01' % {'timestamp': ctcp['data']}) | |
def on_ctcp_time(self, src): | |
nick, _, _ = self.split_netmask(src) | |
message = '%s UTC' % datetime.utcnow() | |
self.notice(nick, '\x01TIME :%(time)s' % {'time': message}) | |
def on_ctcp_version(self, src): | |
pass | |
def on_ctcp_finger(self, src): | |
pass | |
def on_action(self, src, dst, msg): | |
pass | |
def on_privmsg(self, src, dst, msg): | |
pass | |
def on_notice(self, src, dst, msg): | |
pass | |
def on_join(self, src, dst): | |
pass | |
def on_part(self, src, dst, msg): | |
pass | |
def on_quit(self, src, msg): | |
pass | |
if __name__ == '__main__': | |
from asyncore import loop | |
host, port = 'irc.plain-text.info', 6667 | |
nick = username = 'fatbotslim' | |
channels = ['#testbot'] | |
loglevel = logging.DEBUG | |
client = IRCClient(host, port, nick, username, channels, loglevel) | |
loop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment