Created
August 23, 2018 02:46
-
-
Save thebluefish/41cb8c632b110d1aa0e629dee2a4c552 to your computer and use it in GitHub Desktop.
Basic discord.py-based logger wrapped in a class, runs in separate thread, provides thread-safe logging helpers, locks interactions to single whitelisted guild for security
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import discord | |
from discord.ext import commands | |
from threading import Thread | |
import asyncio | |
import sys | |
import traceback | |
import datetime | |
TOKEN = '' | |
# Expects the following to contain IDs pulled from discord | |
GUILD = | |
CHANNEL_GENERAL = | |
CHANNEL_ALERTS = | |
CHANNEL_INFO = | |
CHANNEL_DEBUG = | |
CHANNEL_ERROR = | |
def guild_auth(): | |
async def predicate(ctx): | |
return ctx.guild and ctx.guild.id == GUILD | |
return commands.check(predicate) | |
class Discord: | |
event_loop = asyncio.new_event_loop() | |
bot = commands.Bot('$', loop=event_loop) | |
def __init__(self): | |
# Events | |
self.on_ready = self.bot.event(self.on_ready) | |
self.on_error = self.bot.event(self.on_error) | |
self.on_guild_join = self.bot.event(self.on_guild_join) | |
# Commands | |
self.cmd_purge = self.bot.command(self.cmd_purge) | |
t = Thread(target=self.bot.run, args=(TOKEN,)) | |
t.start() | |
def __del__(self): | |
self.bot.close() | |
def send_general(self, message): | |
asyncio.run_coroutine_threadsafe(self.send_general_threaded(message), self.event_loop) | |
async def send_general_threaded(self, message): | |
await self.bot.wait_until_ready() | |
await self.bot.get_channel(CHANNEL_GENERAL).send(message) | |
def send_alerts(self, message): | |
asyncio.run_coroutine_threadsafe(self.send_alerts_threaded(message), self.event_loop) | |
async def send_alerts_threaded(self, message): | |
await self.bot.wait_until_ready() | |
await self.bot.get_channel(CHANNEL_ALERTS).send('@everyone\n{}'.format(message)) | |
def send_info(self, message): | |
asyncio.run_coroutine_threadsafe(self.send_info_threaded(message), self.event_loop) | |
async def send_info_threaded(self, message): | |
await self.bot.wait_until_ready() | |
await self.bot.get_channel(CHANNEL_INFO).send(message) | |
def send_debug(self, message): | |
asyncio.run_coroutine_threadsafe(self.send_debug_threaded(message), self.event_loop) | |
async def send_debug_threaded(self, message): | |
await self.bot.wait_until_ready() | |
await self.bot.get_channel(CHANNEL_DEBUG).send(message) | |
def send_error(self, message): | |
asyncio.run_coroutine_threadsafe(self.send_error_threaded(message), self.event_loop) | |
async def send_error_threaded(self, message): | |
await self.bot.wait_until_ready() | |
await self.bot.get_channel(CHANNEL_ERROR).send('@everyone\n{}'.format(message)) | |
async def on_error(self, event, *args, **kwargs): | |
exc_type, exc_value, exc_traceback = sys.exc_info() | |
embed = discord.Embed(title=':x: Event Error', colour=0xe74c3c) # Red | |
embed.add_field(name='Status', value='Aborting') | |
embed.description = '```py\n%s\n```' % exc_value | |
embed.timestamp = datetime.datetime.utcnow() | |
await self.bot.get_channel(CHANNEL_ERROR).send('@everyone', embed=embed) | |
traceback.print_exc() | |
sys.exit(1) | |
async def on_guild_join(self, guild): | |
raise PermissionError('!!! Bot was joined to {}:{}, ABORTING !!!'.format(guild.name, guild.id)) | |
async def on_ready(self): | |
await self.bot.wait_until_ready() | |
print('--Discord--') | |
print('Logged in as {0}'.format(self.bot.user.name)) | |
for g in self.bot.guilds: | |
if g.id != GUILD: | |
raise PermissionError('!!! Bot is joined to unknown channel, ABORTING !!!' | |
.format(g.name, g.id)) | |
print('Connected to {} - {}'.format(g.name, g.id)) | |
print('-----------') | |
@bot.command(pass_context=True, name='purge', no_pm=True) | |
@guild_auth() | |
async def cmd_purge(self, ctx, number=100): | |
limit = int(number) | |
if limit < 1: | |
await ctx.send('Really?') | |
return | |
if limit > 100: | |
await ctx.send('I can purge up to 100') | |
return | |
purged = await ctx.message.channel.purge(limit=limit) | |
self.send_info('Successfully purged {} messages in {}'.format(len(purged), ctx.message.channel.mention )) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment