Skip to content

Instantly share code, notes, and snippets.

@kaecy
Last active December 12, 2021 13:03
Show Gist options
  • Save kaecy/057669eea0fdf76fc2b4f7b83b49147f to your computer and use it in GitHub Desktop.
Save kaecy/057669eea0fdf76fc2b4f7b83b49147f to your computer and use it in GitHub Desktop.
An IRC library in Python (in development).
from socket import socket
from threading import Thread
from message import MessageParser, MessageBuffer
class Socket(socket):
    """Socket built with the default arguments that connects on construction."""

    def __init__(self, address, port):
        """Create the socket and immediately connect it to ``(address, port)``."""
        super().__init__()
        endpoint = (address, port)
        self.connect(endpoint)
class IRCMessageLoop(Thread):
    """Thread that registers with an IRC server and dispatches incoming messages.

    Attributes:
        name: Nickname used for NICK/USER registration; ``None`` until set.
        channel: Maps a channel name to the log object collecting its messages.
        connected: True once the server's 001 reply has been processed.
        socket: The ``Socket`` connected to the server in ``__init__``.
    """

    def __init__(self, address, port):
        """Connect to ``(address, port)``; the nickname must be set before ``start()``."""
        Thread.__init__(self)
        # The nick lives in its own slot: Thread.name's setter coerces values
        # to str, which would turn the None sentinel into the string "None".
        self._nick = None
        self.channel = {}
        self.connected = False
        self.socket = Socket(address, port)

    @property
    def name(self):
        """The IRC nickname, or ``None`` when it has not been set yet."""
        return self._nick

    @name.setter
    def name(self, value):
        self._nick = value

    def log(self, msg):
        """Print a parsed message for debugging."""
        print("Raw", msg)

    def processMessage(self, msg):
        """React to one parsed message (001 welcome, channel PRIVMSG, PING)."""
        if msg.type == "001":
            self.connected = True
            # Channels queued through join() before registration are joined now.
            for channel in list(self.channel):
                self.join(channel)
        elif msg.type == "privmsg":
            # ``target`` may be missing on a malformed PRIVMSG, and the channel
            # may be one we never asked to track; neither should kill the thread.
            target = getattr(msg, "target", None)
            if target and target.startswith("#"):
                channelLog = self.channel.get(target)
                if channelLog is not None:
                    channelLog.append(msg)
        elif msg.type == "ping":
            self.pong(msg.content)

    def run(self):
        """Register with the server, then read and dispatch messages until the peer closes."""
        import codecs

        if not self.name:
            print("Nick not set")
            return
        self.nick(self.name)
        self.user(self.name)

        conn = self.socket
        buffer = MessageBuffer()
        # An incremental decoder copes with multi-byte characters split across
        # recv() chunks; errors="replace" tolerates non-UTF-8 input.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        while True:
            data = conn.recv(512)
            if not data:
                # recv() returning b"" means the server closed the connection.
                break
            buffer.append(decoder.decode(data))
            while buffer.hasCompleteMessage():
                msg = MessageParser.message(buffer.getNextMessage())
                self.log(msg)
                self.processMessage(msg)
        self.connected = False

    def send(self, type, content):
        """Send ``<type> <content>`` terminated by CRLF."""
        line = type + " " + content
        self.socket.send(line.encode() + b'\r\n')

    def msg(self, chan, text):
        """Send a PRIVMSG to ``chan`` and record it in that channel's log if tracked."""
        msgType = "PRIVMSG"
        msgContent = chan + " :" + text
        if chan in self.channel:
            # Log our own line as if it had arrived from the server.
            sMsg = ":" + self.name + " " + msgType + " " + msgContent
            self.channel[chan].append(MessageParser.message(sMsg))
        self.send(msgType, msgContent)

    # NICK <nickname> [<hopcount>] (RFC 1459)
    def nick(self, name):
        """Remember ``name`` as the nickname and send NICK."""
        self.name = name
        self.send("NICK", name)

    # USER <username> <hostname> <servername> <realname> (RFC 1459)
    def user(self, user):
        """Send USER with placeholder host, server and real name."""
        self.send("USER", user + " Unknown Unknown :Unknown")

    # JOIN <channels> [<keys>] (RFC 1459)
    def join(self, channel):
        """Start tracking ``channel``; send JOIN now if registered, else on 001."""
        if channel not in self.channel:
            self.channel[channel] = IRCLog()
        if self.connected:
            self.send("JOIN", channel)

    # PONG <server1> [<server2>] (RFC 1459)
    def pong(self, server):
        """Answer a PING.

        The reply is sent even before 001 because a server may send PING
        during registration and wait for the PONG.
        """
        self.send("PONG", server)
class IRCLog:
    """Message list that sheds its oldest entry once it holds more than ``max`` items."""

    def __init__(self, max=24):
        self.max = max
        self.messages = []

    def append(self, msg):
        """Store ``msg``; when the list exceeds ``max`` entries, drop the first one."""
        entries = self.messages
        entries.append(msg)
        if len(entries) <= self.max:
            return
        # Rebind to a fresh list that omits the oldest entry.
        self.messages = entries[1:]
# Prefix: origin, sender
# Message: type, target, content/param
class Message:
    """Container for one parsed IRC message.

    Attributes:
        origin: Sender information taken from the message prefix.
        type: The command or numeric.
        target: Recipient of the message; stays ``None`` unless a parser sets it.
        content: The parameters following the command.
    """

    def __init__(self):
        self.origin = None
        self.type = None
        # Initialised so that reading ``target`` never raises AttributeError.
        self.target = None
        self.content = None
class User:
    """Sender of a message; every field starts out as the placeholder "Unknown"."""

    def __init__(self):
        placeholder = "Unknown"
        self.name = placeholder
        self.user = placeholder
        self.host = placeholder
# Parses IRC string messages and returns Message objects to make it easier to handle messages.
# Returns Message.
class MessageParser:
    """Parses raw IRC lines (without the trailing CRLF) into Message objects."""

    @staticmethod
    def _parseOrigin(prefix):
        """Build a User from a prefix: <servername> | <nick> [ '!' <user> ] [ '@' <host> ].

        Parts that are absent keep the User defaults. Splitting happens at the
        first '!' and the first '@' after it, so extra separators cannot raise.
        """
        origin = User()
        name, bang, rest = prefix.partition("!")
        if bang:
            origin.name = name
            ident, at, host = rest.partition("@")
            origin.user = ident
            if at:
                origin.host = host
        else:
            name, at, host = prefix.partition("@")
            origin.name = name
            if at:
                origin.host = host
        return origin

    @staticmethod
    def message(ircMessage):
        """Parse one IRC line and return a Message.

        ``type`` is always the lowercased command. ``content`` is the parameter
        string with one leading ':' removed, a list of names for JOIN/PART, or
        the message text for PRIVMSG (whose recipient goes into ``target``).
        ``content`` stays ``None`` when the command has no parameters.
        """
        message = Message()
        message.origin = User()

        # Extract prefix.
        # Messages beginning with ':' signify they contain a prefix.
        if ircMessage.startswith(":"):
            prefix, _, ircMessage = ircMessage[1:].partition(" ")
            message.origin = MessageParser._parseOrigin(prefix)

        if ircMessage == "":
            return message

        # Extract message.
        command, space, params = ircMessage.partition(" ")
        # Lowercase in every case so a bare "PING" matches "ping" as well.
        message.type = command.lower()
        if not space:
            # No parameters: nothing further to parse, content stays None.
            return message
        message.content = params[1:] if params.startswith(":") else params

        # Special parsing.
        if message.type in ("join", "part"):
            message.content = message.content.split(",")
        elif message.type == "privmsg":
            target, space, text = message.content.partition(" ")
            if space:
                message.target = target
                message.content = text[1:] if text.startswith(":") else text
        return message
class MessageBuffer:
    """Accumulates received text and hands it back one CRLF-terminated line at a time."""

    def __init__(self):
        self.buffer = ""

    def append(self, bytes):
        """Add already-decoded text to the end of the buffer."""
        self.buffer = self.buffer + bytes

    def hasCompleteMessage(self):
        """Return True when the buffer contains at least one CRLF."""
        return "\r\n" in self.buffer

    def getNextMessage(self):
        """Remove and return the text before the first CRLF, or "" if there is none."""
        line, terminator, remainder = self.buffer.partition("\r\n")
        if not terminator:
            # No complete line yet: leave the buffer untouched.
            return ""
        self.buffer = remainder
        return line
if __name__ == "__main__":
    # Smoke test: parse one sample line and print every parsed field.
    msg = MessageParser.message(":kaecy!dracula@castle PRIVMSG #shadows :Give me your soul.")
    if not msg:
        print("Invalid message.")
    else:
        for field, parsed in vars(msg).items():
            if field != "origin":
                print(field + ":", repr(parsed))
                continue
            # The origin is itself an object; list its attributes beneath a heading.
            print("origin:")
            for attr, text in vars(parsed).items():
                print(" ", attr, text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment