Skip to content

Instantly share code, notes, and snippets.

@flagoworld
Last active April 13, 2016 05:50
Show Gist options
  • Save flagoworld/b151b09be4f11acc1a90f990f3b74fea to your computer and use it in GitHub Desktop.
Save flagoworld/b151b09be4f11acc1a90f990f3b74fea to your computer and use it in GitHub Desktop.
import os
from threading import Thread
from time import sleep
import asyncio
FIFO_PATH = "/path/to/crash-log"
class FifoReader(Thread):
def __init__(self, path, crash):
super().__init__()
self.daemon = True
self.crash = crash
self.path = path
try:
os.remove(path)
except OSError:
pass
oldmask = os.umask(0o000)
os.mkfifo(path, 0o666)
os.umask(oldmask)
def run(self):
fifo = open(self.path, "r")
while self.crash == self.crash.bot.get_cog("Crash"):
line = fifo.read()
if not line:
sleep(1)
else:
self.crash.queue.put_nowait(line)
fifo.close()
class Crash:
def __init__(self, bot):
self.bot = bot
self.queue = asyncio.Queue(maxsize=0)
self.crash_reader = FifoReader(FIFO_PATH, self)
self.crash_reader.start()
async def empty_queue(self):
while self == self.bot.get_cog("Crash"):
line = await self.queue.get()
channel = next((c for c in self.bot.get_all_channels() if c.id == '134015872552534016'), None)
if channel:
await self.bot.send_message(channel, line)
def setup(bot):
n = Crash(bot)
loop = asyncio.get_event_loop()
loop.create_task(n.empty_queue())
bot.add_cog(n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment