Skip to content

Instantly share code, notes, and snippets.

@LeoCx1000
Created February 17, 2023 21:57
Show Gist options
  • Save LeoCx1000/1cc9c73fd955455362fef3eef79e1260 to your computer and use it in GitHub Desktop.
Save LeoCx1000/1cc9c73fd955455362fef3eef79e1260 to your computer and use it in GitHub Desktop.

Multiple concurrencies

Here is an example on how to use commmands.MaxConcurrency for multiple bucket types. If you want to add more concurrencies, you will need to add more try/except blocks and in each one's except statement you need to release all the previous concurrencies that you have acquired.

import asyncio
from typing import Any
from discord.ext import commands
class Test(commands.Cog):
'''test cog'''
def __init__(self):
# We need to store the concurrencies. I am storing it in the cog scope attached to self, for easy access.
self.user_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.user, wait=False)
self.channel_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.channel, wait=False)
@commands.command()
async def test(self, ctx: commands.Context[commands.Bot]):
"""Test command."""
m = await ctx.send('sleeping ten seconds')
await asyncio.sleep(10)
await m.edit(content='slept')
@test.before_invoke
async def test_before(self, ctx: commands.Context[commands.Bot]):
"""This triggers before the command runs."""
await self.user_concurrency.acquire(ctx.message)
# First, we acquire the first concurrency.
# which, if its already acquired it will
# raise an error because wait is set to true.
try:
await self.channel_concurrency.acquire(ctx.message)
# Then, we acquire our second concurrency.
except commands.MaxConcurrencyReached as error:
# If this second concurrency fails, we need to release the
# first one we just acquired since it will not go through, and
# if we don't do this, the first one will be locked forever
# because the after_invoke hook will not trigger.
await self.user_concurrency.release(ctx.message)
# Then, we raise the error so it propagates to the error handlers
# and it doesn't just get eaten. If we don't raise it will instead
# work without raising an error.
raise error
@test.after_invoke
async def test_after(self, ctx: commands.Context[commands.Bot]):
"""This triggers after the command ran."""
# Here in the after, we just release both concurrencies so
# the commands can be ran again by other people.
await self.user_concurrency.release(ctx.message)
await self.channel_concurrency.release(ctx.message)
@Sachaa-Thanasius
Copy link

Sachaa-Thanasius commented Mar 22, 2024

It won't be needed by most, but if one has enough concurrencies that managing the try-excepts becomes a pain, this can be refactored to work on a sequence of MaxConcurrency instances. Same principles apply, but I figured it was worth mentioning.

class Test(commands.Cog):
    """test cog"""

    def __init__(self):
        user_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.user, wait=False)
        channel_concurrency = commands.MaxConcurrency(1, per=commands.BucketType.channel, wait=False)
        self.concurrencies: list[commands.MaxConcurrency] = [user_concurrency, channel_concurrency]

    ...

    @test.before_invoke
    async def test_before(self, ctx: commands.Context[commands.Bot]) -> None:
        await self.concurrencies[0].acquire(ctx.message)

        for index, conc in enumerate(self.concurrencies[1:], start=1):
            try:
                await conc.acquire(ctx.message)
            except commands.MaxConcurrencyReached:
                for acquired_conc in self.concurrencies[:index]:
                    await acquired_conc.release(ctx.message)

                raise
    
    @test.after_invoke
    async def test_after(self, ctx: commands.Context[commands.Bot]) -> None:
        for conc in self.concurrencies:
            await conc.release(ctx.message)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment