Skip to content

Instantly share code, notes, and snippets.

@trevorflahardy
Last active September 9, 2023 13:04
Show Gist options
  • Save trevorflahardy/705f1d3e4ca47c5d06c244082fcd85f5 to your computer and use it in GitHub Desktop.
Save trevorflahardy/705f1d3e4ca47c5d06c244082fcd85f5 to your computer and use it in GitHub Desktop.
Cog metaclass example for discord.py
from discord.ext import commands, tasks
class MetaTask(commands.CogMeta):
"""
A simple Metclass that can be used to get all tasks.Loop from the class,
and cancel them easily.
"""
def __new__(cls, name, bases, attrs, **kwargs):
new_cls = super().__new__(cls, name, bases, attrs)
_inner_tasks = [] # We'll attach any tasks we find to this
# Go through each method in the class.
# This is essentially the same as seeking through dir()
for key, value in attrs.items():
if issubclass(value.__class__, tasks.Loop): # If the func is a subclass of a `tasks.Loop`
task = value
coro = task.__dict__.get('coro')
if not task.is_running():
interval = task.__dict__.get('_sleep') # Im keeping it in seconds, bite me.
logging.info(f'Starting task {coro.__name__} with an interval of {interval} seconds.')
task.start()
else:
logging.warning(f'Tried to start task {coro.__name__} when it was already started.')
_inner_tasks.append(value)
new_cls.__tasks__ = _inner_tasks # assign our tasks to __tasks__
return new_cls
def _unload_tasks(cls):
for task in cls.__tasks__: # for each task in the tasks we got earlier
# Feel free to log here as well, I printed for the example
if task.is_running():
coro = task.__dict__.get('coro')
print(f'Stopping task {coro.__name__} after {task.current_loop} intervals.')
task.cancel()
def _load_tasks(cls):
for task in cls.__tasks__:
if not task.is_running():
coro = task.__dict__.get('coro')
interval = task.__dict__.get('_sleep') # Im keeping it in seconds, bite me.
print(f'Starting task {coro.__name__} with an interval of {interval} seconds.')
task.start()
class TestCog(commands.Cog, metaclass=MetaTask):
def __init__(self, bot):
self.bot = bot
def cog_unload(self):
self.__class__._unload_tasks()
@tasks.loop(seconds=15)
async def my_task(self) -> None:
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment