Skip to content

Instantly share code, notes, and snippets.

@535i
Forked from vbe0201/music_bot_example.py
Last active February 12, 2024 16:32
Show Gist options
  • Save 535i/9636b42158101763efeb696bf5397bba to your computer and use it in GitHub Desktop.
Save 535i/9636b42158101763efeb696bf5397bba to your computer and use it in GitHub Desktop.
A simple music bot written using disnake and yt-dlp (forked from a discord.py rewrite / youtube_dl example).
# -*- coding: utf-8 -*-
"""
Copyright (c) 2019 Valentin B.
Modified by @535i on 12.02.2024
A music bot written in Python with disnake and yt-dlp
Requirements:
Python 3.8+ (the code uses f-strings, and current disnake releases require 3.8 or newer)
pip install -U disnake pynacl yt-dlp
You also need FFmpeg in your PATH environment variable or the FFmpeg.exe binary in your bot's directory on Windows.
"""
import disnake
from disnake.ext import commands
from disnake.ext.commands import Context
from disnake import ApplicationCommandInteraction
import asyncio
import itertools
import sys
import time
import traceback
from async_timeout import timeout
from functools import partial
from yt_dlp import YoutubeDL
import yt_dlp
import random
import datetime
# Replace yt-dlp's bug-report message helper with one that returns an empty
# string, so that text is not appended to extractor error messages.
yt_dlp.utils.bug_reports_message = lambda: ''
# Options handed to the shared YoutubeDL instance created below.
ytdlopts = {
'format': 'bestaudio/best',
# Only used when a caller asks for download=True.
'outtmpl': 'downloads/%(title)s [%(id)s].%(ext)s',
'restrictfilenames': True,
'noplaylist': True,
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'auto',
'source_address': '0.0.0.0',
# NOTE(review): 'force-ipv4' looks like the command-line flag spelling;
# confirm YoutubeDL honors this key.
'force-ipv4': True,
# NOTE(review): confirm that True is an accepted value for 'cachedir'
# (a path or False may be expected).
'cachedir': True,
"verbose": False,
# NOTE(review): 'xff' also looks like a command-line option name; verify
# against the YoutubeDL parameter list.
"xff": "default"
}
# Keyword arguments forwarded to disnake.FFmpegPCMAudio for every track.
ffmpegopts = {
'before_options': '-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5',
'options': '-vn -loglevel panic'
}
#'-nostdin'
# Single YoutubeDL instance shared by all extraction calls in this module.
ytdl = YoutubeDL(ytdlopts)
class VoiceConnectionError(commands.CommandError):
    """Base command error for voice-connection problems."""
class InvalidVoiceChannel(VoiceConnectionError):
    """Voice-connection error reported to the user as a failed channel join."""
class YTDLSource(disnake.PCMVolumeTransformer):
    """Volume-adjustable audio source carrying yt-dlp metadata.

    ``title``, ``web_url``, ``duration`` and ``thumbnail`` are copied from the
    yt-dlp info dict (``None`` when a key is missing); ``requester`` is the
    member who asked for the track.
    """

    def __init__(self, source, *, data, requester):
        super().__init__(source)
        self.requester = requester
        self.title = data.get('title')
        self.web_url = data.get('webpage_url')
        self.duration = data.get('duration')
        self.thumbnail = data.get('thumbnail')

    def __getitem__(self, item: str):
        """Allow dict-style access (``source['title']``), mirroring queued dict entries."""
        return self.__getattribute__(item)

    @staticmethod
    def _first_entry(data, search):
        """Return the first usable result when yt-dlp hands back a search/playlist wrapper."""
        if 'entries' not in data:
            return data
        entries = [entry for entry in data['entries'] if entry]
        if not entries:
            # Same exception type an empty result list produced before, but
            # with a message that is useful when shown to the user.
            raise IndexError(f'No results found for {search!r}.')
        return entries[0]

    @classmethod
    async def create_source(cls, ctx, search: str, *, loop, download=False, looping=False):
        """Look up ``search`` with yt-dlp and return something the player can queue.

        Parameters
        ----------
        ctx: object with ``send`` and ``author`` (command context or interaction).
        search: URL or search terms passed to ``ytdl.extract_info``.
        loop: event loop used for the executor call; falls back to the current loop.
        download: when true, download the file and return a ready ``YTDLSource``.
        looping: when true, do not announce the track with an "Added" embed.

        Returns
        -------
        A ``YTDLSource`` when ``download`` is true, otherwise a dict with the
        keys ``webpage_url``, ``requester``, ``title`` and ``thumbnail``.
        """
        loop = loop or asyncio.get_event_loop()
        # extract_info blocks, so run it in the default executor.
        to_run = partial(ytdl.extract_info, url=search, download=download)
        data = await loop.run_in_executor(None, to_run)
        data = cls._first_entry(data, search)

        if not looping:
            embed = disnake.Embed(title="Added", description="[{}]({})".format(data["title"], data["webpage_url"]),
                                  color=disnake.Color.blue())
            # Some results (e.g. live streams) carry no duration or thumbnail;
            # skip those parts instead of failing the whole request.
            duration = data.get('duration')
            if duration is not None:
                embed.add_field(name="Duration", value=f"{str(datetime.timedelta(seconds=int(duration)))}")
            thumbnail = data.get('thumbnail')
            if thumbnail:
                embed.set_thumbnail(url=thumbnail)
            await ctx.send(embed=embed)

        if not download:
            return {'webpage_url': data['webpage_url'], 'requester': ctx.author, 'title': data['title'],
                    'thumbnail': data.get('thumbnail')}

        source = ytdl.prepare_filename(data)
        return cls(disnake.FFmpegPCMAudio(source, **ffmpegopts), data=data, requester=ctx.author)

    @classmethod
    async def regather_stream(cls, data, *, loop):
        """Re-run extraction for a queued dict entry and build a playable source.

        ``data`` must contain ``webpage_url`` and ``requester``; the stream is
        opened from the ``url`` field of the freshly extracted info.
        """
        loop = loop or asyncio.get_event_loop()
        requester = data['requester']
        webpage_url = data['webpage_url']
        to_run = partial(ytdl.extract_info, url=webpage_url, download=False)
        data = await loop.run_in_executor(None, to_run)
        data = cls._first_entry(data, webpage_url)
        return cls(disnake.FFmpegPCMAudio(data['url'], **ffmpegopts), data=data, requester=requester)
class MusicPlayer(commands.Cog):
    """Per-guild player: owns the song queue and the task that plays it.

    Instances are created from a command context; the constructor schedules
    ``player_loop`` on the bot's event loop. ``loop`` toggles replaying the
    most recently played track.
    """

    __slots__ = ('bot', '_guild', '_channel', '_cog', '_loop', '_ctx', 'queue', 'next', 'current', 'np', 'volume',
                 '_url', '_requester')

    def __init__(self, ctx: Context):
        self.bot = ctx.bot
        self._guild = ctx.guild
        self._channel = ctx.channel
        self._cog = ctx.cog
        self._loop = False
        self._ctx = ctx
        self.queue = asyncio.Queue()
        self.next = asyncio.Event()
        self.np = None  # last "Playing" announcement message
        self.volume = .5
        self.current = None
        # Page URL and requester of the most recently started track (for loop mode).
        self._url = None
        self._requester = None
        ctx.bot.loop.create_task(self.player_loop())

    @property
    def loop(self):
        """Whether the most recently played track is replayed when it ends."""
        return self._loop

    @loop.setter
    def loop(self, value: bool):
        self._loop = value

    async def player_loop(self):
        """Play tracks until the bot closes or the queue stays empty for 300 seconds."""
        await self.bot.wait_until_ready()
        while not self.bot.is_closed():
            self.next.clear()
            looping = bool(self.loop and self._url)
            if looping:
                # Replay the last track under its original requester.
                source = {'webpage_url': self._url, 'requester': self._requester}
            else:
                try:
                    async with timeout(300):
                        source = await self.queue.get()
                except asyncio.TimeoutError:
                    # Only tear down if this player is still the registered one;
                    # a player dropped by the cog must not disconnect a newer session.
                    if self._cog.players.get(self._guild.id) is self:
                        self.destroy(self._guild, self._channel)
                    return

            if not isinstance(source, YTDLSource):
                try:
                    source = await YTDLSource.regather_stream(source, loop=self.bot.loop)
                except Exception as e:
                    if looping:
                        # Otherwise the same failing URL would be retried forever.
                        self._loop = False
                    await self._channel.send(f'There was an error processing your song.\n'
                                             f'```css\n[{e}]\n```')
                    continue

            source.volume = self.volume
            self.current = source
            self._url = source.web_url
            self._requester = source.requester

            if not self._try_play(source):
                # Nothing will ever signal ``next`` for this track, so do not wait on it.
                self._cleanup_source(source)
                self.current = None
                if looping:
                    self._loop = False
                continue

            suffix = ' (looping)' if looping else ''
            try:
                self.np = await self._channel.send(f'**Playing:** `{source.title}` requested from '
                                                   f'`{source.requester}`{suffix}')
            except disnake.HTTPException:
                # Failing to announce must not stop playback.
                pass

            await self.next.wait()

            self._cleanup_source(source)
            self.current = None
            if self.np is not None:
                try:
                    await self.np.delete()
                except disnake.HTTPException:
                    pass

    def _try_play(self, source):
        """Start ``source`` on the guild's voice client; return False when that is not possible."""
        vc = self._guild.voice_client
        if vc is None:
            return False
        try:
            vc.play(source, after=self.play_next_song)
        except disnake.ClientException:
            return False
        return True

    @staticmethod
    def _cleanup_source(source):
        """Release the source's resources, ignoring the ValueError cleanup may raise."""
        try:
            source.cleanup()
        except ValueError:
            pass

    def play_next_song(self, error=None):
        """``after`` callback for playback: wake ``player_loop`` for the next track.

        This is used as the voice client's ``after`` callback, which does not
        run on the event-loop thread, so the event is set via
        ``call_soon_threadsafe``. A playback error is reported on stderr
        instead of being raised, so the player loop is always woken.
        """
        if error:
            print(f'Player error: {error}', file=sys.stderr)
        self.bot.loop.call_soon_threadsafe(self.next.set)

    def destroy(self, guild, channel):
        """Schedule the cog's cleanup for this guild (disconnect and drop the player)."""
        return self.bot.loop.create_task(self._cog.cleanup(guild, channel, left_channel=True))
class Music(commands.Cog):
    """The Bot will give you a variety of commands, including:
    • playing music
    • pausing, resuming the audio
    • looping, shuffling, or viewing the next 5 songs in the queue.
    And you can use all of this in multiple guilds simultaneously.
    When you use the stop command, the bot will leave the voice channel
    and destroy its server instance for performance reasons.
    """

    __slots__ = ('bot', 'players')

    def __init__(self, bot: commands.Bot):
        self.bot = bot
        # guild id -> MusicPlayer
        self.players = {}

    async def cleanup(self, guild: disnake.Guild, channel, left_channel=True):
        """Disconnect from voice in ``guild`` and forget its player.

        When ``left_channel`` is true an inactivity notice is sent to ``channel``.
        """
        try:
            if guild.voice_client:
                await guild.voice_client.disconnect()
                if left_channel:
                    await channel.send("I left the channel due to inactivity.")
        finally:
            # Drop the player even if disconnecting or sending failed.
            self.players.pop(guild.id, None)

    async def cog_check(self, ctx: Context):
        """Reject prefix commands used outside a guild."""
        if not ctx.guild:
            raise commands.NoPrivateMessage
        return True

    async def cog_command_error(self, ctx: Context, error):
        """Answer known prefix-command errors and print every error's traceback to stderr."""
        if isinstance(error, commands.NoPrivateMessage):
            try:
                return await ctx.send('You cannot use this inside dm.')
            except disnake.HTTPException:
                pass
        elif isinstance(error, InvalidVoiceChannel):
            await ctx.send(
                'There was a error trying to connect to a voice channel.\nMake sure to join a voice channel.')
        print('Ignoring exception in command {}:'.format(ctx.command), file=sys.stderr)
        traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)

    def get_player(self, ctx):
        """Retrieve the guild player, or generate one."""
        try:
            player = self.players[ctx.guild.id]
        except KeyError:
            player = MusicPlayer(ctx)
            self.players[ctx.guild.id] = player
        return player

    async def _require_voice(self, origin, send, not_connected='Im not connected to a voice channel!'):
        """Shared guard: return the guild's connected voice client, or None after replying.

        ``origin`` is a command context or interaction; ``send`` is the
        coroutine function used to reply (``ctx.send``, ``inter.send`` or
        ``inter.followup.send``).
        """
        if not origin.author.voice:
            await send("You are not connected to a voice channel.")
            return None
        vc = origin.guild.voice_client
        if not vc or not vc.is_connected():
            await send(not_connected)
            return None
        return vc

    def _queue_embed(self, player):
        """Build the embed listing up to the next 5 queued titles."""
        # asyncio.Queue has no public peek, so read its underlying deque.
        upcoming = list(itertools.islice(player.queue._queue, 0, 5))
        fmt = '\n'.join(f'**`{entry["title"]}`**' for entry in upcoming)
        return disnake.Embed(title=f'Queue - Next {len(upcoming)}', description=fmt,
                             colour=disnake.Colour.green())

    @staticmethod
    def _drain(queue):
        """Remove every pending entry from ``queue`` through its public API."""
        while not queue.empty():
            queue.get_nowait()

    async def _requeue_current(self, origin, vc):
        """Look the current track up again and place it at the front of the queue."""
        player = self.get_player(origin)
        # YTDLSource exposes the page URL as ``web_url``; it has no ``url`` attribute.
        source = await YTDLSource.create_source(origin, vc.source.web_url, loop=self.bot.loop, download=False)
        # Front of the queue, so the restarted track (not the next queued one)
        # plays once the current playback is stopped. The player is busy with
        # the current track here, so nobody is waiting on the queue.
        player.queue._queue.appendleft(source)

    #SlashCommands
    @commands.slash_command(name='connect')
    @commands.guild_only()
    async def _connect(self, inter: ApplicationCommandInteraction):
        """Connects the bot to your voice channel."""
        try:
            channel = inter.author.voice.channel
        except AttributeError:
            return await inter.send("No channel to join. Make sure you are in a voice channel.")
        vc = inter.guild.voice_client
        if vc:
            if vc.channel.id == channel.id:
                return await inter.send(f"I am already connected to <#{channel.id}>")
            try:
                await vc.move_to(channel)
            except asyncio.TimeoutError:
                return await inter.send(f'Moving to channel: <#{channel.id}> timed out.')
        else:
            try:
                await channel.connect()
            except asyncio.TimeoutError:
                # Return here so a timeout is not followed by a "Connected" message.
                return await inter.send(f'Connecting to channel: <#{channel.id}> timed out.')
        await inter.send(f'Connected with: **{channel}**')

    @commands.slash_command(name='pause', description="Pause the current song")
    @commands.guild_only()
    async def _pause(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        if vc.is_paused():
            return await inter.send('The music is already paused.')
        if not vc.is_playing():
            return await inter.send('Im not playing anything.')
        vc.pause()
        await inter.send(f'**`{inter.author}`**: Paused the music!')

    @commands.slash_command(name='resume', description="Resume the paused song")
    @commands.guild_only()
    async def _resume(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        if not vc.is_paused():
            # A slash command must always answer its interaction.
            return await inter.send('The music is not paused.')
        vc.resume()
        await inter.send(f'**`{inter.author}`**: Resuming!')

    @commands.slash_command(name='skip', description="Skip the current song")
    @commands.guild_only()
    async def _skip(self, inter: ApplicationCommandInteraction):
        await inter.response.defer()
        send = inter.followup.send
        vc = await self._require_voice(inter, send, 'I am not connected to a voice channel!')
        if vc is None:
            return
        if not vc.source or not (vc.is_playing() or vc.is_paused()):
            return await send('I am not playing anything!')
        if inter.author != vc.source.requester:
            # Only the requester may skip; do not claim a skip that did not happen.
            return await send('Only the requester of the current song can skip it.')
        vc.stop()
        return await send(f'**`{inter.author}`**: Skipped!')

    @commands.slash_command(name='clear_queue', description="Removes all songs from the queue")
    @commands.guild_only()
    async def _clear_queue(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        self._drain(self.get_player(inter).queue)
        await inter.send('Cleared the queue.')

    @commands.slash_command(name='queue', description="Shows the queue")
    @commands.guild_only()
    async def _queue_info(self, inter: ApplicationCommandInteraction):
        await inter.response.defer()
        send = inter.followup.send
        vc = await self._require_voice(inter, send)
        if vc is None:
            return
        player = self.get_player(inter)
        if player.queue.empty():
            return await send('There are currently no songs in the queue.')
        await send(embed=self._queue_embed(player))

    @commands.slash_command(name='now_playing', description="Shows the currently playing song")
    @commands.guild_only()
    async def _now_playing(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        player = self.get_player(inter)
        if not player.current or not vc.source:
            return await inter.send('Im not playing anything.')
        if player.np is not None:
            try:
                # Remove our previous now_playing message.
                await player.np.delete()
            except disnake.HTTPException:
                pass
        # The interaction reply is not stored in player.np: the value returned
        # by inter.send is not a message object the player could delete later.
        await inter.send(f'**Playing:** `{vc.source.title}` '
                         f'requested from `{vc.source.requester}`')

    @commands.slash_command(name='volume', description="Changes the players volume")
    @commands.guild_only()
    async def _change_volume(self, inter: ApplicationCommandInteraction, *, vol: float):
        """
        Parameters
        ----------
        vol: Choose from 1 to 100
        """
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        if not 0 < vol < 101:
            return await inter.send('Just enter a number between 1 and 100.')
        player = self.get_player(inter)
        if vc.source:
            vc.source.volume = vol / 100
        player.volume = vol / 100
        await inter.send(f'**`{inter.author}`** Current volume: **{vol}%**')

    @commands.slash_command(name='shuffle', description="Shuffles the queue")
    @commands.guild_only()
    async def _shuffe(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        player = self.get_player(inter)
        if player.queue.empty():
            return await inter.send('There are currently no songs in the queue.')
        random.shuffle(player.queue._queue)
        await inter.send("Shuffled the queue.")

    @commands.slash_command(name='restart', description="Restarts the current track")
    @commands.guild_only()
    async def _restart(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        if not vc.is_playing():
            return await inter.send("Im not playing anything.")
        await self._requeue_current(inter, vc)
        await inter.send("Restarted track.")
        vc.stop()

    @commands.slash_command(name='loop')
    @commands.guild_only()
    async def _loop(self, inter: ApplicationCommandInteraction):
        """Looping the current song"""
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        player = self.get_player(inter)
        if not vc.is_playing():
            return await inter.send("Im not playing anything.")
        player.loop = not player.loop
        return await inter.send(f"Looping is now {('enabled :white_check_mark:' if player.loop else 'disabled')}")

    @commands.slash_command(name='stop', description="Stops the bot instance")
    @commands.guild_only()
    async def _stop(self, inter: ApplicationCommandInteraction):
        vc = await self._require_voice(inter, inter.send)
        if vc is None:
            return
        if vc.is_playing():
            vc.stop()
        await self.cleanup(inter.guild, inter.channel, left_channel=False)
        await inter.send("Left the voice channel.👋")

    #Prefix Commands
    @commands.command(name='connect', aliases=['join'])
    @commands.guild_only()
    async def connect(self, ctx: Context):
        """Connects the bot to your voice channel."""
        try:
            channel = ctx.author.voice.channel
        except AttributeError:
            # Return here: ``channel`` is unbound past this point.
            return await ctx.send("No channel to join. Make sure you are in a voice channel.")
        vc = ctx.guild.voice_client
        if vc:
            if vc.channel.id == channel.id:
                return
            try:
                await vc.move_to(channel)
            except asyncio.TimeoutError:
                return await ctx.send(f'Moving to channel: <#{channel.id}> timed out.')
        else:
            try:
                await channel.connect()
            except asyncio.TimeoutError:
                return await ctx.send(f'Connecting to channel: <#{channel.id}> timed out.')
        await ctx.send(f'Connected with: **{channel}**')

    @commands.command(name='play', aliases=['sing'])
    @commands.guild_only()
    async def play(self, ctx: Context, *, search: str = None):
        """Type the name or put in the url. Only Youtube support."""
        await ctx.trigger_typing()
        if search is None:
            embed = disnake.Embed(title=f"The command looks like this: ```{ctx.prefix}play [song name or yt url]```",
                                  colour=disnake.Colour.blue())
            await ctx.send(embed=embed)
            return
        if not ctx.author.voice:
            return await ctx.send("You are not connected to a voice channel.")
        voice_channel = ctx.author.voice.channel
        if not ctx.guild.voice_client:
            await voice_channel.connect()
        player = self.get_player(ctx)
        source = await YTDLSource.create_source(ctx, search, loop=self.bot.loop, download=False, looping=False)
        await player.queue.put(source)

    @commands.command(name='pause')
    @commands.guild_only()
    async def pause(self, ctx: Context):
        """Pause the song."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        if vc.is_paused():
            return await ctx.send('The music is already paused.')
        if not vc.is_playing():
            return await ctx.send('Im not playing anything.')
        vc.pause()
        await ctx.send(f'**`{ctx.author}`**: Paused the music!')

    @commands.command(name='resume')
    @commands.guild_only()
    async def resume(self, ctx: Context):
        """Resume the paused song."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        if not vc.is_paused():
            return await ctx.send('The music is not paused.')
        vc.resume()
        await ctx.send(f'**`{ctx.author}`**: Resuming!')

    @commands.command(name='skip')
    @commands.guild_only()
    async def skip(self, ctx: Context):
        """Skip a song."""
        vc = await self._require_voice(ctx, ctx.send, 'I am not connected to a voice channel!')
        if vc is None:
            return
        if not vc.source or not (vc.is_playing() or vc.is_paused()):
            return await ctx.send('I am not playing anything!')
        if ctx.author != vc.source.requester:
            # Only the requester may skip; do not claim a skip that did not happen.
            return await ctx.send('Only the requester of the current song can skip it.')
        vc.stop()
        return await ctx.send(f'**`{ctx.author}`**: Skipped!')

    @commands.command(name='clear_queue', aliases=["cq"])
    @commands.guild_only()
    async def clear_queue(self, ctx: Context):
        """Removes all songs from the queue."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        self._drain(self.get_player(ctx).queue)
        await ctx.send('Cleared the queue.')

    @commands.command(name='queue', aliases=['q', 'playlist'])
    @commands.guild_only()
    async def queue_info(self, ctx: Context):
        """Shows the queue."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        player = self.get_player(ctx)
        if player.queue.empty():
            return await ctx.send('There are currently no songs in the queue.')
        await ctx.send(embed=self._queue_embed(player))

    @commands.command(name='now_playing', aliases=['np', 'current'])
    @commands.guild_only()
    async def now_playing(self, ctx: Context):
        """Shows the currently playing song."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        player = self.get_player(ctx)
        if not player.current or not vc.source:
            return await ctx.send('Im not playing anything.')
        if player.np is not None:
            try:
                # Remove our previous now_playing message.
                await player.np.delete()
            except disnake.HTTPException:
                pass
        player.np = await ctx.send(f'**Playing:** `{vc.source.title}` '
                                   f'requested from `{vc.source.requester}`')

    @commands.command(name='volume', aliases=['vol'])
    @commands.guild_only()
    async def change_volume(self, ctx: Context, *, vol: float):
        """Changes the players volume. Choose from 1 to 100.
        """
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        if not 0 < vol < 101:
            return await ctx.send('Just enter a number between 1 and 100.')
        player = self.get_player(ctx)
        if vc.source:
            vc.source.volume = vol / 100
        player.volume = vol / 100
        await ctx.send(f'**`{ctx.author}`** Current volume: **{vol}%**')

    @commands.command(name='shuffle', aliases=['sff'], description="Shuffles the queue")
    @commands.guild_only()
    async def shuffe(self, ctx: Context):
        """Shuffle the queue randomly.
        """
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        player = self.get_player(ctx)
        if player.queue.empty():
            return await ctx.send('There are currently no songs in the queue.')
        random.shuffle(player.queue._queue)
        await ctx.send("Shuffled the queue.")

    @commands.command(name='restart', description="Restarts the current track")
    @commands.guild_only()
    async def restart(self, ctx: Context):
        """Restart the song.
        """
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        if not vc.is_playing():
            return await ctx.send("Im not playing anything.")
        await self._requeue_current(ctx, vc)
        await ctx.send("Restarted track.")
        vc.stop()

    @commands.command(name='loop')
    @commands.guild_only()
    async def loop(self, ctx: Context):
        """Looping the current song"""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        player = self.get_player(ctx)
        if not vc.is_playing():
            return await ctx.send("Im not playing anything.")
        player.loop = not player.loop
        return await ctx.send(f"Looping is now {('enabled :white_check_mark:' if player.loop else 'disabled')}")

    @commands.command(name='stop', aliases=['leave'], description="Stops the bot instance")
    @commands.guild_only()
    async def stop(self, ctx: Context):
        """Stops the bot instance."""
        vc = await self._require_voice(ctx, ctx.send)
        if vc is None:
            return
        if vc.is_playing():
            vc.stop()
        await self.cleanup(ctx.guild, ctx.channel, left_channel=False)
        await ctx.send("Left the voice channel.👋")
# Prefix commands ('!play ...') need to read message text, which requires the
# privileged message-content intent. It must also be enabled for the bot in
# the Discord developer portal, otherwise login is refused.
intents = disnake.Intents.default()
intents.message_content = True

bot = commands.Bot('!', description='Yet another music bot.', intents=intents)
bot.add_cog(Music(bot))


@bot.event
async def on_ready():
    """Print the name and id of the account the bot logged in as."""
    print('Logged in as:\n{0.user.name}\n{0.user.id}'.format(bot))


# Replace 'Token' with your bot token before running.
bot.run('Token')
@535i
Copy link
Author

535i commented Feb 12, 2024

If you encounter any problems or want to request a new feature, let me know.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment