Skip to content

Instantly share code, notes, and snippets.

@PurpleMyst
Created July 31, 2017 15:48
Show Gist options
  • Save PurpleMyst/c5fa16e62e0831403fa5c4f7a99f4c88 to your computer and use it in GitHub Desktop.
Save PurpleMyst/c5fa16e62e0831403fa5c4f7a99f4c88 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import collections
import inspect
import curio
from curio import socket
User = collections.namedtuple("User", ["nick", "user", "host"])
Server = collections.namedtuple("Server", ["name"])
Message = collections.namedtuple("Message", ["sender", "command", "args"])
ANY_ARGUMENTS = -1 # any amount of arguments, fully split
NO_SPLITTING = -2 # any amount of arguments, no splitting
ALWAYS_CALLBACK_PRIVMSG = False
def _create_callback_registration(key):
def _inner(self, func):
if not inspect.iscoroutinefunction(func):
raise ValueError("You can only register coroutines!")
self._message_callbacks[key].append(func)
return func
return _inner
class IrcBot:
def __init__(self, encoding="utf-8"):
self.nick = None
self._server = None
self.encoding = encoding
self._linebuffer = collections.deque()
self._sock = socket.socket()
self._connection_callbacks = []
self._message_callbacks = {"PRIVMSG": [], "JOIN": [], "PART": []}
self._command_callbacks = {}
async def _send(self, *parts):
data = " ".join(parts).encode(self.encoding) + b"\r\n"
await self._sock.sendall(data)
async def _recv_line(self):
if not self._linebuffer:
data = bytearray()
while not data.endswith(b"\r\n"):
chunk = await self._sock.recv(4096)
if chunk:
data += chunk
else:
raise IOError("Server closed the connection!")
try:
lines = data.decode(self.encoding).split("\r\n")
except UnicodeDecodeError:
# Arguably we could avoid crashing here, but something might be
# screwed up if we get to this point. It's pretty unlikely that
# IRC sends non-UTF8 data.
raise RuntimeError("Could not decode chunk: {data}") from None
else:
self._linebuffer.extend(lines)
return self._linebuffer.popleft()
@staticmethod
def _split_line(line):
if line.startswith(":"):
sender, command, *args = line.split(" ")
sender = sender[1:]
if "!" in sender:
nick, sender = sender.split("!", 1)
user, host = sender.split("@", 1)
sender = User(nick, user, host)
else:
sender = Server(sender)
else:
sender = None
command, *args = line.split(" ")
for n, arg in enumerate(args):
if arg.startswith(":"):
temp = args[:n]
temp.append(" ".join(args[n:])[1:])
args = temp
break
return Message(sender, command, args)
async def connect(self, nick, host, port=6667):
self.nick = nick
self._server = (host, port)
await self._sock.connect(self._server)
await self._send("NICK", self.nick)
await self._send("USER", self.nick, "0", "*", ":" + self.nick)
while True:
line = await self._recv_line()
if line.startswith("PING"):
await self._send(line.replace("PING", "PONG", 1))
continue
msg = self._split_line(line)
if msg.command == "001":
break
async with curio.TaskGroup() as g:
for callback in self._connection_callbacks:
await g.spawn(callback(self))
await g.join()
async def join_channel(self, channel):
await self._send("JOIN", channel)
async def send_privmsg(self, recipient, text):
await self._send("PRIVMSG", recipient, ":" + text)
async def mainloop(self):
while True:
line = await self._recv_line()
if not line:
continue
if line.startswith("PING"):
await self._send(line.replace("PING", "PONG", 1))
continue
msg = self._split_line(line)
callbacks = self._message_callbacks.get(msg.command, ())
async with curio.TaskGroup() as g:
spawn_callbacks = True
if msg.command == "PRIVMSG":
command, *args = msg.args[1].strip().split(" ")
callbacks = self._command_callbacks.get(command, ())
for callback, arg_amount in callbacks:
if arg_amount == NO_SPLITTING:
spawn_callbacks = False
coro = callback(self, msg.sender, msg.args[0],
" ".join(args))
await g.spawn(coro)
elif arg_amount == ANY_ARGUMENTS or \
len(args) == arg_amount:
spawn_callbacks = False
coro = callback(self, msg.sender, msg.args[0],
*args)
await g.spawn(coro)
if ALWAYS_CALLBACK_PRIVMSG or spawn_callbacks:
# Sometimes we don't want to spawn the PRIVMSG callbacks if
# this is a command.
for callback in callbacks:
await g.spawn(callback(self, msg.sender, *msg.args))
await g.join()
def on_connect(self, func):
if not inspect.iscoroutinefunction(func):
raise ValueError("You can only register coroutines!")
self._connection_callbacks.append(func)
on_privmsg = _create_callback_registration("PRIVMSG")
on_join = _create_callback_registration("JOIN")
on_part = _create_callback_registration("PART")
def on_command(self, command, arg_amount=ANY_ARGUMENTS):
def _inner(func):
if not inspect.iscoroutinefunction(func):
raise ValueError("You can only register coroutines!")
if command not in self._command_callbacks:
self._command_callbacks[command] = []
self._command_callbacks[command].append((func, arg_amount))
return func
return _inner
#!/usr/bin/env python3
import collections
import curio
from ircbot import IrcBot, NO_SPLITTING
bot = IrcBot()
@bot.on_connect
async def onconnect(_):
print("Connected!")
@bot.on_privmsg
async def onprivmsg(_, sender, recipient, text):
print("<{} to {}> {}".format(sender.nick, recipient, text))
@bot.on_join
async def onjoin(_, sender, channel):
print(sender.nick, "joined", channel)
@bot.on_part
async def onpart(_, sender, channel):
print(sender.nick, "parted", channel)
@bot.on_command("!echo", NO_SPLITTING)
async def echo(self, sender, recipient, text):
if recipient == self.nick:
recipient = sender.nick
await self.send_privmsg(recipient, sender.nick + ": " + text)
@bot.on_command("!add", 2)
async def add(self, sender, recipient, a, b):
if recipient == self.nick:
recipient = sender.nick
try:
a, b = int(a), int(b)
except ValueError:
pass
else:
await self.send_privmsg(recipient, sender.nick + ": " + str(a + b))
@bot.on_command("bf>", NO_SPLITTING)
async def bf(self, sender, recipient, code):
# This brainfuck code is a bit horrible, but it was written as a POC and
# it was written as fast as possible.
stack = collections.defaultdict(int)
ptr = 0
i = 0
output = bytearray()
while i < len(code):
token = code[i]
if token == "+":
stack[ptr] += 1
elif token == "-":
stack[ptr] -= 1
elif token == ">":
ptr += 1
elif token == "<":
ptr -= 1
elif token == ".":
output.append(stack[ptr])
elif token in "[]":
if token == "[":
indexes = range(i, len(code))
else:
indexes = range(i, -1, -1)
depth = 0
for j in indexes:
if code[j] == "[":
depth += +1 if token == "[" else -1
elif code[j] == "]":
depth += -1 if token == "[" else +1
if depth == 0:
break
assert depth == 0
if token == "[" and stack[ptr] == 0:
i = j
elif stack[ptr] != 0:
i = j
i += 1
if recipient == self.nick:
recipient = sender.nick
await self.send_privmsg(recipient,
sender.nick + ": " + output.decode("ascii"))
# Just to any and all linters to shut up about unused variables.
del onconnect, onprivmsg, onjoin, onpart, echo, add, bf
async def main():
await bot.connect("BananaPy", "chat.freenode.net")
await bot.join_channel("#8banana-bottest")
await bot.mainloop()
if __name__ == "__main__":
curio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment