Last active
May 19, 2020 14:01
-
-
Save isidentical/d5341f8c8d9e325c266917835aa08fe6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect | |
import json | |
import re | |
import socket | |
import time | |
from collections import UserString | |
from dataclasses import asdict, dataclass, field | |
from typing import Any, Callable, Set, Tuple | |
TRUSTED = frozenset(("beginbotbot", "isidentical")) | |
COMMAND_RE = re.compile(r"(?P<command>![a-z]*)+") | |
ECONOMY_RE = re.compile( | |
r"Mana: (?P<mana>\d+) \| Street Cred: (?P<credit>\d+) \| Cool Points: (?P<points>\d+)" | |
) | |
@dataclass | |
class Item: | |
func: Callable | |
args: Tuple[Any, ...] | |
priority: int | |
@dataclass | |
class Message(UserString): | |
data: str | |
room: str | |
author: str | |
def __post_init__(self): | |
self.message = self.data | |
def infer_command(self): | |
args = self.data.split() | |
if len(args) > 1: | |
return args[1] | |
else: | |
return None | |
def infer_args(self): | |
args = self.data.split() | |
if len(args) > 2: | |
return args[2:] | |
else: | |
return [] | |
@dataclass | |
class Stats: | |
mana: int | |
credit: int | |
points: int | |
effects: Set[str] = field(default_factory=set) | |
class Client: | |
handlers = [] | |
def __init__( | |
self, nick, password, *, prefix="@", host="irc.twitch.tv", port=6667 | |
): | |
self.connection = socket.socket() | |
self.connection.connect((host, port)) | |
self.push_cmd("pass", password) | |
self.push_cmd("nick", nick) | |
self.nick = nick | |
self.prefix = prefix | |
self.inital = True | |
self.economy = None | |
self._economy_needs_me = True | |
self.stack = [] | |
@classmethod | |
def from_conf(cls, config_file): | |
with open(config_file) as file: | |
return cls(**json.load(file)) | |
@classmethod | |
def register(cls, *commands): | |
def wrapper(func): | |
func.commands = commands | |
cls.handlers.append(func) | |
return func | |
return wrapper | |
def poll(self): | |
buffer = str() | |
while self._economy_needs_me: | |
buffer += self.connection.recv(1024).decode("UTF-8") | |
*lines, buffer = re.split(r"[~\r\n]+", buffer) | |
for line in lines: | |
print(line) | |
self.process_single_line(line.strip()) | |
def process_single_line(self, line): | |
args = line.split() | |
if args[0] == "PING": | |
self.push_cmd("PONG", line[1]) | |
elif len(args) > 1 and args[1] == "PRIVMSG": | |
author = args.pop(0).split("!")[0][1:] | |
room, *message = args[1:] | |
message = " ".join(message)[1:] | |
self.dispatch_message(Message(message, room, author)) | |
else: | |
... | |
self.run_scheduler() | |
def run_scheduler(self): | |
for item in self.stack.copy(): | |
if item.priority != 0: | |
item.priority -= 1 | |
continue | |
self.stack.remove(item) | |
item.func(*item.args) | |
time.sleep(0.75) | |
def dispatch_message(self, message): | |
if message.author in TRUSTED and message.startswith( | |
f"@{self.nick} - " | |
): | |
self._try_parse_economy(message) | |
if self.inital and False: | |
# define inital commands | |
self.send_message(message.room, "!me") | |
self.inital = False | |
if message.startswith(self.prefix + self.nick): | |
command = message.infer_command() | |
args = message.infer_args() | |
for handler in self.handlers: | |
if command in handler.commands: | |
parameters = tuple( | |
inspect.signature(handler).parameters.keys() | |
) | |
if len(args) < len(parameters) - 2: | |
self.send_message( | |
message.room, | |
f"Missing argument '{parameters[len(args) + 2]}' for '{command}' command.", | |
) | |
else: | |
handler(self, message, *args[: len(parameters) - 2]) | |
def push_cmd(self, cmd, value): | |
request = f"{cmd.upper()} {value}\r\n" | |
self.connection.send(request.encode("utf8")) | |
def send_message(self, room, message, priority=0): | |
self.stack.append( | |
Item( | |
self.push_cmd, | |
("privmsg", f"{room} :{message}"), | |
priority=priority, | |
) | |
) | |
def _try_parse_economy(self, message): | |
if match := ECONOMY_RE.search(str(message)): | |
self.economy = Stats(**match.groupdict()) | |
if self.economy is not None: | |
for sound in COMMAND_RE.finditer(str(message)): | |
self.economy.effects.add(sound["command"]) | |
@Client.register("help") | |
def on_help(self, message): | |
self.send_message( | |
message.room, | |
f"My commands always startswith with " | |
f"'{self.prefix}{self.nick}' prefix. " | |
f"Available commands: 'help', 'stats'" | |
f", 'play', 'share', 'buy', 'props'", | |
) | |
@Client.register("stats") | |
def on_stats(self, message): | |
if self.economy is None: | |
self.stack.append(Item(on_stats, (self, message), priority=4)) | |
return self.send_message(message.room, "!me") | |
self.send_message( | |
message.room, | |
", ".join( | |
f"{key}: {value}" for key, value in asdict(self.economy).items() | |
), | |
) | |
self.send_message( | |
message.room, "!me", priority=100 | |
) # update stats on every 100 messages | |
@Client.register("play") | |
def on_play(self, message, music): | |
self.send_message(message.room, f"!{music}") | |
@Client.register("share") | |
def on_share(self, message, person, music): | |
self.send_message(message.room, f"!share {person} {music}") | |
@Client.register("buy") | |
def on_buy(self, message, command): | |
self.send_message(message.room, f"!buy {command}") | |
@Client.register("props", "bigups") | |
def on_props(self, message, person): | |
self.send_message(message.room, f"!{message.infer_command()} {person}") | |
if __name__ == "__main__": | |
client = Client.from_conf("../configs/stallmansbot.json") | |
for channel in ("beginbot", "isidentical"): | |
client.push_cmd("join", f"#{channel}") | |
client.poll() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment