Skip to content

Instantly share code, notes, and snippets.

@NNKTV28
Created March 2, 2023 11:43
Show Gist options
  • Save NNKTV28/67bed8432de332f00d1de6bc67d539da to your computer and use it in GitHub Desktop.
Save NNKTV28/67bed8432de332f00d1de6bc67d539da to your computer and use it in GitHub Desktop.
import re
import discord
import os
import lavalink
from discord.ext import commands
from dotenv import load_dotenv
from lavalink.filters import LowPass
url_rx = re.compile(r'https?://(?:www\.)?.+')
load_dotenv()
premium = os.getenv('premium_guilds')
class LavalinkVoiceClient(discord.VoiceClient):
"""
This is the preferred way to handle external voice sending
This client will be created via a cls in the connect method of the channel
see the following documentation:
https://discordpy.readthedocs.io/en/latest/api.html#voiceprotocol
"""
def __init__(self, client: discord.Client, channel: discord.abc.Connectable):
self.client = client
self.channel = channel
# ensure a client already exists
if hasattr(self.client, 'lavalink'):
self.lavalink = self.client.lavalink
else:
self.client.lavalink = lavalink.Client(client.user.id)
self.client.lavalink.add_node(
'localhost',
2333,
'youshallnotpass',
'us',
'default-node'
)
self.lavalink = self.client.lavalink
async def on_voice_server_update(self, data):
# the data needs to be transformed before being handed down to
# voice_update_handler
lavalink_data = {
't': 'VOICE_SERVER_UPDATE',
'd': data
}
await self.lavalink.voice_update_handler(lavalink_data)
async def on_voice_state_update(self, data):
# the data needs to be transformed before being handed down to
# voice_update_handler
lavalink_data = {
't': 'VOICE_STATE_UPDATE',
'd': data
}
await self.lavalink.voice_update_handler(lavalink_data)
async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False) -> None:
"""
Connect the bot to the voice channel and create a player_manager
if it doesn't exist yet.
"""
# ensure there is a player_manager when creating a new voice_client
self.lavalink.player_manager.create(guild_id=self.channel.guild.id)
await self.channel.guild.change_voice_state(channel=self.channel, self_mute=self_mute, self_deaf=self_deaf)
async def disconnect(self, *, force: bool = False) -> None:
"""
Handles the disconnect.
Cleans up running player and leaves the voice client.
"""
player = self.lavalink.player_manager.get(self.channel.guild.id)
# no need to disconnect if we are not connected
if not force and not player.is_connected:
return
# None means disconnect
await self.channel.guild.change_voice_state(channel=None)
# update the channel_id of the player to None
# this must be done because the on_voice_state_update that would set channel_id
# to None doesn't get dispatched after the disconnect
player.channel_id = None
self.cleanup()
class Music(commands.Cog):
def __init__(self, bot):
self.bot = bot
if not hasattr(bot, 'lavalink'): # This ensures the client isn't overwritten during cog reloads.
bot.lavalink = lavalink.Client(bot.user.id)
bot.lavalink.add_node('lava1.horizxon.studio', 80, 'horizxon.studio', 'eu', 'default-node') # Host, Port, Password, Region, Name
lavalink.add_event_hook(self.track_hook)
def cog_unload(self):
""" Cog unload handler. This removes any event hooks that were registered. """
self.bot.lavalink._event_hooks.clear()
async def cog_before_invoke(self, ctx):
""" Command before-invoke handler. """
guild_check = ctx.guild is not None
# This is essentially the same as `@commands.guild_only()`
# except it saves us repeating ourselves (and also a few lines).
if guild_check:
await self.ensure_voice(ctx)
# Ensure that the bot and command author share a mutual voicechannel.
return guild_check
async def cog_command_error(self, ctx, error):
if isinstance(error, commands.CommandInvokeError):
await ctx.send(error.original)
# The above handles errors thrown in this cog and shows them to the user.
# This shouldn't be a problem as the only errors thrown in this cog are from `ensure_voice`
# which contain a reason string, such as "Join a voicechannel" etc. You can modify the above
# if you want to do things differently.
async def ensure_voice(self, ctx):
""" This check ensures that the bot and command author are in the same voicechannel. """
player = self.bot.lavalink.player_manager.create(ctx.guild.id)
# Create returns a player if one exists, otherwise creates.
# This line is important because it ensures that a player always exists for a guild.
# Most people might consider this a waste of resources for guilds that aren't playing, but this is
# the easiest and simplest way of ensuring players are created.
# These are commands that require the bot to join a voicechannel (i.e. initiating playback).
# Commands such as volume/skip etc don't require the bot to be in a voicechannel so don't need listing here.
should_connect = ctx.command.name in ('play',)
if not ctx.author.voice or not ctx.author.voice.channel:
# Our cog_command_error handler catches this and sends it to the voicechannel.
# Exceptions allow us to "short-circuit" command invocation via checks so the
# execution state of the command goes no further.
raise commands.CommandInvokeError('Join a voicechannel first.')
v_client = ctx.voice_client
if not v_client:
if not should_connect:
raise commands.CommandInvokeError('Not connected.')
permissions = ctx.author.voice.channel.permissions_for(ctx.me)
if not permissions.connect or not permissions.speak: # Check user limit too?
raise commands.CommandInvokeError('I need the `CONNECT` and `SPEAK` permissions.')
player.store('channel', ctx.channel.id)
await ctx.author.voice.channel.connect(cls=LavalinkVoiceClient)
else:
if v_client.channel.id != ctx.author.voice.channel.id:
raise commands.CommandInvokeError('You need to be in my voicechannel.')
async def track_hook(self, event):
if isinstance(event, lavalink.events.QueueEndEvent):
# When this track_hook receives a "QueueEndEvent" from lavalink.py
# it indicates that there are no tracks left in the player's queue.
# To save on resources, we can tell the bot to disconnect from the voicechannel.
guild_id = event.player.guild_id
guild = self.bot.get_guild(guild_id)
await guild.voice_client.disconnect(force=True)
@commands.command(name="play",description="?p, ?P", aliases=['p', 'P'])
async def play(self, ctx, *, query: str):
"""This function handles track search and queueing."""
print(f'Playing {query} in {ctx.guild.name}, ID: {ctx.guild.id}')
# Get the player for this guild from cache.
player = self.bot.lavalink.player_manager.get(ctx.guild.id)
# Remove leading and trailing <>. <> may be used to suppress embedding links in Discord.
query = query.strip('<>')
# Check if the user input might be a URL. If it isn't, we can Lavalink do a YouTube search for it instead.
# SoundCloud searching is possible by prefixing "scsearch:" instead.
if not url_rx.match(query):
query = f'ytsearch:{query}'
# Get the results for the query from Lavalink.
results = await player.node.get_tracks(query)
# Results could be None if Lavalink returns an invalid response (non-JSON/non-200 (OK)).
# Alternatively, results.tracks could be an empty array if the query yielded no tracks.
if not results or not results.tracks:
return await ctx.send('Nothing found!')
embed = discord.Embed(color=discord.Color.blurple())
# Valid loadTypes are:
# TRACK_LOADED - single video/direct URL)
# PLAYLIST_LOADED - direct URL to playlist)
# SEARCH_RESULT - query prefixed with either ytsearch: or scsearch:.
# NO_MATCHES - query yielded no results
# LOAD_FAILED - most likely, the video encountered an exception during loading.
if results.load_type == 'TRACK_LOADED':
if 'list' in query and not ctx.guild.id == premium and len(results['tracks']) > 10:
return await ctx.send('The free version is limited to queuing up to 10 tracks from playlists. Upgrade to premium to queue more.')
track = results['tracks'][0]
player.add(requester=ctx.author.id, track=track)
if results.load_type == 'PLAYLIST_LOADED':
if not premium:
tracks = results.tracks[:10] # Only select the first 10 tracks
for track in tracks:
if tracks < 10:
# Add the selected tracks from the playlist to the queue.
player.add(requester=ctx.author.id, track=track)
embed.title = 'Playlist too big!'
embed.description = f'{results.playlist_info.name} - {len(tracks)} tracks (only 10 tracks are enqueued because you are not a premium user)'
embed.title = 'Playlist Enqueued!'
embed.description = f'{results.playlist_info.name} - {len(tracks)} tracks (only 10 tracks enqueued because you are not a premium user)'
else:
tracks = results.tracks
for track in tracks:
# Add all of the tracks from the playlist to the queue.
player.add(requester=ctx.author.id, track=track)
embed.title = 'Playlist Enqueued!'
embed.description = f'▶️ {results.playlist_info.name} - {len(tracks)} tracks'
else:
track = results.tracks[0]
embed.title = 'Track Enqueued'
embed.description = f'▶️ [{track.title}]({track.uri})'
player.add(requester=ctx.author.id, track=track)
embed.set_author(
name=self.bot.user.name,
icon_url=self.bot.user.display_avatar.url
)
await ctx.send(embed=embed)
# We don't want to call .play() if the player is playing as that will effectively skip
# the current track.
if not player.is_playing:
await player.play()
@commands.command(description="?lp, ?LP", aliases=['lp', 'LP'])
async def lowpass(self, ctx, strength: float):
""" Sets the strength of the low pass filter. """
# Get the player for this guild from cache.
player = self.bot.lavalink.player_manager.get(ctx.guild.id)
# This enforces that strength should be a minimum of 0.
# There's no upper limit on this filter.
strength = max(0.0, strength)
# Even though there's no upper limit, we will enforce one anyway to prevent
# extreme values from being entered. This will enforce a maximum of 100.
strength = min(100, strength)
embed = discord.Embed(color=discord.Color.blurple(), title='Low Pass Filter')
# A strength of 0 effectively means this filter won't function, so we can disable it.
if strength == 0.0:
player.remove_filter('lowpass')
embed.description = 'Disabled **Low Pass Filter**'
return await ctx.send(embed=embed)
# Set the filter strength to the user's desired level.
self.low_pass.update(smoothing=strength)
# This applies our filter. If the filter is already enabled on the player, then this will
# just overwrite the filter with the new values.
await player.set_filter(self.low_pass)
embed.description = f'Set **Low Pass Filter** strength to {strength}.'
embed.set_author(
name=self.bot.user.name,
icon_url=self.bot.user.display_avatar.url
)
await ctx.send(embed=embed)
@commands.command(name='pause', description='?sp, ?Sp', aliases=['sp', 'Sp'])
async def pause(self, ctx, client):
"Pauses the current song"
voice_state = ctx.author.voice
player = self.bot.lavalink.player_manager.get(ctx.guild.id)
client = self.client
if not voice_state or not voice_state.channel:
return await ctx.send('You need to be in a voice channel to use this command.')
if not self.bot.players.get(ctx.guild.id):
return await ctx.send('There is no player for this guild.')
if player.is_paused:
return await ctx.send('The player is already paused.')
await player.set_pause(True)
embed = discord.Embed(
title=f'Paused',
description="⏸ Paused the player",
color=discord.Color.dark_blue(),
timestamp=ctx.message.created_at
)
embed.set_author(
name=self.bot.user.name,
icon_url=self.bot.user.display_avatar.url
)
await ctx.send(embed=embed)
@commands.command(name='resume', description='?rs, ?RS', aliases=['rs', 'RS'])
async def resume(self, ctx, client):
"Resumes the current song"
#if ctx.message.content.lower().startswith("?resume"):
voice_state = ctx.author.voice
player = self.bot.lavalink.player_manager.get(ctx.guild.id)
client = self.client
if not voice_state or not voice_state.channel:
return await ctx.send('You need to be in a voice channel to use this command.')
if not self.bot.players.get(ctx.guild.id):
return await ctx.send('There is no player for this guild.')
player = self.bot.lavalink.players.get(ctx.guild.id)
if not player.is_paused:
return await ctx.send('The player is not paused.')
await player.set_pause(False)
embed = discord.Embed(
title=f'Resumed',
description="▶️ Resumed the player",
color=discord.Color.dark_blue(),
timestamp=ctx.message.created_at
)
embed.set_author(
name=self.bot.user.name,
icon_url=self.bot.user.display_avatar.url
)
await ctx.send(embed=embed)
async def setup(bot):
await bot.add_cog(Music(bot))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment