Skip to content

Instantly share code, notes, and snippets.

@thebluefish
Created August 23, 2018 02:46
Show Gist options
  • Save thebluefish/41cb8c632b110d1aa0e629dee2a4c552 to your computer and use it in GitHub Desktop.
Save thebluefish/41cb8c632b110d1aa0e629dee2a4c552 to your computer and use it in GitHub Desktop.
A basic discord.py-based logger wrapped in a class. It runs in a separate thread, provides thread-safe logging helpers, and locks interactions to a single whitelisted guild for security.
import discord
from discord.ext import commands
from threading import Thread
import asyncio
import sys
import traceback
import datetime
# Bot token handed to `bot.run()` when a Discord instance is created.
# Empty here; must be filled in before use.
TOKEN = ''
# Expects the following to contain IDs pulled from discord
# NOTE: the assignments below are blank placeholders; the module will not
# parse until each one is given a value.
# Presumably these are integer IDs, since GUILD is compared against
# `guild.id` and the CHANNEL_* values are passed to `bot.get_channel()` --
# TODO confirm against the installed discord.py version.
# GUILD: the single guild the bot is allowed to be a member of / take
# commands from.
GUILD =
# Destination channels used by the matching Discord.send_* helpers.
CHANNEL_GENERAL =
CHANNEL_ALERTS =
CHANNEL_INFO =
CHANNEL_DEBUG =
CHANNEL_ERROR =
def guild_auth():
    """Return a command check that only passes inside the whitelisted guild.

    The check is built with ``commands.check`` and is meant to be applied as
    a decorator on a command. Its predicate yields a falsy value when the
    invocation context has no guild, and otherwise the result of comparing
    the guild's id with the module-level ``GUILD``.
    """
    async def _is_whitelisted_guild(ctx):
        origin = ctx.guild
        # Short-circuits to the falsy guild itself when there is none.
        return origin and origin.id == GUILD

    return commands.check(_is_whitelisted_guild)
class Discord:
    """Discord-backed logger that runs a discord.py bot on its own thread.

    Creating an instance registers the event handlers on the shared bot and
    starts ``bot.run(TOKEN)`` in a new thread. The ``send_*`` methods may be
    called from any thread: each one schedules the matching ``*_threaded``
    coroutine on the bot's event loop with
    ``asyncio.run_coroutine_threadsafe``.

    The bot refuses to operate outside the guild identified by ``GUILD``:
    being in, or joining, any other guild raises ``PermissionError``, which
    is routed to ``on_error``.
    """

    # Created once at class-definition time and shared by all instances.
    event_loop = asyncio.new_event_loop()
    bot = commands.Bot('$', loop=event_loop)

    def __init__(self):
        """Register handlers on the bot and start it in a new thread."""
        # Events
        self.on_ready = self.bot.event(self.on_ready)
        self.on_error = self.bot.event(self.on_error)
        self.on_guild_join = self.bot.event(self.on_guild_join)

        # Commands
        # NOTE(review): cmd_purge is already decorated with @bot.command in
        # the class body; what this second call does depends on the
        # installed discord.py version -- confirm before changing it.
        self.cmd_purge = self.bot.command(self.cmd_purge)

        t = Thread(target=self.bot.run, args=(TOKEN,))
        t.start()

    def __del__(self):
        """Ask the bot to close when this object is finalized.

        ``bot.close()`` is a coroutine, so merely calling it does nothing.
        It has to be scheduled on the bot's loop, which is only possible
        while that loop is running.
        """
        loop = self.event_loop
        if loop.is_running():
            asyncio.run_coroutine_threadsafe(self.bot.close(), loop)

    # ------------------------------------------------------------------
    # Private helpers shared by the send_* methods
    # ------------------------------------------------------------------

    @staticmethod
    def _report_failure(future):
        """Print the exception of a finished send, if it failed."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            traceback.print_exception(type(exc), exc, exc.__traceback__)

    def _submit(self, coro):
        """Schedule *coro* on the bot's loop from any thread.

        Returns the ``concurrent.futures.Future`` for the scheduled call.
        A done-callback prints any exception so failures are not lost when
        the caller ignores the future.
        """
        future = asyncio.run_coroutine_threadsafe(coro, self.event_loop)
        future.add_done_callback(self._report_failure)
        return future

    def _channel(self, channel_id):
        """Return the channel for *channel_id*.

        Raises:
            LookupError: if ``bot.get_channel`` returns ``None``.
        """
        channel = self.bot.get_channel(channel_id)
        if channel is None:
            raise LookupError(
                'Discord channel {} could not be found'.format(channel_id))
        return channel

    async def _send_to(self, channel_id, content):
        """Wait until the bot is ready, then send *content* to the channel."""
        await self.bot.wait_until_ready()
        await self._channel(channel_id).send(content)

    # ------------------------------------------------------------------
    # Thread-safe logging helpers
    # ------------------------------------------------------------------

    def send_general(self, message):
        """Queue *message* for the general channel; returns the future."""
        return self._submit(self.send_general_threaded(message))

    async def send_general_threaded(self, message):
        """Send *message* to ``CHANNEL_GENERAL`` (runs on the bot loop)."""
        await self._send_to(CHANNEL_GENERAL, message)

    def send_alerts(self, message):
        """Queue *message* for the alerts channel; returns the future."""
        return self._submit(self.send_alerts_threaded(message))

    async def send_alerts_threaded(self, message):
        """Send *message* to ``CHANNEL_ALERTS`` prefixed with @everyone."""
        await self._send_to(CHANNEL_ALERTS, '@everyone\n{}'.format(message))

    def send_info(self, message):
        """Queue *message* for the info channel; returns the future."""
        return self._submit(self.send_info_threaded(message))

    async def send_info_threaded(self, message):
        """Send *message* to ``CHANNEL_INFO`` (runs on the bot loop)."""
        await self._send_to(CHANNEL_INFO, message)

    def send_debug(self, message):
        """Queue *message* for the debug channel; returns the future."""
        return self._submit(self.send_debug_threaded(message))

    async def send_debug_threaded(self, message):
        """Send *message* to ``CHANNEL_DEBUG`` (runs on the bot loop)."""
        await self._send_to(CHANNEL_DEBUG, message)

    def send_error(self, message):
        """Queue *message* for the error channel; returns the future."""
        return self._submit(self.send_error_threaded(message))

    async def send_error_threaded(self, message):
        """Send *message* to ``CHANNEL_ERROR`` prefixed with @everyone."""
        await self._send_to(CHANNEL_ERROR, '@everyone\n{}'.format(message))

    # ------------------------------------------------------------------
    # Bot events
    # ------------------------------------------------------------------

    async def on_error(self, event, *args, **kwargs):
        """Report the exception currently being handled, then abort.

        The traceback is printed first so it is not lost if the Discord
        send fails. An embed describing the exception is then posted to
        ``CHANNEL_ERROR``, and finally ``sys.exit(1)`` is called.
        """
        exc_value = sys.exc_info()[1]
        traceback.print_exc()

        embed = discord.Embed(title=':x: Event Error', colour=0xe74c3c)  # Red
        embed.add_field(name='Status', value='Aborting')
        embed.description = '```py\n%s\n```' % exc_value
        # Timezone-aware, so the instant is unambiguous however the library
        # serializes it.
        embed.timestamp = datetime.datetime.now(datetime.timezone.utc)

        try:
            await self._channel(CHANNEL_ERROR).send('@everyone', embed=embed)
        except Exception:
            # Delivery is best-effort; a failure must not prevent the abort.
            traceback.print_exc()

        # NOTE(review): this runs on the bot's thread, where SystemExit
        # normally ends only that thread -- confirm that is the intent.
        sys.exit(1)

    async def on_guild_join(self, guild):
        """Reject any guild join by raising ``PermissionError``."""
        raise PermissionError('!!! Bot was joined to {}:{}, ABORTING !!!'.format(guild.name, guild.id))

    async def on_ready(self):
        """Print the login banner and verify guild membership.

        Raises:
            PermissionError: if the bot is a member of any guild whose id
                differs from ``GUILD``.
        """
        await self.bot.wait_until_ready()
        print('--Discord--')
        print('Logged in as {0}'.format(self.bot.user.name))
        for g in self.bot.guilds:
            if g.id != GUILD:
                # The placeholders were missing, so the offending guild was
                # never included in the message.
                raise PermissionError(
                    '!!! Bot is joined to unknown guild {}:{}, ABORTING !!!'
                    .format(g.name, g.id))
            print('Connected to {} - {}'.format(g.name, g.id))
        print('-----------')

    # ------------------------------------------------------------------
    # Bot commands
    # ------------------------------------------------------------------

    @bot.command(pass_context=True, name='purge', no_pm=True)
    @guild_auth()
    async def cmd_purge(self, ctx, number=100):
        """Purge up to 100 messages from the invoking channel.

        ``number`` is converted with ``int()``; values below 1 or above 100
        are refused with a reply and nothing is deleted. On success the
        number of purged messages is reported via ``send_info``.
        """
        limit = int(number)
        if limit < 1:
            await ctx.send('Really?')
            return
        if limit > 100:
            await ctx.send('I can purge up to 100')
            return

        purged = await ctx.message.channel.purge(limit=limit)
        self.send_info('Successfully purged {} messages in {}'.format(len(purged), ctx.message.channel.mention ))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment