Skip to content

Instantly share code, notes, and snippets.

@zRitsu
Last active November 25, 2022 11:38
Show Gist options
  • Save zRitsu/263bd1d640b99a662feeb25125501ad7 to your computer and use it in GitHub Desktop.
Save zRitsu/263bd1d640b99a662feeb25125501ad7 to your computer and use it in GitHub Desktop.
discord.py 2.x: music.py para lavalink (inclui servidor lavalink público fornecido por freelavalink.ga), code originalmente portado do tutorial do dune: https://www.youtube.com/watch?v=YGx0xNHzjgE (mas foi feito um rewrite completo pra incluir controller e muito mais).
# esse code requer instalar os 4 packages abaixo:
# pip3 install -U humanize spotipy git+https://github.com/zRitsu/Wavelink.git@c72a90f721d68b9a9bea37b9d43c18283a9a7605 git+https://github.com/Rapptz/discord.py.git
from configparser import ConfigParser
from functools import partial
import discord
from discord import app_commands
from discord.ext import commands
import wavelink
import itertools
import re
import os
from wavelink import Node
import asyncio
from collections import deque
from random import shuffle
import sys
import traceback
from aiohttp import ClientSession
from typing import Optional, Union, List
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import humanize
from json import load
URL_REG = re.compile(r'https?://(?:www\.)?.+')
try:
with open('../config.json') as f:
CFG = load(f)
except FileNotFoundError:
CFG = {}
lavalink_servers = [
{
'host': 'lavalink.oops.wtf',
'port': 443,
'password': 'www.freelavalink.ga',
'identifier': 'freelavalink.ga',
'region': 'us_central',
'secure': True,
},
]
SPOTIFY_CLIENT_ID = ""
SPOTIFY_CLIENT_SECRET = ""
vote_skip_amount = CFG.get('vote_skip_amount', 3)
msg_ad = "[`Crie seu próprio Bot de Música`](https://youtu.be/YGx0xNHzjgE)"
if os.path.isfile("./lavalink.ini"):
config = ConfigParser()
try:
config.read('lavalink.ini')
except Exception as e:
traceback.print_exc()
else:
try:
for key, value in {section: dict(config.items(section)) for section in config.sections()}.items():
value["identifier"] = key.replace(" ", "_")
value["port"] = int(value.get("port", 0))
value["secure"] = value.get("secure") == "true"
value["region"] = value.get("region", "us_central")
lavalink_servers.append(value)
except:
traceback.print_exc()
def get_button_style(enabled: bool, red=True):
if enabled:
if red:
return discord.ButtonStyle.red
return discord.ButtonStyle.green
return discord.ButtonStyle.grey
async def has_perm(ctx):
try:
ctx.player = ctx.bot.music.players[ctx.guild.id]
except KeyError:
return True
if ctx.author in ctx.player.dj:
return True
if ctx.author.guild_permissions.manage_channels:
return True
vc = ctx.bot.get_channel(ctx.player.channel_id)
if ctx.bot.intents.members and not [m for m in vc.members if
not m.bot and (m.guild_permissions.manage_channels or m in ctx.player.dj)]:
ctx.player.dj.append(ctx.author)
await ctx.send(embed=discord.Embed(
description=f"{ctx.author.mention} foi adicionado à lista de DJ's por não haver um no canal <#{vc.id}>.",
color=ctx.me.color))
return True
##################################
######## perms decorators ########
##################################
def has_player():
def predicate(ctx):
try:
ctx.player = ctx.bot.music.players[ctx.guild.id]
except AttributeError:
raise NoPlayer()
return True
return commands.check(predicate)
def is_dj():
async def predicate(ctx):
try:
ctx.player = ctx.bot.music.players[ctx.guild.id]
except KeyError:
raise NoPlayer()
if ctx.player.restrict_mode and not await has_perm(ctx):
raise NotDJorStaff()
return True
return commands.check(predicate)
def is_requester():
async def predicate(ctx):
try:
ctx.player = ctx.bot.music.players.get(ctx.guild.id)
except KeyError:
raise NoPlayer()
if not ctx.player.current:
raise NoSource()
if ctx.player.current.requester == ctx.author:
return True
try:
if await has_perm(ctx):
return True
except NotDJorStaff:
pass
raise NotRequester()
return commands.check(predicate)
def check_voice():
def predicate(ctx):
try:
if ctx.author.voice.channel != ctx.me.voice.channel:
raise DiffVoiceChannel()
except AttributeError:
pass
if not ctx.author.voice:
raise NoVoice()
return True
return commands.check(predicate)
def has_source():
def predicate(ctx):
try:
ctx.player
except:
ctx.player = ctx.bot.music.players.get(ctx.guild.id)
if not ctx.player:
raise NoPlayer()
if not ctx.player.current:
raise NoSource()
return True
return commands.check(predicate)
def user_cooldown(rate: int, per: int):
def custom_cooldown(message):
if message.author.guild_permissions.administrator:
return None # sem cooldown
return commands.Cooldown(rate, per)
return custom_cooldown
########################
##### Converters #######
########################
def fix_characters(text: str, limit: int = 0):
replaces = [
('&quot;', '"'),
('&amp;', '&'),
('(', '\u0028'),
(')', '\u0029'),
('[', '【'),
(']', '】'),
(" ", " "),
("*", '"'),
("_", ' '),
("{", "\u0028"),
("}", "\u0029"),
]
for r in replaces:
text = text.replace(r[0], r[1])
if limit:
text = f"{text[:limit]}..." if len(text) > limit else text
return text
def time_format(milliseconds):
m, s = divmod(int(milliseconds / 1000), 60)
h, m = divmod(m, 60)
strings = f"{m:02d}:{s:02d}"
if h:
strings = f"{h}:{strings}"
return strings
def seek_parser(time):
try:
time = str(time).split(':')
if len(time) > 1:
return int(time[0]) * 60 + int(time[1])
else:
return int(time[0])
except ValueError:
return
def get_track_index(ctx, query):
index = None
player: CustomPlayer = ctx.player
for counter, track in enumerate(player.queue):
if query.lower() in track.title.lower() or \
all(elem in track.title.lower().split() for elem in query.lower().split()):
index = counter
break
return index
########################
### Classe de testes ###
########################
class TestBot(commands.Bot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def setup_bot(self):
await self.wait_until_ready()
await self.tree.sync()
print(f'Logado como: {self.user} [{self.user.id}]')
async def setup_hook(self):
await self.add_cog(Music(self))
self.loop.create_task(self.setup_bot())
##################
##### Errors #####
##################
class NoPlayer(commands.CheckFailure):
pass
class NoVoice(commands.CheckFailure):
pass
class DiffVoiceChannel(commands.CheckFailure):
pass
class NoSource(commands.CheckFailure):
pass
class NotDJorStaff(commands.CheckFailure):
pass
class NotRequester(commands.CheckFailure):
pass
class MissingSpotifyClient(commands.CheckFailure):
pass
##########################
##### Spotify Stuffs #####
##########################
spotify_regex = re.compile("https://open.spotify.com?.+(album|playlist|track)/([a-zA-Z0-9]+)")
def fix_spotify_data(data: dict):
try:
return data["track"]
except KeyError:
data = {"track": data}
return data
class SpotifyPlaylist:
def __init__(self, data: dict, ctx):
self.data = data
self.tracks = [
SpotifyTrack(
uri=i['track']['external_urls']['spotify'],
authors=i['track']["artists"],
title=i['track']['name'],
thumb=i['track']['album']['images'][0]['url'],
duration=i['track']['duration_ms'],
ctx=ctx
) for i in data['tracks']]
class SpotifyTrack:
def __init__(self, *, uri, title, authors, thumb, duration, ctx):
self.author = fix_characters(authors[0]['name'])
self.authors = [fix_characters(i['name']) for i in authors]
self.id = ""
self.title = fix_characters(title)
self.thumb = thumb
self.uri = uri
self.duration = duration
self.is_stream = False
self.info = {}
self.requester = ctx.author
async def resolve(self, node: Node):
if self.id:
return
try:
yt_track = (await node.get_tracks(f"ytsearch:{self.title} - {self.author}"))[0]
self.data = yt_track.info
self.id = yt_track.id
except IndexError:
return
except Exception:
traceback.print_exc()
async def process_spotify(ctx: commands.Context, query: str):
if not (matches := spotify_regex.match(query)):
return
if not ctx.bot.spotify:
raise MissingSpotifyClient()
url_type, url_id = matches.groups()
if url_type == "track":
to_run = partial(ctx.bot.spotify.track, url_id)
t = (await ctx.bot.loop.run_in_executor(None, to_run))
return [SpotifyTrack(uri=t['external_urls']['spotify'], authors=t["artists"], title=t['name'],
thumb=t['album']['images'][0]['url'], duration=t['duration_ms'], ctx=ctx)]
data = {
'loadType': 'PLAYLIST_LOADED',
'playlistInfo': {'name': '', 'selectedTrack': -1},
'tracks': []
}
if url_type == "album":
to_run = partial(ctx.bot.spotify.track, url_id)
result = (await ctx.bot.loop.run_in_executor(None, to_run))
data["playlistInfo"]["name"] = result['name']
for t in result['tracks']['items']:
t['album'] = {'images': [{'url': result['images'][0]['url']}]}
data["tracks"] = [fix_spotify_data(i) for i in result['tracks']['items']]
elif url_type == "playlist":
to_run = partial(ctx.bot.spotify.playlist, playlist_id=url_id)
result = (await ctx.bot.loop.run_in_executor(None, to_run))
data["playlistInfo"]["name"] = result['name']
data["tracks"] = result['tracks']['items']
if data["tracks"]:
return SpotifyPlaylist(data, ctx)
def spotify_client() -> Optional[spotipy.Spotify]:
try:
return spotipy.Spotify(
auth_manager=SpotifyClientCredentials(
client_id=SPOTIFY_CLIENT_ID or os.environ['SPOTIFY_CLIENT_ID'],
client_secret=SPOTIFY_CLIENT_SECRET or os.environ["SPOTIFY_CLIENT_SECRET"]
)
)
except Exception as e:
print(
f"A APIKEY do spotify não foi configurada devidamente nas variáveis da host (ou no arquivo .env), verifique "
f"e tente novamente caso queira o suporte a músicas do spotify.\nErro: {repr(e)}")
return
##############################
##### Paginator Classes ######
##############################
class QueueInteraction(discord.ui.View):
def __init__(self, player, user: discord.Member, timeout=60):
self.player = player
self.user = user
self.pages = []
self.current = 0
self.max_page = len(self.pages) - 1
super().__init__(timeout=timeout)
self.embed = discord.Embed(color=user.guild.me.colour)
self.update_pages()
self.update_embed()
def update_pages(self):
counter = 1
entries = list(self.player.queue)
self.pages = [entries[i:i + 8] for i in range(0, len(entries), 8)]
for n, page in enumerate(self.pages):
txt = "\n"
for t in page:
txt += f"`{counter})` [`{fix_characters(t.title, limit=50)}`]({t.uri})\n" \
f"`[{time_format(t.duration) if not t.is_stream else '🔴 Livestream'}]`" + \
f" - {t.requester.mention}\n`---------`\n"
counter += 1
self.pages[n] = txt
self.current = 0
self.max_page = len(self.pages) - 1
def update_embed(self):
self.embed.title = f"**Músicas na fila [{self.current + 1} / {self.max_page + 1}]**"
self.embed.description = self.pages[self.current]
@discord.ui.button(emoji='⏮️', style=discord.ButtonStyle.grey)
async def first(self, interaction: discord.Interaction, button):
self.current = 0
self.update_embed()
await interaction.response.edit_message(embed=self.embed)
@discord.ui.button(emoji='⬅️', style=discord.ButtonStyle.grey)
async def back(self, interaction: discord.Interaction, button):
if self.current == 0:
self.current = self.max_page
else:
self.current -= 1
self.update_embed()
await interaction.response.edit_message(embed=self.embed)
@discord.ui.button(emoji='➡️', style=discord.ButtonStyle.grey)
async def next(self, interaction: discord.Interaction, button):
if self.current == self.max_page:
self.current = 0
else:
self.current += 1
self.update_embed()
await interaction.response.edit_message(embed=self.embed)
@discord.ui.button(emoji='⏭️', style=discord.ButtonStyle.grey)
async def last(self, interaction: discord.Interaction, button):
self.current = self.max_page
self.update_embed()
await interaction.response.edit_message(embed=self.embed)
@discord.ui.button(emoji='⏹️', style=discord.ButtonStyle.grey)
async def stop_interaction(self, interaction: discord.Interaction, button):
await interaction.response.edit_message(content="Queue fechada", embed=None, view=None)
self.stop()
@discord.ui.button(emoji='🔄', label="Recarregar lista", style=discord.ButtonStyle.grey)
async def update_q(self, interaction: discord.Interaction, button):
self.update_pages()
self.update_embed()
await interaction.response.edit_message(embed=self.embed)
##########################
##### Music Classes ######
##########################
class PlayerInteractions(discord.ui.View):
def __init__(self, ctx):
self.ctx = ctx
super().__init__(timeout=None)
async def interaction_check(self, interaction: discord.Interaction):
player = self.ctx.bot.music.players.get(interaction.guild.id)
if player.interaction_cooldown:
await interaction.response.send_message("O player está em cooldown, tente novamente em instantes.",
ephemeral=True)
return
vc = self.ctx.bot.get_channel(player.channel_id)
if interaction.user not in vc.members:
embed = discord.Embed(
description=f"Você deve estar no canal <#{vc.id}> para usar isto.",
color=discord.Colour.red()
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
control = interaction.data.get("custom_id")[12:]
kwargs = {}
if control == "help":
embed = discord.Embed(
description=f"📘 **IFORMAÇÕES SOBRE OS BOTÕES** 📘\n\n"
f"⏯️ `= Pausar/Retomar a música`\n"
f"⏮️ `= Voltar para a música tocada anteriormente`\n"
f"⏭️ `= Pular para a próxima música`\n"
f"⏪ `= Voltar o tempo da música em 20 seg.`\n"
f"⏩ `= Avançar o tempo da música em 20 seg.`\n"
f"⏹️ `= Parar o player e me desconectar do canal`\n"
f"🔀 `= Misturar as músicas da fila`\n"
f"🇳 `= Ativar/Desativar o efeito Nightcore`\n"
f"🔁 `= Ativar/Desativar repetição da música`",
color=self.ctx.me.color
)
await interaction.response.send_message(embed=embed, ephemeral=True)
return
elif control == "fastbackward":
control = "seek"
kwargs = {"pos": time_format(player.position - 20000)}
elif control == "fastforward":
control = "seek"
kwargs = {"pos": time_format(player.position + 20000)}
elif control == "playpause":
control = "pause" if not player.paused else "resume"
elif control == "loop":
if player.loop == "current":
control = "loopqueue"
elif player.loop == "queue":
pass
else:
player.loop = False
ctx = await player.bot.get_context(interaction.message)
cmd: commands.Command = player.bot.get_command(control)
ctx.command = cmd
ctx.player = player
ctx.author = interaction.user
ctx.interaction = interaction
ctx.channel = interaction.channel
try:
bucket = cmd._buckets.get_bucket(ctx)
if bucket:
retry_after = bucket.update_rate_limit()
if retry_after:
raise commands.CommandOnCooldown(cooldown=bucket, retry_after=retry_after, type=cmd._buckets.type)
await cmd(ctx, **kwargs)
player.interaction_cooldown = True
await asyncio.sleep(1)
player.interaction_cooldown = False
except Exception as e:
await ctx.cog.cog_command_error(ctx, e)
class CustomTrack(wavelink.Track):
def __init__(self, *args, **kwargs):
self.requester = kwargs.pop('requester')
args[1]['title'] = fix_characters(args[1]['title'])
super().__init__(*args, **kwargs)
if self.ytid:
self.thumb = f"https://img.youtube.com/vi/{self.ytid}/mqdefault.jpg"
else:
self.thumb = self.info.get("artworkUrl", "")
class CustomPlayer(wavelink.Player):
def __init__(self, *args, **kwargs):
self.ctx: commands.Context = kwargs.pop('ctx')
super().__init__(*args, **kwargs)
self.text_channel: discord.TextChannel = self.ctx.channel
self.message: Optional[discord.Message] = None
self.queue = deque()
self.played = deque(maxlen=20)
self.nightcore = False
self.dj = [] if self.ctx.author.guild_permissions.manage_channels else [self.ctx.author]
self.loop = False
self.last_track: Optional[CustomTrack] = None
self.locked = False
self.idle = None
self.idle_timeout = 180 # aguardar 3 minutos para adicionar novas músicas
self.is_previows_music = False
self.updating_message = None
self.command_log = ""
self.last_embed = None
self.interaction_cooldown = False
self.votes = set()
self.force_edit_message = False
self.restrict_mode = False
async def destroy(self, *, force: bool = False):
try:
self.idle.cancel()
except:
pass
await self.destroy_message()
await super().destroy()
async def idling_mode(self):
await self.destroy_message()
self.view = PlayerInteractions(self.ctx)
self.view.add_item(discord.ui.Button(emoji="⏮️", custom_id=f"musicplayer_back", label="Voltar"))
self.view.add_item(discord.ui.Button(emoji="⏹️", custom_id=f"musicplayer_stop", label="Parar"))
embed = discord.Embed(
description=f"**Não há músicas na fila. Adicione uma música ou use um dos botões abaixo\n\n"
f"[O Player será desligado em: {time_format(self.idle_timeout * 1000)}]**",
color=self.ctx.me.colour
)
self.message = await self.text_channel.send(embed=embed, view=self.view)
await asyncio.sleep(self.idle_timeout)
embed = discord.Embed(description="**O player foi desligado por inatividade...**",
color=discord.Colour.dark_gold())
self.bot.loop.create_task(self.text_channel.send(embed=embed))
self.bot.loop.create_task(self.destroy())
return
async def process_next(self):
if self.locked:
return
try:
track = self.queue.popleft()
except:
self.idle = self.bot.loop.create_task(self.idling_mode())
return
try:
self.idle.cancel()
except:
pass
self.locked = True
if isinstance(track, SpotifyTrack):
await track.resolve(self.node)
if not track.id:
return await self.process_next()
self.last_track = track
await self.play(track)
self.locked = False
async def invoke_np(self, force=False, interaction=None):
if not self.current:
return
embed = discord.Embed(color=self.ctx.me.color)
embed_top = discord.Embed(
color=self.ctx.me.color,
description=f"> [**{self.current.title}**]({self.current.uri})"
)
if not self.paused:
embed_top.set_author(
name="Tocando Agora:",
icon_url="https://cdn.discordapp.com/attachments/480195401543188483/895862881105616947/music_equalizer.gif"
)
else:
embed_top.set_author(
name="Em Pausa:",
icon_url="https://cdn.discordapp.com/attachments/480195401543188483/896013933197013002/pause.png"
)
embed_top.set_thumbnail(url=self.current.thumb)
if self.current.is_stream:
duration = "🔴 **⠂Livestream**"
else:
duration = f"⏰ **⠂Duração:** `{time_format(self.current.duration)}`"
txt = f"> {duration}\n" \
f"> 💠 **⠂Uploader**: `{self.current.author}`\n" \
f"> 🎧 **⠂Pedido por:** {self.current.requester.mention}\n" \
f"> 🔊 **⠂Volume:** `{self.volume}%`"
txt += "\n"
if self.restrict_mode:
txt += "> 🔒 **⠂Modo restrito: `ativado`\n"
if self.command_log:
txt += f"> 📈 **⠂Última Interação:** {self.command_log}\n"
if len(self.queue):
txt += "```ldif\nPróximas Músicas:```"
txt += "\n".join(
f"`{n + 1}) [{time_format(t.duration) if t.duration else '🔴 Livestream'}]` [`{fix_characters(t.title, 31)}`]({t.uri})"
for n, t
in enumerate(itertools.islice(self.queue, 3))
)
if (qsize := len(self.queue)) > 3:
txt += f"\n`╚══════ E mais {qsize - 3} música(s) ══════╝" + f"`\n\n{msg_ad}" if msg_ad else "`"
else:
txt += f"\n\n{msg_ad}" if msg_ad else ""
else:
txt += f"\n{msg_ad}" if msg_ad else ""
embed.description = txt
embed.set_image(
url="https://cdn.discordapp.com/attachments/554468640942981147/937918500784197632/rainbow_bar.gif"
)
embed_top.set_image(
url="https://cdn.discordapp.com/attachments/554468640942981147/937918500784197632/rainbow_bar.gif"
)
try:
self.view.stop()
except:
pass
self.view = PlayerInteractions(self.ctx)
self.view.add_item(
discord.ui.Button(emoji="⏯️", custom_id=f"musicplayer_playpause", style=get_button_style(self.paused)))
self.view.add_item(discord.ui.Button(emoji="⏮️", custom_id=f"musicplayer_back"))
self.view.add_item(discord.ui.Button(emoji="⏭️", custom_id=f"musicplayer_skip"))
self.view.add_item(discord.ui.Button(emoji="🇳", custom_id=f"musicplayer_nightcore",
style=get_button_style(self.nightcore, red=False)))
self.view.add_item(discord.ui.Button(
emoji=("🔂" if self.loop == "current" else "🔁"),
custom_id=f"musicplayer_loop", style=discord.ButtonStyle.grey
if not self.loop else discord.ButtonStyle.blurple
if self.loop == "current"
else discord.ButtonStyle.green)
)
self.view.add_item(discord.ui.Button(emoji="⏹️", custom_id=f"musicplayer_stop"))
self.view.add_item(discord.ui.Button(emoji="⏪", custom_id=f"musicplayer_fastbackward"))
self.view.add_item(discord.ui.Button(emoji="⏩", custom_id=f"musicplayer_fastforward"))
self.view.add_item(discord.ui.Button(emoji="🔀", custom_id=f"musicplayer_shuffle"))
self.view.add_item(discord.ui.Button(emoji="ℹ️", custom_id=f"musicplayer_help"))
if not force and self.message:
self.force_edit_message = False
try:
if interaction:
await interaction.response.edit_message(embeds=[embed_top, embed], view=self.view)
else:
await self.message.edit(embeds=[embed_top, embed], view=self.view)
return
except:
traceback.print_exc()
pass
await self.destroy_message(destroy_view=False)
self.last_embed = embed
self.ctx.player = self
self.message = await self.text_channel.send(embeds=[embed_top, embed], view=self.view)
async def destroy_message(self, destroy_view=True):
if destroy_view:
try:
self.view.stop()
except:
pass
self.view = None
try:
await self.message.delete()
except:
pass
self.last_embed = None
self.message = None
def is_last_message(self):
try:
return self.text_channel.last_message_id == self.message.id
except AttributeError:
return
async def update_message_task(self, interaction=None):
if not interaction:
await asyncio.sleep(5)
try:
await self.invoke_np(interaction=interaction)
except:
traceback.print_exc()
self.updating_message = None
def update_message(self, interaction=None):
if self.updating_message:
return
self.updating_message = self.bot.loop.create_task(self.update_message_task(interaction=interaction))
##################################
##### Music Commands/Events ######
##################################
class Music(commands.Cog, wavelink.WavelinkMixin):
def __init__(self, bot: commands.Bot):
if not hasattr(bot, 'spotify'):
bot.spotify = spotify_client()
if not hasattr(bot, 'wavelink'):
bot.music = wavelink.Client(bot=bot)
self.bot = bot
self.bot.loop.create_task(self.process_nodes())
async def process_nodes(self):
await self.bot.wait_until_ready()
if not hasattr(self.bot, 'session') or not self.bot.session:
self.bot.session = ClientSession()
for node in lavalink_servers:
self.bot.loop.create_task(self.connect_node(node))
async def connect_node(self, data: dict):
data['rest_uri'] = ("https" if data.get('secure') else "http") + f"://{data['host']}:{data['port']}"
retries = 1
backoff = 7
while not self.bot.is_closed():
if retries >= 25:
print(f"Todas as tentativas de conectar ao servidor [{data['identifier']}] falharam.")
return
else:
try:
async with self.bot.session.get(data['rest_uri'], timeout=10) as r:
break
except Exception:
await asyncio.sleep(backoff)
print(f'Falha ao conectar no servidor [{data["identifier"]}], tentativa: {retries}/25')
retries += 1
backoff += 2
continue
await self.bot.music.initiate_node(**data)
@wavelink.WavelinkMixin.listener("on_websocket_closed")
async def node_ws_voice_closed(self, node, payload: wavelink.events.WebsocketClosed):
if payload.code == 1000:
return
player: CustomPlayer = payload.player
if payload.code == 4014:
if player.ctx.me.voice:
return
vc = player.bot.get_channel(player.channel_id)
if vc:
vcname = f" **{vc.name}**"
else:
vcname = ""
channel = player.text_channel
embed = discord.Embed(color=player.ctx.me.color)
embed.description = f"Conexão perdida com o canal de voz{vcname}..."
embed.description += "\nO player será finalizado..."
player.bot.loop.create_task(channel.send(embed=embed))
await player.destroy()
return
# fix para dpy 2x (erro ocasionado ao mudar o bot de canal)
if payload.code == 4006:
await player.connect(player.channel_id)
return
print(f"Erro no canal de voz! server: {player.ctx.guild.name} reason: {payload.reason} | code: {payload.code}")
@wavelink.WavelinkMixin.listener('on_track_exception')
async def wavelink_track_error(self, node, payload: wavelink.TrackException):
player: CustomPlayer = payload.player
track = player.last_track
embed = discord.Embed(
description=f"**Falha ao reproduzir música:\n[{track.title}]({track.uri})** ```java\n{payload.error}\n```",
color=discord.Colour.red())
await player.text_channel.send(embed=embed)
if player.locked:
return
player.current = None
if payload.error == "This IP address has been blocked by YouTube (429)":
player.queue.appendleft(player.last_track)
else:
player.played.append(player.last_track)
player.locked = True
await asyncio.sleep(6)
player.locked = False
await player.process_next()
@wavelink.WavelinkMixin.listener()
async def on_node_ready(self, node: wavelink.Node):
print(f'Servidor de música: [{node.identifier}] está pronto para uso!')
@wavelink.WavelinkMixin.listener()
async def on_track_end(self, node: wavelink.Node, payload: wavelink.TrackEnd):
player: CustomPlayer = payload.player
if player.locked:
return
if payload.reason == "FINISHED":
player.command_log = ""
elif payload.reason == "STOPPED":
player.force_edit_message = True
else:
return
player.votes.clear()
player.locked = True
await asyncio.sleep(0.5)
if player.last_track:
if player.loop == "queue":
if player.is_previows_music:
player.queue.insert(1, player.last_track)
player.is_previows_music = False
else:
player.queue.append(player.last_track)
elif player.loop == "current":
player.queue.appendleft(player.last_track)
elif player.is_previows_music:
player.queue.insert(1, player.last_track)
player.is_previows_music = False
else:
player.played.append(player.last_track)
elif player.is_previows_music:
player.is_previows_music = False
player.locked = False
await player.process_next()
async def interaction_message(self, ctx, txt):
if ctx.interaction:
txt = f"{ctx.author.mention} {txt}"
ctx.player.command_log = txt
ctx.player.update_message(interaction=ctx.interaction)
else:
txt = f"{ctx.author.mention} **{txt}**"
embed = discord.Embed(color=discord.Colour.green(), description=txt)
ctx.player.update_message()
await ctx.send(embed=embed)
async def send_message(self, ctx, text=None, *, embed: discord.Embed = None, ephemeral=False):
if ctx.interaction:
await ctx.interaction.response.send_message(text, embed=embed, ephemeral=True)
else:
try:
await ctx.respond(text, embed=embed, ephemeral=ephemeral)
except AttributeError:
await ctx.reply(text, embed=embed, mention_author=False)
@wavelink.WavelinkMixin.listener('on_track_start')
async def track_start(self, node, payload: wavelink.TrackStart):
player: CustomPlayer = payload.player
await player.invoke_np(force=not player.is_last_message())
player.command_log = ""
@check_voice()
@commands.dynamic_cooldown(user_cooldown(2, 5), commands.BucketType.member)
@app_commands.describe(
query="Nome ou link da música (youtube/soundcloud/spotify)",
position="Posição da fila para adicionar"
)
@commands.hybrid_command(aliases=["p", "tocar"], description="Tocar música em um canal de voz.")
async def play(self, ctx, *, query: str, position=None):
query = query.strip("<>")
if not URL_REG.match(query):
query = f"ytsearch:{query}"
embed = discord.Embed(color=discord.Colour.red())
node = self.bot.music.get_best_node()
if not node:
embed.description = "Não há servidores de música disponível."
await ctx.send(embed=embed)
return
await ctx.defer()
tracks = await process_spotify(ctx, query)
if not tracks:
tracks = await self.bot.music.get_tracks(query)
if not tracks:
embed.description = "Não houve resultados para sua busca."
await ctx.send(embed=embed)
return
player: CustomPlayer = self.bot.music.get_player(guild_id=ctx.guild.id, cls=CustomPlayer, ctx=ctx,
node_id=node.identifier)
pos_txt = ""
embed.colour = ctx.me.color
if isinstance(tracks, list):
if not isinstance(tracks[0], SpotifyTrack):
track = CustomTrack(tracks[0].id, tracks[0].info, requester=ctx.author)
else:
track = tracks[0]
if position is None:
player.queue.append(track)
else:
player.queue.insert(position, track)
pos_txt = f" na posição {position + 1} da fila"
embed.description = f"**Música adicionada{pos_txt}:\n[`{track.title}`]({track.uri})**\n\n`{track.author} | " \
f"{time_format(track.duration) if not track.is_stream else '🔴 Livestream'}`"
embed.set_thumbnail(url=track.thumb)
else:
if not isinstance(tracks, SpotifyPlaylist):
tracks.tracks = [CustomTrack(t.id, t.info, requester=ctx.author) for t in tracks.tracks]
info = tracks.data['playlistInfo']
if (selected := info['selectedTrack']) > 0:
tracks.tracks = tracks.tracks[selected:] + tracks.tracks[:selected]
if position is None or len(tracks.tracks) < 2:
for track in tracks.tracks:
player.queue.append(track)
else:
tracks.tracks.reverse()
for track in tracks.tracks:
player.queue.insert(position, track)
pos_txt = f" na posição {position + 1} da fila"
embed.description = f"**Playlist adicionada{pos_txt}:**\n[`{info['name']}`]({query})\n\n`[{len(tracks.tracks)}] Música(s)`"
embed.set_thumbnail(url=tracks.tracks[0].thumb)
if msg_ad:
embed.description += f" | {msg_ad}"
await ctx.send(embed=embed)
if not player.is_connected:
await player.connect(ctx.author.voice.channel.id)
if not player.current:
await player.process_next()
else:
player.update_message()
@check_voice()
@has_source()
@is_requester()
@commands.dynamic_cooldown(user_cooldown(2, 8), commands.BucketType.guild)
@commands.hybrid_command(description="Pula a música atual que está tocando.", aliases=['pular', 's'])
async def skip(self, ctx):
player: CustomPlayer = ctx.player
if not len(player.queue):
await self.send_message(ctx, embed=discord.Embed(description="Não há músicas na fila...",
color=discord.Colour.red()))
return
if not ctx.interaction:
await ctx.message.add_reaction('👍')
elif ctx.interaction.type == discord.InteractionType.component:
await ctx.interaction.response.defer()
player.command_log = f"{ctx.author.mention} pulou a música."
elif ctx.interaction.type == discord.InteractionType.application_command:
await ctx.send(
embed=discord.Embed(
description=f"{ctx.author.mention} pulou a música.",
color=ctx.me.color
)
)
if player.loop == "current":
player.loop = False
await player.stop()
@check_voice()
@has_player()
@is_requester()
@commands.dynamic_cooldown(user_cooldown(2, 8), commands.BucketType.guild)
@commands.hybrid_command(
description="Voltar para a música anterior (ou para o início da música caso não tenha músicas tocadas/na fila).",
aliases=['voltar', 'b']
)
async def back(self, ctx: commands.Context):
player: CustomPlayer = ctx.player
if not len(player.played) and not len(player.queue):
await player.seek(0)
await self.interaction_message(ctx, "voltou para o início da música.")
return
try:
track = player.played.pop()
except:
track = player.queue.pop()
player.last_track = None
player.queue.appendleft(player.current)
player.queue.appendleft(track)
if not ctx.interaction:
await ctx.message.add_reaction('👍')
elif ctx.interaction.type == discord.InteractionType.component:
await ctx.interaction.response.defer()
player.command_log = f"{ctx.author.mention} voltou para a música atual."
elif ctx.interaction.type == discord.InteractionType.application_command:
await ctx.send(
embed=discord.Embed(
description=f"{ctx.author.mention} voltou para a música atual.",
color=ctx.me.color
)
)
if player.loop == "current":
player.loop = False
player.is_previows_music = True
if not player.current:
await player.process_next()
else:
await player.stop()
@check_voice()
@has_source()
@commands.hybrid_command(
aliases=['vs', 'votar'],
description=f"Votar para pular a música atual (quantidade de votos: {vote_skip_amount})."
)
async def voteskip(self, ctx):
player: CustomPlayer = ctx.player
embed = discord.Embed()
if ctx.author in player.votes:
embed.colour = discord.Colour.red()
embed.description = f"{ctx.author.mention} **você já votou para pular a música atual.**"
await self.send_message(ctx, embed=embed)
return
embed.colour = discord.Colour.green()
txt = f"{ctx.author.mention} **votou para pular a música atual (votos: {len(player.votes) + 1}/{vote_skip_amount}).**"
if len(player.votes) < 3:
embed.description = txt
player.votes.add(ctx.author)
await self.send_message(ctx, embed=embed)
return
embed.description = f"{txt}\n**A música atual será pulada imediatamente.**"
await self.send_message(ctx, embed=embed)
await player.stop()
@check_voice()
@has_player()
@is_dj()
@commands.hybrid_command(
description="Parar o player e me desconectar do canal de voz.",
aliases=["parar", "sair", "leave", "l"]
)
async def stop(self, ctx):
player: CustomPlayer = ctx.player
await player.destroy()
embed = discord.Embed(
color=discord.Colour.red(),
description=f"{ctx.author.mention} **parou o player!**"
)
await ctx.send(embed=embed)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(1, 5), commands.BucketType.member)
@app_commands.describe(vol="porcentagem")
@commands.hybrid_command(aliases=['v', 'vol'], description="Ajustar volume da música.")
async def volume(self, ctx, *, vol: commands.Range[int, 5, 100]):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
await player.set_volume(vol)
player.update_message()
embed.colour = discord.Colour.green()
embed.description = f"{ctx.author.mention} ajustou o volume para **{vol}%**"
await ctx.send(embed=embed)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 10), commands.BucketType.member)
@commands.hybrid_command(description="Pausar a música.", aliases=["pausar"])
async def pause(self, ctx):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if player.paused:
await self.send_message(ctx, embed=embed)
return
await player.set_pause(True)
txt = f"pausou a música."
await self.interaction_message(ctx, txt)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 10), commands.BucketType.member)
@commands.hybrid_command(description="Retomar/Despausar a música.", aliases=["retomar"])
async def resume(self, ctx):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if not player.paused:
embed.description = "A música não está pausada."
await self.send_message(ctx, embed=embed)
return
await player.set_pause(False)
txt = f"retomou a música."
await self.interaction_message(ctx, txt)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 5), commands.BucketType.member)
@app_commands.describe(pos="posição, ex: 1:10 / 30 / 0:25")
@commands.hybrid_command(description="Avançar/Retomar a música para um tempo específico.")
async def seek(self, ctx, pos="0"):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if player.current.is_stream:
embed.description = "Você não pode usar este comando em uma livestream."
await self.send_message(ctx, embed=embed)
return
seconds = seek_parser(pos)
if seconds is None:
embed.description = "Você usou um tempo inválido! Use segundos (1 ou 2 digitos) ou no formato (minutos):(segundos)"
return await self.send_message(ctx, embed=embed)
milliseconds = seconds * 1000
if milliseconds < 0:
milliseconds = 0
try:
await player.seek(milliseconds)
except Exception as e:
embed.description = f"Ocorreu um erro no comando\n```py\n{repr(e)}```."
await self.send_message(ctx, embed=embed)
return
txt = f"{'avançou' if milliseconds > player.position else 'voltou'} a música para: {time_format(milliseconds)}"
await self.interaction_message(ctx, txt)
@check_voice()
@has_player()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(3, 5), commands.BucketType.member)
@commands.hybrid_command(name="shuffle", aliases=["misturar"], description="Misturar as músicas da fila")
async def shuffle_(self, ctx):
player = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if len(player.queue) < 3:
embed.description = "A fila tem que ter no mínimo 3 músicas para ser misturada."
await self.send_message(ctx, embed=embed)
return
shuffle(player.queue)
txt = f"misturou as músicas da fila."
await self.interaction_message(ctx, txt)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(3, 5), commands.BucketType.member)
@commands.hybrid_command(aliases=["repeat", "repetir"], description="Ativar/Desativar a repetição da música atual")
async def loop(self, ctx):
player = ctx.player
player.loop = "current" if player.loop is False else False
txt = f"{'ativou' if player.loop else 'desativou'} a repetição da música."
await self.interaction_message(ctx, txt)
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(3, 5), commands.BucketType.member)
@commands.hybrid_command(aliases=["repetirfila", "lq"], description="ativar repetição da fila")
async def loopqueue(self, ctx):
player: CustomPlayer = ctx.player
if player.loop == "queue":
embed = discord.Embed(color=discord.Colour.red())
embed.description = "a repetição da fila já está ativada."
await self.send_message(ctx, embed=embed)
return
player.loop = "queue"
await self.interaction_message(ctx, "ativou a repetição da fila.")
@check_voice()
@has_player()
@is_dj()
@app_commands.describe(item="música")
@commands.hybrid_command(description="Remover uma música específica da fila.", aliases=["delete", "remover", "del"])
async def remove(self, ctx, item: str):
embed = discord.Embed(color=discord.Colour.red())
if not item.isdigit():
embed.description = f"Você usou uma posição inválida: {item}"
await ctx.send(embed=embed)
return
player: CustomPlayer = ctx.player
try:
track = player.queue[int(item) - 1]
except IndexError:
embed.description = f"Você usou a posição de uma música inexistente na fila: {item}\n(Tamanho da fila atual: {len(player.queue)})"
await ctx.send(embed=embed)
return
player.queue.remove(track)
embed = discord.Embed(
description=f"{ctx.author.mention} removeu a música [`{fix_characters(track.title, limit=25)}`]({track.uri}) da fila.",
color=discord.Colour.green()
)
await ctx.send(embed=embed)
@check_voice()
@has_player()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 10), commands.BucketType.guild)
@commands.hybrid_command(description="Readicionar as músicas tocadas na fila.")
async def readd(self, ctx):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if not player.played:
embed.description = f"{ctx.author.mention} **não há músicas tocadas.**"
await ctx.send(embed=embed)
return
embed.colour = discord.Colour.green()
embed.description = f"{ctx.author.mention} **readicionou [{len(player.played)}] música(s) tocada(s) na fila.**"
player.played.reverse()
player.queue.extend(player.played)
player.played.clear()
await ctx.send(embed=embed)
if not player.current:
await player.process_next()
else:
player.update_message()
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 8), commands.BucketType.guild)
@app_commands.rename(query="música")
@commands.hybrid_command(description="Pula para a música especificada.")
async def skipto(self, ctx, *, query: str):
embed = discord.Embed(color=discord.Colour.red())
index = get_track_index(ctx, query)
if index is None:
embed.description = f"{ctx.author.mention} **não há músicas na fila com o nome: {query}**"
await ctx.reply(embed=embed, mention_author=False)
return
player: CustomPlayer = ctx.player
track = player.queue[index]
player.queue.append(player.last_track)
player.last_track = None
if player.loop == "current":
player.loop = False
if index > 0:
player.queue.rotate(0 - (index))
await player.stop()
embed.description = f"{ctx.author.mention} **pulou para a música:** [`{track.title}`]({track.uri})"
embed.colour = discord.Colour.green()
await ctx.send(embed=embed)
@check_voice()
@has_source()
@is_dj()
@app_commands.rename(pos="posição", query="música")
@commands.hybrid_command(description="Move uma música para a posição especificada da fila.")
async def move(self, ctx: commands.Context, *, query: str, position: int = None):
embed = discord.Embed(colour=discord.Colour.red())
if position is None:
if ctx.interaction:
position = 1
else:
try:
song = query.split(" ")
position = int(query[0])
query = " ".join(song[1:])
except:
position = 1
index = get_track_index(ctx, query)
if index is None:
embed.description = f"{ctx.author.mention} **não há músicas na fila com o nome: {query}**"
await ctx.reply(embed=embed, mention_author=False)
return
if position < 0:
embed.description = f"{ctx.author.mention} **você não pode usar número negativo.**"
await ctx.reply(embed=embed, mention_author=False)
return
player: CustomPlayer = ctx.player
track = player.queue[index]
player.queue.remove(track)
player.queue.insert(position - 1, track)
embed = discord.Embed(
description=f"{ctx.author.mention} moveu a música [`{fix_characters(track.title, limit=25)}`]({track.uri}) "
f"para a posição **[{position}]** da fila.",
color=discord.Colour.green()
)
await ctx.send(embed=embed)
player.update_message()
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(2, 10), commands.BucketType.guild)
@app_commands.rename(query="música")
@commands.hybrid_command(aliases=["rt"], description="Rotaciona a fila para a música especificada.")
async def rotate(self, ctx, query: str):
embed = discord.Embed(colour=discord.Colour.red())
index = get_track_index(ctx, query)
if index is None:
embed.description = f"{ctx.author.mention} **não há músicas na fila com o nome: {query}**"
await ctx.reply(embed=embed, mention_author=False)
return
player: CustomPlayer = ctx.player
track = player.queue[index]
if index <= 0:
embed.description = f"{ctx.author.mention} **a música **[`{track.title}`]({track.uri}) já é a próxima da fila."
await ctx.reply(embed=embed, mention_author=False)
return
player.queue.rotate(0 - (index))
embed = discord.Embed(
description=f"{ctx.author.mention} rotacionou a fila para a música [`{fix_characters(track.title, limit=25)}`]({track.uri}).",
color=discord.Colour.green()
)
await ctx.send(embed=embed)
player.update_message()
@skipto.autocomplete("query")
@move.autocomplete("query")
@rotate.autocomplete("query")
async def queue_autocomplete(
self,
interaction: discord.Interaction,
current: str,
) -> List[app_commands.Choice[str]]:
try:
player = self.bot.music.players[interaction.guild.id]
return [
app_commands.Choice(name=t.title[:50], value=t.title)
for t in player.queue if not current or current.lower() in t.title.lower()
][:20]
except KeyError:
return []
@check_voice()
@has_source()
@is_dj()
@commands.cooldown(1, 5, commands.BucketType.guild)
@commands.hybrid_command(description="Ativar/Desativar o efeito nightcore (Música acelerada com tom mais agudo).",
aliases=['nc'])
async def nightcore(self, ctx):
player: CustomPlayer = ctx.player
op = {"op": "filters", "guildId": str(ctx.guild.id)}
player.nightcore = not player.nightcore
if player.nightcore:
op["timescale"] = {"pitch": 1.2, "speed": 1.1, "rate": 1.0}
txt = "ativou"
else:
txt = "desativou"
await player.node._send(**op)
txt = f"{txt} o efeito nightcore."
await self.interaction_message(ctx, txt)
@has_player()
@commands.hybrid_command(name="queue", description="Mostra as músicas que estão da fila.", aliases=['q', 'fila'])
@commands.max_concurrency(1, commands.BucketType.member)
async def q(self, ctx):
player: CustomPlayer = ctx.player
if not player.queue:
embedvc = discord.Embed(
colour=1646116,
description='Não existe músicas na fila no momento.'
)
await self.send_message(ctx, embed=embedvc)
return
view = QueueInteraction(player, ctx.author)
embed = view.embed
await ctx.send(embed=embed, view=view, ephemeral=True)
await view.wait()
@check_voice()
@has_source()
@is_dj()
@commands.dynamic_cooldown(user_cooldown(1, 10), commands.BucketType.guild)
@commands.hybrid_command(aliases=["limparfila", "qc"], description="Limpar a fila de música")
async def clearqueue(self, ctx):
player: CustomPlayer = ctx.player
embed = discord.Embed(color=discord.Colour.red())
if len(player.queue) < 1:
embed.description = f"{ctx.author.mention} **não há músicas na fila.**"
await ctx.send(embed=embed)
return
embed.colour = discord.Colour.green()
embed.description = f"{ctx.author.mention} **limpou a fila de música.**"
await ctx.send(embed=embed)
@has_player()
@is_dj()
async def restrict(self, ctx):
player: CustomPlayer = ctx.player
player.restrict_mode = not player.restrict_mode
embed = discord.Embed(
description=f"{ctx.author.mention} **{'' if player.restrict_mode else 'des'}ativou o modo "
f"restrito do player para dj's e staffs**", colour=ctx.guild.me.color)
await ctx.send(embed=embed)
@has_source()
@commands.hybrid_command(description="Reenvia a mensagem do player com a música atual.")
async def player(self, ctx):
player: CustomPlayer = ctx.player
await player.destroy_message()
await player.invoke_np()
@has_player()
@is_dj()
@commands.hybrid_command(description="Adicionar um membro à lista de DJ's [menção/id/nome]", aliases=["dj"])
async def adddj(self, ctx, *, member: discord.Member):
embed = discord.Embed(color=discord.Colour.red())
if member.guild_permissions.manage_channels:
text = f"você não pode adicionar o membro {member.mention} na lista de DJ's (ele(a) possui permissão de **gerenciar canais**)."
elif member == ctx.author:
text = "Você não pode adicionar a si mesmo na lista de DJ's."
elif member in ctx.player.dj:
text = f"O membro {member.mention} já está na lista de DJ's"
else:
ctx.player.dj.append(member)
embed.colour = discord.Colour.green()
text = f"O membro {member.mention} foi adicionado à lista de DJ's por {ctx.author.mention}."
embed.description = text
await ctx.send(embed=embed)
@commands.hybrid_command(aliases=["nodeinfo"], description="Ver informações dos servidores de música.")
@commands.cooldown(1, 5, commands.BucketType.user)
async def nodestats(self, ctx):
em = discord.Embed(color=ctx.me.color, title="Servidores de música:")
if not self.bot.music.nodes:
em.description = "**Não há servidores.**"
await ctx.send(embed=em)
return
for identifier, node in self.bot.music.nodes.items():
if not node.available: continue
txt = f"Região: `{node.region.title()}`\n"
current_player = True if node.players.get(ctx.guild.id) else False
if node.stats:
used = humanize.naturalsize(node.stats.memory_used)
total = humanize.naturalsize(node.stats.memory_allocated)
free = humanize.naturalsize(node.stats.memory_free)
cpu_cores = node.stats.cpu_cores
cpu_usage = f"{node.stats.lavalink_load * 100:.2f}"
started = node.stats.players
ram_txt = f'RAM: `{used}/{free} ({total})`'
txt += f'{ram_txt}\n' \
f'CPU Cores: `{cpu_cores}`\n' \
f'Uso de CPU: `{cpu_usage}%`\n' \
f'Uptime: `{time_format(node.stats.uptime)}`'
if started:
txt += "\nPlayers: "
players = node.stats.playing_players
idle = started - players
if players:
txt += f'`[▶️{players}]`' + (" " if idle else "")
if idle:
txt += f'`[💤{idle}]`'
if current_player:
status = "🌟"
else:
status = "✅" if node.is_available else '❌'
em.add_field(name=f'**{identifier}** `{status}`', value=txt)
await ctx.reply(embed=em, mention_author=False)
async def cog_before_invoke(self, ctx):
try:
ctx.player
except AttributeError:
ctx.player = ctx.bot.music.players.get(ctx.guild.id)
async def cog_command_error(self, ctx, error):
embed = discord.Embed(color=discord.Colour.red())
if isinstance(error, commands.RangeError):
embed.description = f"**Você deve usar um valor entre {error.minimum} e {error.maximum}.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, NotDJorStaff):
embed.description = "**Você deve estar na lista de DJ ou ter a permissão de **Gerenciar canais** para usar este comando.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, NotRequester):
embed.description = "**Você deve ser dono da música atual ou estar na lista de DJ ou ter a permissão de **Gerenciar canais** para pular músicas.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, DiffVoiceChannel):
embed.description = "**Você deve estar no meu canal de voz atual para usar este comando.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, NoSource):
embed.description = "**Não há músicas no player atualmente.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, NoVoice):
embed.description = "**Você deve estar em um canal de voz para usar este comando.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, NoPlayer):
embed.description = "**Não há player inicializado no servidor.**"
return await self.send_message(ctx, embed=embed)
if isinstance(error, MissingSpotifyClient):
embed.description = "**Não há suporte a links do spotify no momento.**"
return await self.send_message(ctx, embed=embed)
error = getattr(error, 'original', error)
if isinstance(error, commands.CommandOnCooldown):
remaing = int(error.retry_after)
if remaing < 1:
remaing = 1
embed.description = "**Você deve aguardar {} para usar este comando.**".format(
time_format(int(remaing) * 1000))
return await self.send_message(ctx, embed=embed)
if isinstance(error, commands.MaxConcurrencyReached):
bucket = commands.BucketType
txt = f"{error.number} vezes " if error.number > 1 else ''
txt = {
bucket.member: f"você já usou esse comando {txt}neste servidor",
bucket.guild: f"esse comando já foi usado {txt}neste servidor",
bucket.user: f"você já usou esse comando {txt}",
bucket.channel: f"esse comando já foi usado {txt}neste atual",
bucket.category: f"esse comando já foi usado {txt}na categoria do canal atual",
bucket.role: f"esse comando já foi usado {txt}por um membro que possui o cargo permitido",
bucket.default: f"esse comando já foi usado {txt}por alguém"
}
txt = txt.get(error.per)
txt = f"{ctx.author.mention} **{txt} e ainda não teve seu{'s' if error.number > 1 else ''} uso{'s' if error.number > 1 else ''} finalizado{'s' if error.number > 1 else ''}!**"
embed.description = txt
return await self.send_message(ctx, embed=embed)
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
if isinstance(error, commands.CommandNotFound):
return
embed.description = f"**Ocorreu um erro no comando:** `{ctx.invoked_with}`\n```py\n{str(repr(error))[:2020].replace(ctx.bot.http.token, 'mytoken')}```"
await self.send_message(ctx, embed=embed)
async def setup(bot: commands.Bot):
await bot.add_cog(Music(bot))
if __name__ == "__main__":
try:
token = os.environ["TOKEN"]
except KeyError:
token = "Token do bot aqui"
bot = TestBot(command_prefix=commands.when_mentioned_or("!"), intents=discord.Intents.all())
bot.run(token)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment