Skip to content

Instantly share code, notes, and snippets.

@nwunderly
Created March 14, 2021 05:51
Show Gist options
  • Save nwunderly/adba0d46764ca86eabff3885698b0e0b to your computer and use it in GitHub Desktop.
Save nwunderly/adba0d46764ca86eabff3885698b0e0b to your computer and use it in GitHub Desktop.
Shitty chat bot code using ChatterBot and trained on a Discord channel
"""
This file runs two discord bots, which talk to each other after one of them is prompted by a user.
WARNING: This code kinda sucks
"""
import asyncio
import logging
import sys
from discord.ext import commands
from discord.client import _cleanup_loop
from chatterbot import ChatBot
from chatterbot.trainers import ChatterBotCorpusTrainer
# Build the stdout handler first, then attach it to the shared 'chatbots'
# logger; per-bot loggers are created as children ('chatbots.<name>').
h = logging.StreamHandler(sys.stdout)
h.setFormatter(
    logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
)

logger = logging.getLogger('chatbots')
logger.addHandler(h)
logger.setLevel(logging.INFO)
###############
# CONFIG SHIT #
###############
# One entry per bot to run. Each dict is unpacked straight into Bot(...), so
# the keys must match Bot.__init__'s parameters:
#   name   - used for the bot's logger name and its ChatBot instance
#   prefix - command prefix for this bot
#   bot_id - this bot's Discord user ID (used to identify the *other* bot)
#   token  - this bot's Discord token
# Adding more than two bots is NOT recommended: each bot only replies to
# "the first bot in BOTS that isn't me", so a third bot would break responses.
BOTS = [
    {
        'name': 'clippy',
        'prefix': '//1',
        'bot_id': 000000000000000000,
        'token': ''
    },
    {
        'name': 'honk',
        'prefix': '//2',
        'bot_id': 000000000000000000,
        'token': ''
    }
]
# ID of the only channel the bots act in; messages from any other channel are
# ignored so the bots don't start freaking out elsewhere.
CHANNEL = 000000000000000000
############
# BOT SHIT #
############
class Bot(commands.Bot):
    """Custom discord bot class to handle ChatterBot stuff internally.

    Each instance owns its own ``ChatBot`` and replies only to the *other*
    configured bot (or to a user-issued command), and only inside ``CHANNEL``.

    Parameters:
        prefix: command prefix passed to ``commands.Bot``.
        bot_id: this bot's Discord user ID, used to pick the partner bot.
        token: Discord token used by ``start()``.
        name: name for the logger (``chatbots.<name>``) and the ``ChatBot``.
        **kwargs: forwarded unchanged to ``commands.Bot``.
    """

    def __init__(self, prefix, bot_id, token, name, **kwargs):
        super().__init__(command_prefix=prefix, **kwargs)
        self.bot_id = bot_id
        self.__token = token
        self.logger = logging.getLogger(f'chatbots.{name}')
        # Partner = first configured bot whose ID differs from ours.
        # Falls back to 0 when no such entry exists.
        self.other_bot_id = next(
            (b['bot_id'] for b in BOTS if b['bot_id'] != bot_id),
            0,
        )
        self.add_cog(Commands(self))
        self.chatbot = ChatBot(name)

    async def on_ready(self):
        """Log the account this bot is logged in as."""
        self.logger.info(f"Logged in as {self.user}")

    async def start(self):
        """Start the bot using the token supplied at construction."""
        await super().start(self.__token)

    async def on_message(self, message):
        """Route an incoming message.

        Own messages and messages outside ``CHANNEL`` are ignored. Messages
        from the partner bot get a reply after a 10 second pause; everything
        else is handed to the command processor.
        """
        if message.author == self.user or message.channel.id != CHANNEL:
            return
        if message.author.id == self.other_bot_id:
            # Pause before replying so the two bots don't flood the channel.
            await asyncio.sleep(10)
            await self.respond_to_message(message.content, message.channel)
        else:
            await self.process_commands(message)

    async def respond_to_message(self, content, channel):
        """Generate a ChatterBot reply to ``content`` and send it to ``channel``."""
        self.logger.info(f"responding to message: '{content}'")
        # get_response() is synchronous; running it directly here would block
        # the event loop shared by both bots, so run it in the default
        # executor instead.
        loop = asyncio.get_event_loop()
        response = await loop.run_in_executor(
            None, self.chatbot.get_response, content
        )
        self.logger.info(f"response: '{response}'")
        # Send the reply text explicitly rather than the response object.
        await channel.send(str(response))
class Commands(commands.Cog):
    """Commands for the bot, put in a self-contained cog for ease of use."""

    def __init__(self, bot):
        # The owning Bot; commands use its logger and respond_to_message().
        self.bot = bot

    @commands.command()
    async def start(self, ctx, *, message):
        """Starts conversation based on a given message."""
        # Everything after the command name is captured as `message`.
        await self.bot.respond_to_message(message, ctx.channel)

    @commands.command()
    async def die(self, ctx):
        """Stops both bots. Failsafe in case they accidentally become sentient."""
        # `ctx` is required: discord.py rejects a command callback that lacks
        # it, which previously made this kill switch unusable.
        self.bot.logger.info('KILLING')
        # sys.exit() instead of the interactive-only builtin exit().
        sys.exit()
#####################
# ACTUALLY RUN SHIT #
#####################
# tbh you can (and probably should) just ignore this, it's not pretty but it works
def main():
    """Run every bot configured in BOTS on one shared asyncio event loop.

    The loop runs until all bots finish, one of them fails, or the process
    receives a KeyboardInterrupt. Pending tasks are then cleaned up, and any
    exception raised while running the bots is logged instead of being
    silently dropped.
    """
    logger.info('Initializing')
    loop = asyncio.get_event_loop()
    bots = [Bot(**b, loop=loop) for b in BOTS]

    async def runner():
        # Start all bots concurrently; finishes when every start() returns
        # or as soon as one of them raises.
        logger.info('Gathering tasks')
        await asyncio.gather(*[bot.start() for bot in bots])

    def stop_loop_on_completion(_):
        loop.stop()

    logger.info('Starting up')
    future = asyncio.ensure_future(runner(), loop=loop)
    future.add_done_callback(stop_loop_on_completion)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info('Received signal to terminate bot and event loop.')
    finally:
        future.remove_done_callback(stop_loop_on_completion)
        logger.info('Cleaning up tasks.')
        _cleanup_loop(loop)

    # Surface a startup/runtime failure (e.g. a bad token); otherwise the
    # exception stays stored on the future and the program exits silently.
    if future.done() and not future.cancelled():
        error = future.exception()
        if error is not None:
            logger.error('Bots stopped because of an error', exc_info=error)
    logger.info('Done.')
# Only start the bots when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
discord.py
chatterbot
chatterbot_corpus
"""
Script to train the bots on ChatterBot's default dataset.
Run this before starting the bots to set up the sqlite database they'll use.
"""
from chatterbot import ChatBot
from chatterbot.trainers import ChatterBotCorpusTrainer, UbuntuCorpusTrainer
print("Training")
# The name is arbitrary (see the channel-training script, which uses the same one).
# NOTE(review): presumably every ChatBot shares ChatterBot's default database,
# which is how the bots pick up this training - confirm.
chatbot = ChatBot('Steve')
trainer = ChatterBotCorpusTrainer(chatbot)
# Train on the bundled English corpus.
trainer.train('chatterbot.corpus.english')
# To additionally train on the Ubuntu corpus, uncomment:
# UbuntuCorpusTrainer(chatbot).train()
print("Done.")
"""
Script to train the bots on a Discord channel.
This treats every message in the channel as a response to the previous.
WARNING: This is NOT good code, and in fact involves downloading the entire
message history of a Discord channel. Using this is frankly just a bad idea.
"""
import os
import traceback
import discord
from discord.ext import commands
from chatterbot import ChatBot
from chatterbot.conversation import Statement
OWNER = 000000000000000000  # your Discord user ID; pinged in status updates and the only user allowed to run commands
TOKEN = ''  # the bot token passed to bot.run()
SOURCE_CHANNEL = 000000000000000000  # ID of the channel whose history is used for training
OUTPUT_CHANNEL = 000000000000000000  # ID of the channel where progress/errors are reported
ALERT_FREQ = 300000  # how many trained messages between progress update messages
chatbot = ChatBot("Steve")  # arbitrary name again
bot = commands.Bot(command_prefix="===")
# Global check: every command is rejected unless its author is OWNER.
bot.add_check(lambda ctx: ctx.author.id == OWNER)
class cache:
    """The chad class singleton"""

    # Number of message pairs trained so far.
    count = 0

    # Flipped to True by on_ready so training starts only once.
    running = False

    # Sliding window over the history: (message, prev_message).
    messages = (None, None)
async def train_message(message, prev_message):
    """Teach the chatbot that ``message`` is a response to ``prev_message``.

    Does nothing unless both statements are truthy. On success the global
    trained-message counter is incremented and a progress alert is attempted.
    """
    if message and prev_message:
        chatbot.learn_response(message, prev_message)
        cache.count += 1
        await send_alert(cache.count)
async def send_alert(count, force=False):
    """Post a progress message to OUTPUT_CHANNEL, pinging OWNER.

    A message is sent when ``count`` is a multiple of ALERT_FREQ, or
    unconditionally when ``force`` is true.
    """
    if count % ALERT_FREQ != 0 and not force:
        return
    destination = bot.get_channel(OUTPUT_CHANNEL)
    await destination.send(f"<@{OWNER}> status: {count} messages")
async def channel_history():
    """Train the chatbot on the full history of SOURCE_CHANNEL.

    Messages are read oldest to newest, and every non-empty, non-bot message
    is learned as a response to the previous such message. A final status
    message and "Done." are sent to OUTPUT_CHANNEL when the history is
    exhausted.

    Raises:
        RuntimeError: if SOURCE_CHANNEL cannot be resolved by the bot.
    """
    channel = bot.get_channel(SOURCE_CHANNEL)
    if channel is None:
        raise RuntimeError(
            f"Source channel {SOURCE_CHANNEL} not found or not visible to the bot"
        )
    # history() yields newest-first by default, which would pair each message
    # with the one that came *after* it and train reversed responses.
    # oldest_first=True makes "previous" mean chronologically previous.
    async for message in channel.history(limit=None, oldest_first=True):
        if message.content and (not message.author.bot):
            previous = cache.messages[0]
            statement = Statement(
                message.content,
                in_response_to=previous.text if previous is not None else None,
            )
            cache.messages = (statement, previous)
            print(f"Training:\t{cache.count}\t{cache.messages}")
            await train_message(*cache.messages)
    await send_alert(cache.count, True)
    await bot.get_channel(OUTPUT_CHANNEL).send("Done.")
@bot.event
async def on_ready():
    """Kick off training the first time the bot becomes ready.

    Later ready events are ignored via ``cache.running``. Any exception
    raised during training is printed and reported to OUTPUT_CHANNEL.
    """
    if cache.running:
        return
    cache.running = True

    await bot.get_channel(OUTPUT_CHANNEL).send("Welp, here goes nothing")
    try:
        await channel_history()
    except Exception as err:
        traceback.print_exception(type(err), err, err.__traceback__)
        await bot.get_channel(OUTPUT_CHANNEL).send(
            f"Failed with {type(err).__name__}, {err}"
        )
# Replies with the number of message pairs trained so far.
@bot.command()
async def status(ctx):
    trained = cache.count
    await ctx.send(str(trained))
# Only connect and start training when executed as a script, not on import.
if __name__ == '__main__':
    bot.run(TOKEN)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment