Skip to content

Instantly share code, notes, and snippets.

@awt
Created September 4, 2023 03:22
Show Gist options
  • Save awt/10a28f189f6c233dba3685f4b451f1cd to your computer and use it in GitHub Desktop.
Save awt/10a28f189f6c233dba3685f4b451f1cd to your computer and use it in GitHub Desktop.
#dispatch.py
import logging
import discord
from discord.ext import tasks
logger = logging.Logger(name="dispatch")
PEST_CHANNEL_ID=1071135828757196813
class Dispatch(discord.Client):
def __init__(self, akris_client, intents=discord.Intents.default()):
intents.message_content = True
super().__init__(intents=intents)
self.akris_client = akris_client
async def setup_hook(self) -> None:
self.check_akris_client_queue.start()
async def on_ready(self):
print(f"Logged on as {self.user}")
# self.akris_client.send_command({"command": "page_up", "args": [time.time(), 1]})
async def on_message(self, message):
if message.author == self.user:
return
annotated_message_text = f"({message.author}) {message.clean_content}"
if len(message.clean_content) > 0:
self.akris_client.send_command({"command": "broadcast_text", "args": [annotated_message_text]})
print(annotated_message_text)
@tasks.loop(seconds=1) # task runs every 60 seconds
async def check_akris_client_queue(self):
channel = self.get_channel(PEST_CHANNEL_ID) # channel ID goes here
# check the queue and perform actions if needed
if not self.akris_client.message_queue.empty():
while not self.akris_client.message_queue.empty():
message = self.akris_client.message_queue.get()
if message.get("command") in ["broadcast_text"]:
if not message.get("speaker") == "discord_bridge":
await channel.send(f"**{message.get('speaker')}**: {message.get('body')}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment