Skip to content

Instantly share code, notes, and snippets.

@NNKTV28
Created February 21, 2023 11:40
Show Gist options
  • Save NNKTV28/d94a4cf94fab49daeaf8186b4233bbb2 to your computer and use it in GitHub Desktop.
Save NNKTV28/d94a4cf94fab49daeaf8186b4233bbb2 to your computer and use it in GitHub Desktop.
import itertools
import re
import discord
import lavalink
from discord.ext import commands
from lavalink.filters import LowPass
# Pattern text for `url_rx`: an http:// or https:// scheme, an optional
# "www." prefix, then at least one more character.
_URL_PATTERN = r'https?://(?:www\.)?.+'

# Compiled once at import time; used with `.match()` to decide whether a
# query should be treated as a direct URL rather than as search terms.
url_rx = re.compile(_URL_PATTERN)
class LavalinkVoiceClient(discord.VoiceClient):
    """
    This is the preferred way to handle external voice sending.
    This client will be created via a cls in the connect method of the channel,
    see the following documentation:
    https://discordpy.readthedocs.io/en/latest/api.html#voiceprotocol

    Voice gateway events received by this client are forwarded to the
    Lavalink client stored on the bot (``client.lavalink``).
    """
    # NOTE(review): the documentation linked above describes VoiceProtocol,
    # while this class derives from VoiceClient and never calls its
    # __init__ -- confirm that this is intentional.

    def __init__(self, client: discord.Client, channel: discord.abc.Connectable):
        """
        Store the bot client and target channel, and make sure a Lavalink
        client is attached to the bot.

        :param client: The bot client that owns this voice connection.
        :param channel: The voice channel being connected to.
        """
        self.client = client
        self.channel = channel

        # ensure a client already exists; if not, create one with a
        # fallback node so that voice updates have somewhere to go.
        if not hasattr(self.client, 'lavalink'):
            self.client.lavalink = lavalink.Client(client.user.id)
            self.client.lavalink.add_node(
                'localhost',
                2333,
                'youshallnotpass',
                'us',
                'default-node'
            )

        self.lavalink = self.client.lavalink

    async def on_voice_server_update(self, data):
        """ Forward a voice server update payload to Lavalink. """
        # the data needs to be transformed before being handed down to
        # voice_update_handler
        lavalink_data = {
            't': 'VOICE_SERVER_UPDATE',
            'd': data
        }
        await self.lavalink.voice_update_handler(lavalink_data)

    async def on_voice_state_update(self, data):
        """ Forward a voice state update payload to Lavalink. """
        # the data needs to be transformed before being handed down to
        # voice_update_handler
        lavalink_data = {
            't': 'VOICE_STATE_UPDATE',
            'd': data
        }
        await self.lavalink.voice_update_handler(lavalink_data)

    async def connect(self, *, timeout: float, reconnect: bool, self_deaf: bool = False, self_mute: bool = False) -> None:
        """
        Connect the bot to the voice channel and create a player_manager
        if it doesn't exist yet.

        ``timeout`` and ``reconnect`` are accepted for interface
        compatibility but are not used by this implementation.
        """
        # ensure there is a player_manager when creating a new voice_client
        self.lavalink.player_manager.create(guild_id=self.channel.guild.id)
        await self.channel.guild.change_voice_state(channel=self.channel, self_mute=self_mute, self_deaf=self_deaf)

    async def disconnect(self, *, force: bool = False) -> None:
        """
        Handles the disconnect.
        Cleans up running player and leaves the voice client.

        :param force: When True, leave the channel even if the player does
            not report itself as connected (or no player is cached).
        """
        player = self.lavalink.player_manager.get(self.channel.guild.id)

        # no need to disconnect if we are not connected; a missing player
        # is treated the same as a player that is not connected.
        if not force and (player is None or not player.is_connected):
            return

        # None means disconnect
        await self.channel.guild.change_voice_state(channel=None)

        # update the channel_id of the player to None
        # this must be done because the on_voice_state_update that would set channel_id
        # to None doesn't get dispatched after the disconnect
        if player is not None:
            player.channel_id = None

        # Always runs, even when there was no player to update.
        self.cleanup()
class Music(commands.Cog):
    """
    Music commands backed by Lavalink.

    Provides the ``play``, ``lowpass``, ``disconnect``, ``queue`` and
    ``volume`` commands. Every command in this cog first goes through
    ``ensure_voice`` (via ``cog_before_invoke``) when invoked in a guild.
    """

    def __init__(self, bot):
        self.bot = bot

        if not hasattr(bot, 'lavalink'):  # This ensures the client isn't overwritten during cog reloads.
            bot.lavalink = lavalink.Client(bot.user.id)
            # Host, Port, Password, Region, Name
            bot.lavalink.add_node('lava1.horizxon.studio', 80, 'horizxon.studio', 'eu', 'default-node')

        lavalink.add_event_hook(self.track_hook)

    def cog_unload(self):
        """ Cog unload handler. This removes any event hooks that were registered. """
        self.bot.lavalink._event_hooks.clear()

    async def cog_before_invoke(self, ctx):
        """ Command before-invoke handler. """
        guild_check = ctx.guild is not None
        # This is essentially the same as `@commands.guild_only()`
        # except it saves us repeating ourselves (and also a few lines).

        if guild_check:
            # Ensure that the bot and command author share a mutual voicechannel.
            await self.ensure_voice(ctx)

        return guild_check

    async def cog_command_error(self, ctx, error):
        """ Shows the reason carried by a CommandInvokeError to the user. """
        if isinstance(error, commands.CommandInvokeError):
            await ctx.send(error.original)
            # The above handles errors thrown in this cog and shows them to the user.
            # This shouldn't be a problem as the only errors thrown in this cog are from `ensure_voice`
            # which contain a reason string, such as "Join a voicechannel" etc. You can modify the above
            # if you want to do things differently.

    async def ensure_voice(self, ctx):
        """
        This check ensures that the bot and command author are in the same voicechannel.

        Raises ``commands.CommandInvokeError`` carrying a reason string when
        the author is not in a voice channel, when the bot is not connected
        and the command is not one that connects it, when the bot lacks the
        connect/speak permissions, or when the author is in a different
        voice channel than the bot.
        """
        player = self.bot.lavalink.player_manager.create(ctx.guild.id)
        # Create returns a player if one exists, otherwise creates.
        # This line is important because it ensures that a player always exists for a guild.

        # Most people might consider this a waste of resources for guilds that aren't playing, but this is
        # the easiest and simplest way of ensuring players are created.

        # These are commands that require the bot to join a voicechannel (i.e. initiating playback).
        # Commands such as volume/skip etc don't require the bot to be in a voicechannel so don't need listing here.
        should_connect = ctx.command.name in ('play',)

        if not ctx.author.voice or not ctx.author.voice.channel:
            # Our cog_command_error handler catches this and sends the reason with ctx.send.
            # Exceptions allow us to "short-circuit" command invocation via checks so the
            # execution state of the command goes no further.
            raise commands.CommandInvokeError('Join a voicechannel first.')

        v_client = ctx.voice_client
        if not v_client:
            if not should_connect:
                raise commands.CommandInvokeError('Not connected.')

            permissions = ctx.author.voice.channel.permissions_for(ctx.me)

            if not permissions.connect or not permissions.speak:  # Check user limit too?
                raise commands.CommandInvokeError('I need the `CONNECT` and `SPEAK` permissions.')

            player.store('channel', ctx.channel.id)
            await ctx.author.voice.channel.connect(cls=LavalinkVoiceClient)
        else:
            if v_client.channel.id != ctx.author.voice.channel.id:
                raise commands.CommandInvokeError('You need to be in my voicechannel.')

    async def track_hook(self, event):
        """ Lavalink event hook; disconnects from voice once the queue has ended. """
        if isinstance(event, lavalink.events.QueueEndEvent):
            # When this track_hook receives a "QueueEndEvent" from lavalink.py
            # it indicates that there are no tracks left in the player's queue.
            # To save on resources, we can tell the bot to disconnect from the voicechannel.
            guild = self.bot.get_guild(event.player.guild_id)

            # The guild may no longer be available, or the bot may already
            # have left the channel; in both cases there is nothing to do.
            voice_client = guild.voice_client if guild is not None else None
            if voice_client is not None:
                await voice_client.disconnect(force=True)

    @commands.command(aliases=['p'])
    async def play(self, ctx, *, query: str):
        """ Searches and plays a song from a given query. """
        # The docstring must be the first statement of the function,
        # otherwise it is just a discarded string expression.
        print(f'Playing {query} in {ctx.guild.id}')

        # Get the player for this guild from cache.
        player = self.bot.lavalink.player_manager.get(ctx.guild.id)
        # Remove leading and trailing <>. <> may be used to suppress embedding links in Discord.
        query = query.strip('<>')

        # Check if the user input might be a URL. If it isn't, we can let Lavalink do a YouTube search for it instead.
        # SoundCloud searching is possible by prefixing "scsearch:" instead.
        if not url_rx.match(query):
            query = f'ytsearch:{query}'

        # Get the results for the query from Lavalink.
        results = await player.node.get_tracks(query)

        # Results could be None if Lavalink returns an invalid response (non-JSON/non-200 (OK)).
        # Alternatively, results.tracks could be an empty array if the query yielded no tracks.
        if not results or not results.tracks:
            return await ctx.send('Nothing found!')

        embed = discord.Embed(color=discord.Color.blurple())

        # Valid loadTypes are:
        #   TRACK_LOADED    - single video/direct URL)
        #   PLAYLIST_LOADED - direct URL to playlist)
        #   SEARCH_RESULT   - query prefixed with either ytsearch: or scsearch:.
        #   NO_MATCHES      - query yielded no results
        #   LOAD_FAILED     - most likely, the video encountered an exception during loading.
        if results.load_type == 'PLAYLIST_LOADED':
            tracks = results.tracks

            for track in tracks:
                # Add all of the tracks from the playlist to the queue.
                player.add(requester=ctx.author.id, track=track)

            embed.title = 'Playlist Enqueued!'
            embed.description = f'{results.playlist_info.name} - {len(tracks)} tracks'
        else:
            track = results.tracks[0]
            embed.title = 'Track Enqueued'
            embed.description = f'[{track.title}]({track.uri})'

            player.add(requester=ctx.author.id, track=track)

        await ctx.send(embed=embed)

        # We don't want to call .play() if the player is playing as that will effectively skip
        # the current track.
        if not player.is_playing:
            await player.play()

    @commands.command(aliases=['lp'])
    async def lowpass(self, ctx, strength: float):
        """ Sets the strength of the low pass filter. """
        # Get the player for this guild from cache.
        player = self.bot.lavalink.player_manager.get(ctx.guild.id)

        # This enforces that strength should be a minimum of 0.
        # There's no upper limit on this filter.
        strength = max(0.0, strength)

        # Even though there's no upper limit, we will enforce one anyway to prevent
        # extreme values from being entered. This will enforce a maximum of 100.
        strength = min(100, strength)

        embed = discord.Embed(color=discord.Color.blurple(), title='Low Pass Filter')

        # A strength of 0 effectively means this filter won't function, so we can disable it.
        if strength == 0.0:
            # remove_filter must be awaited, otherwise the filter is never
            # actually removed from the player.
            await player.remove_filter('lowpass')
            embed.description = 'Disabled **Low Pass Filter**'
            return await ctx.send(embed=embed)

        # Lets create our filter.
        low_pass = LowPass()
        low_pass.update(smoothing=strength)  # Set the filter strength to the user's desired level.

        # This applies our filter. If the filter is already enabled on the player, then this will
        # just overwrite the filter with the new values.
        await player.set_filter(low_pass)

        embed.description = f'Set **Low Pass Filter** strength to {strength}.'
        await ctx.send(embed=embed)

    @commands.command(aliases=['dc'])
    async def disconnect(self, ctx):
        """ Disconnects the player from the voice channel and clears its queue. """
        player = self.bot.lavalink.player_manager.get(ctx.guild.id)

        if not ctx.voice_client:
            # We can't disconnect, if we're not connected.
            return await ctx.send('Not connected.')

        if not ctx.author.voice or (player.is_connected and ctx.author.voice.channel.id != int(player.channel_id)):
            # Abuse prevention. Users not in voice channels, or not in the same voice channel as the bot
            # may not disconnect the bot.
            return await ctx.send('You\'re not in my voicechannel!')

        # Clear the queue to ensure old tracks don't start playing
        # when someone else queues something.
        player.queue.clear()
        # Stop the current track so Lavalink consumes less resources.
        await player.stop()
        # Disconnect from the voice channel.
        await ctx.voice_client.disconnect(force=True)
        await ctx.send('*⃣ | Disconnected.')

    @commands.command(name='queue')
    async def queue_(self, ctx):
        """See the list of songs that will play next"""
        player = self.bot.lavalink.player_manager.get(ctx.guild.id)

        if not player.queue:
            return await ctx.send('There are no songs in the queue.')

        # Only the first 10 upcoming tracks are listed; the title still
        # reports the full queue length.
        upcoming = itertools.islice(player.queue, 10)
        queue_list = ''.join(
            f'{position}) [{track.title}]({track.uri})\n'
            for position, track in enumerate(upcoming, start=1)
        )

        embed = discord.Embed(
            title=f'Upcoming Tracks - {len(player.queue)}',
            description=queue_list,
            color=discord.Color.dark_blue(),
            timestamp=ctx.message.created_at
        )
        await ctx.send(embed=embed)

    @commands.command(name='volume')
    async def volume(self, ctx, vol: int):
        """ Sets the player volume; accepts values from 0 to 100. """
        player = self.bot.lavalink.player_manager.get(ctx.guild.id)

        if not player.is_connected:
            return await ctx.send("I'm not connected to a voice channel.")

        if vol < 0 or vol > 100:
            return await ctx.send("Volume must be between 0 and 100.")

        await player.set_volume(vol)
        await ctx.send(f"Volume set to {vol}%.")
async def setup(bot):
    """ Extension entry point: builds the Music cog and adds it to the bot. """
    music_cog = Music(bot)
    await bot.add_cog(music_cog)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment