Last active
February 16, 2022 06:00
-
-
Save InvisibleOS/e4c22b7efe6af78ca4b046b5ece06f5c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import disnake | |
import os | |
from disnake.ext import commands | |
from disnake.ext.commands import Param | |
import motor.motor_asyncio as motor | |
import datetime | |
import asyncio | |
# Shared MongoDB handles for this extension. The connection string comes
# from the DB_login environment variable; os.environ[...] raises KeyError at
# import time when it is not set.
client = motor.AsyncIOMotorClient(os.environ["DB_login"])
db = client.discord_database

# AFK state: documents are looked up by guild id (``_id``) and carry an
# ``afk_members`` array.
collection = db["afk"]

# Handle to the ``server_data`` collection; not referenced by the code
# visible in this file.
server_data = db["server_data"]
class Afk(commands.Cog):
    """Per-guild AFK status tracking (prefix-command variant).

    AFK entries live in the module-level ``collection``: one document per
    guild (``_id`` is the guild id) with an ``afk_members`` array of
    ``{"id": <user id>, "status": <text>, "date": <datetime>}`` entries.
    """

    def __init__(self, bot):
        self.bot = bot

    @commands.command()
    @commands.cooldown(1, 5, commands.BucketType.user)
    async def afk(self, ctx, *, status=None):
        """Mark the invoking member as AFK.

        Args:
            ctx: Invocation context of the command.
            status: Optional free-text status; defaults to ``"AFK"``.
        """
        guild_doc = await collection.find_one({"_id": ctx.guild.id})
        # A guild that never stored an AFK entry has no document yet; treat
        # that as "nobody is AFK" instead of subscripting None.
        entries = guild_doc.get("afk_members", []) if guild_doc else []

        if any(entry["id"] == ctx.author.id for entry in entries):
            # Already AFK: nothing to store, so do not charge the cooldown.
            self.afk.reset_cooldown(ctx)
            return

        if status is None:
            status = "AFK"

        embed = disnake.Embed(
            description=f"Status: {status}",
            timestamp=ctx.message.created_at,
            color=disnake.Color.green(),
        )
        embed.set_author(
            name=f"{ctx.author} | AFK",
            icon_url=ctx.author.display_avatar.url,
        )
        await ctx.send(embed=embed)

        # NOTE(review): presumably a grace period so the invoking message
        # (also seen by on_message) does not immediately clear the new AFK
        # entry -- confirm before changing.
        await asyncio.sleep(5)

        new_entry = {
            "id": ctx.author.id,
            "status": status,
            # Timezone-aware UTC, so later formatting does not depend on the
            # host's local timezone.
            "date": datetime.datetime.now(datetime.timezone.utc),
        }
        await collection.update_one(
            {"_id": ctx.guild.id},
            {"$push": {"afk_members": new_entry}},
            # Create the guild document on first use.
            upsert=True,
        )

    @commands.Cog.listener()
    async def on_message(self, message):
        """Clear the author's AFK entry, or announce AFK members they mention."""
        # Ignore the bot itself and every other bot account. The original
        # compared the author object to a bool, which never matched.
        if message.author == self.bot.user or message.author.bot:
            return
        # Direct messages have no guild and therefore no AFK document.
        if message.guild is None:
            return

        guild_doc = await collection.find_one({"_id": message.guild.id})
        if guild_doc is None:
            return

        # Index entries by user id once; setdefault keeps the first entry per
        # user, matching the original's "[0]" selection.
        afk_by_id = {}
        for entry in guild_doc.get("afk_members", []):
            afk_by_id.setdefault(entry["id"], entry)

        if message.author.id in afk_by_id:
            await collection.update_one(
                {"_id": message.guild.id},
                {"$pull": {"afk_members": {"id": message.author.id}}},
            )
            await message.channel.send(
                f"Welcome back {message.author.mention}, I removed your AFK"
            )
            return

        for member in message.mentions:
            entry = afk_by_id.get(member.id)
            if entry is None:
                # Keep checking the remaining mentions.
                continue
            afk_since = entry["date"]
            if afk_since.tzinfo is None:
                # Stored values were written as UTC; mark naive ones as UTC so
                # format_dt does not interpret them as local time.
                afk_since = afk_since.replace(tzinfo=datetime.timezone.utc)
            relative = disnake.utils.format_dt(afk_since, style="R")
            await message.channel.send(
                f"{member.mention} is AFK: {entry['status']} - {relative}",
                allowed_mentions=disnake.AllowedMentions.none(),
            )

    @afk.error
    async def afk_error(self, ctx, error):
        """Report cooldown hits to the user; re-raise every other error."""
        if isinstance(error, commands.CommandOnCooldown):
            embed = disnake.Embed(
                description="This command is on cooldown",
                color=disnake.Color.red(),
            )
            await ctx.send(embed=embed, delete_after=5)
        else:
            raise error
def setup(bot):
    """Extension entry point: build the Afk cog and add it to ``bot``."""
    afk_cog = Afk(bot)
    bot.add_cog(afk_cog)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import disnake | |
import os | |
from disnake.ext import commands | |
from disnake.ext.commands import Param | |
import motor.motor_asyncio as motor | |
import datetime | |
import asyncio | |
# Shared MongoDB handles for this extension. The connection string comes
# from the DB_login environment variable; os.environ[...] raises KeyError at
# import time when it is not set.
client = motor.AsyncIOMotorClient(os.environ["DB_login"])
db = client.discord_database

# AFK state: documents are looked up by guild id (``_id``) and carry an
# ``afk_members`` array.
collection = db["afk"]

# Handle to the ``server_data`` collection; not referenced by the code
# visible in this file.
server_data = db["server_data"]
class Afk(commands.Cog):
    """Per-guild AFK status tracking (slash-command variant).

    AFK entries live in the module-level ``collection``: one document per
    guild (``_id`` is the guild id) with an ``afk_members`` array of
    ``{"id": <user id>, "status": <text>, "date": <datetime>}`` entries.
    """

    def __init__(self, bot):
        self.bot = bot

    @commands.slash_command(
        description="Set an AFK status for this server",
        guild_ids=[576460042774118420],
    )
    @commands.cooldown(1, 5, commands.BucketType.user)
    async def afk(
        self,
        interaction,
        status: str = Param(None, description="Your AFK Status"),
    ):
        """Mark the invoking member as AFK.

        Args:
            interaction: The slash-command interaction being handled.
            status: Optional free-text status; defaults to ``"AFK"``.
        """
        guild_doc = await collection.find_one({"_id": interaction.guild.id})
        # A guild that never stored an AFK entry has no document yet; treat
        # that as "nobody is AFK" instead of subscripting None.
        entries = guild_doc.get("afk_members", []) if guild_doc else []

        if any(entry["id"] == interaction.author.id for entry in entries):
            await interaction.response.send_message(
                "You are already AFK", ephemeral=True
            )
            # Nothing was stored, so do not charge the cooldown.
            self.afk.reset_cooldown(interaction)
            return

        if status is None:
            status = "AFK"

        await interaction.response.send_message(
            f"**✅ AFK Set**\nStatus: {status}", ephemeral=True
        )

        # NOTE(review): presumably a grace period so messages sent right
        # after the command do not immediately clear the new AFK entry via
        # on_message -- confirm before changing.
        await asyncio.sleep(5)

        new_entry = {
            "id": interaction.author.id,
            "status": status,
            # Timezone-aware UTC, so later formatting does not depend on the
            # host's local timezone.
            "date": datetime.datetime.now(datetime.timezone.utc),
        }
        await collection.update_one(
            {"_id": interaction.guild.id},
            {"$push": {"afk_members": new_entry}},
            # Create the guild document on first use.
            upsert=True,
        )

    @commands.Cog.listener()
    async def on_message(self, message):
        """Clear the author's AFK entry, or announce AFK members they mention."""
        # Ignore the bot itself and every other bot account. The original
        # compared the author object to a bool, which never matched.
        if message.author == self.bot.user or message.author.bot:
            return
        # Direct messages have no guild and therefore no AFK document.
        if message.guild is None:
            return

        guild_doc = await collection.find_one({"_id": message.guild.id})
        if guild_doc is None:
            return

        # Index entries by user id once; setdefault keeps the first entry per
        # user, matching the original's "[0]" selection.
        afk_by_id = {}
        for entry in guild_doc.get("afk_members", []):
            afk_by_id.setdefault(entry["id"], entry)

        if message.author.id in afk_by_id:
            await collection.update_one(
                {"_id": message.guild.id},
                {"$pull": {"afk_members": {"id": message.author.id}}},
            )
            await message.channel.send(
                f"Welcome back {message.author.mention}, I removed your AFK",
                delete_after=15,
            )
            return

        for member in message.mentions:
            entry = afk_by_id.get(member.id)
            if entry is None:
                # Keep checking the remaining mentions.
                continue
            afk_since = entry["date"]
            if afk_since.tzinfo is None:
                # Stored values were written as UTC; mark naive ones as UTC so
                # format_dt does not interpret them as local time.
                afk_since = afk_since.replace(tzinfo=datetime.timezone.utc)
            relative = disnake.utils.format_dt(afk_since, style="R")
            await message.channel.send(
                f"{member.mention} is AFK: {entry['status']} - {relative}",
                allowed_mentions=disnake.AllowedMentions.none(),
            )

    @afk.error
    async def afk_error(self, interaction, error):
        """Report cooldown hits to the user; re-raise every other error."""
        if isinstance(error, commands.CommandOnCooldown):
            await interaction.response.send_message(
                "This command is on cooldown", ephemeral=True
            )
        else:
            raise error
def setup(bot):
    """Extension entry point: build the Afk cog and add it to ``bot``."""
    afk_cog = Afk(bot)
    bot.add_cog(afk_cog)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment