Skip to content

Instantly share code, notes, and snippets.

Created Apr 15, 2019

What would you like to do?
A music example by master boss
import discord
from discord.ext import commands
import asyncio
import itertools, datetime
import sys
import traceback
from async_timeout import timeout
from functools import partial
from youtube_dl import YoutubeDL
from discord.ext.commands.cooldowns import BucketType
ytdlopts = {
'format': 'bestaudio/best',
'outtmpl': 'downloads/%(extractor)s-%(id)s-%(title)s.%(ext)s',
'restrictfilenames': True,
'noplaylist': True,
'nocheckcertificate': True,
'ignoreerrors': False,
'logtostderr': False,
'quiet': True,
'no_warnings': True,
'default_search': 'auto',
'source_address': '' # ipv6 addresses cause issues sometimes
ffmpegopts = {
'before_options': '-nostdin -preset ultrafast',
'options': '-vn -threads 1'
ytdl = YoutubeDL(ytdlopts)
if not discord.opus.is_loaded():
class VoiceConnectionError(commands.CommandError):
"""Custom Exception class for connection errors."""
class InvalidVoiceChannel(VoiceConnectionError):
"""Exception for cases of invalid Voice Channels."""
class YTDLSource(discord.PCMVolumeTransformer):
def __init__(self, source, *, data, requester):
self.requester = requester
self.title = data.get('title')
if self.title is None:
self.title = "No title available"
self.web_url = data.get('webpage_url')
self.thumbnail = data.get('thumbnail')
if self.thumbnail is None:
self.thumbnail = ""
self.duration = data.get('duration')
if self.duration is None:
self.duration = 0
self.uploader = data.get('uploader')
if self.uploader is None:
self.uploader = "Unknown uploader"
# YTDL info dicts (data) have other useful information you might want
def __getitem__(self, item: str):
"""Allows us to access attributes similar to a dict.
This is only useful when you are NOT downloading.
return self.__getattribute__(item)
async def create_source(cls, ctx, search: str, *, loop, download=False):
loop = loop or asyncio.get_event_loop()
to_run = partial(ytdl.extract_info, url=search, download=download)
data = await loop.run_in_executor(None, to_run)
if 'entries' in data:
# take first item from a playlist
data = data['entries'][0]
await ctx.send(f':notes: Added to queue: **{data["title"]}**')
if download:
source = ytdl.prepare_filename(data)
return {'webpage_url': data['webpage_url'], 'requester':, 'title': data['title']}
return cls(discord.FFmpegPCMAudio(source), data=data,
async def regather_stream(cls, data, *, loop):
"""Used for preparing a stream, instead of downloading.
Since Youtube Streaming links expire."""
loop = loop or asyncio.get_event_loop()
requester = data['requester']
to_run = partial(ytdl.extract_info, url=data['webpage_url'], download=False)
data = await loop.run_in_executor(None, to_run)
return cls(discord.FFmpegPCMAudio(data['url']), data=data, requester=requester)
class MusicPlayer:
"""A class which is assigned to each guild using the bot for Music.
This class implements a queue and loop, which allows for different guilds to listen to different playlists
When the bot disconnects from the Voice it's instance will be destroyed.
__slots__ = ('bot', '_guild', '_ctxs', '_channel', '_cog', 'queue', 'next', 'current', 'np', 'volume', 'buttons', 'music', 'music_controller', 'restmode')
def __init__(self, ctx):
self.buttons = {'': 'rp',
'': 'skip',
'': 'vol_up',
'': 'vol_down',
'🖼': 'thumbnail',
'': 'stop',
'': 'queue',
'': 'tutorial'} =
self._guild = ctx.guild
self._ctxs = ctx
self._channel =
self._cog = ctx.cog
self.queue = asyncio.Queue() = asyncio.Event() = None
self.volume = .5
self.current = None
self.music_controller = None
async def buttons_controller(self, guild, current, source, channel, context):
vc = guild.voice_client
vctwo = context.voice_client
for react in self.buttons:
await current.add_reaction(str(react))
def check(r, u):
if not current:
return False
elif str(r) not in self.buttons.keys():
return False
elif == or !=
return False
elif u not in
return False
return False
return True
while current:
if vc is None:
return False
react, user = await'reaction_add', check=check)
control = self.buttons.get(str(react))
if control == 'rp':
if vc.is_paused():
if control == 'skip':
if control == 'stop':
await channel.send('**:notes: Ok, goodbye!**', delete_after=5)
await self._cog.cleanup(guild)
if control == 'vol_up':
player = self._cog.get_player(context)
vctwo.source.volume += 5
if control == 'vol_down':
player = self._cog.get_player(context)
vctwo.source.volume -= 5
if control == 'thumbnail':
await channel.send(embed=discord.Embed(color=0x36393E).set_image(url=source.thumbnail).set_footer(text=f"Requested by {source.requester} | Video: {source.title}", icon_url=source.requester.avatar_url), delete_after=10)
if control == 'tutorial':
await channel.send(embed=discord.Embed(color=0x36393E).add_field(name="How to use Music Controller?", value="⏯ - Resume or pause player\n⏭ - Skip song\n➕ - Volume up\n➖ - Volume down\n🖼 - Get song thumbnail\n⏹ - Stop music session\nℹ - Player queue\n❔ - Shows you how to use Music Controller"), delete_after=10)
if control == 'queue':
await self._cog.queue_info(context)
await current.remove_reaction(react, user)
except discord.HTTPException:
async def player_loop(self):
"""Our main player loop."""
while not
async with timeout(3500):
source = await self.queue.get()
except asyncio.TimeoutError:
return self.destroy(self._guild)
if not isinstance(source, YTDLSource):
# Source was probably a stream (not downloaded)
# So we should regather to prevent stream expiration
source = await YTDLSource.regather_stream(source,
except Exception as e:
await self._channel.send(f':notes: There was an error processing your song.\n'
source.volume = self.volume
self.current = source
try:, after=lambda _:
except Exception:
embednps = discord.Embed(color=0x36393E)
embednps.add_field(name="Song title:", value=f"```fix\n{source.title}```", inline=False)
embednps.add_field(name="Requested by:", value=f"**{source.requester}**", inline=True)
embednps.add_field(name="Song URL:", value=f"**[URL]({source.web_url})**", inline=True)
embednps.add_field(name="Uploader:", value=f"**{source.uploader}**", inline=True)
embednps.add_field(name="Song duration:", value=f"**{datetime.timedelta(seconds=source.duration)}**", inline=True)
embednps.set_thumbnail(url=f"{source.thumbnail}") = await self._channel.send(embed=embednps)
self.music_controller =,, source, self._channel, self._ctxs))
# Make sure the FFmpeg process is cleaned up.
self.current = None
# We are no longer playing this song...
except Exception:
def destroy(self, guild):
"""Disconnect and cleanup the player."""
class Music(commands.Cog):
"""Music related commands."""
__slots__ = ('bot', 'players', 'musictwo', 'music_controller')
def __init__(self, bot): = bot
self.players = {}
async def cleanup(self, guild):
await guild.voice_client.disconnect()
except AttributeError:
del self.players[]
except KeyError:
async def __local_check(self, ctx):
"""A local check which applies to all commands in this cog."""
if not ctx.guild:
raise commands.NoPrivateMessage
return True
async def cleanup(self, guild):
await guild.voice_client.disconnect()
except AttributeError:
del self.players[]
except KeyError:
async def __error(self, ctx, error):
"""A local error handler for all errors arising from commands in this cog."""
if isinstance(error, commands.NoPrivateMessage):
return await ctx.send(':notes: This command can not be used in Private Messages.')
except discord.HTTPException:
elif isinstance(error, InvalidVoiceChannel):
await ctx.send(":notes: Please join voice channel or specify one with command!")
print('Ignoring exception in command {}:'.format(ctx.command), file=sys.stderr)
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
def get_player(self, ctx):
"""Retrieve the guild player, or generate one."""
player = self.players[]
except KeyError:
player = MusicPlayer(ctx)
self.players[] = player
return player
@commands.command(name='connect', aliases=['join', 'j'])
async def connect_(self, ctx, *, channel: discord.VoiceChannel=None):
"""Connect to voice.
channel: discord.VoiceChannel [Optional]
The channel to connect to. If a channel is not specified, an attempt to join the voice channel you are in
will be made.
This command also handles moving the bot to different channels.
if not channel:
channel =
except AttributeError:
await ctx.send(":notes: Please join voice channel or specify one with command!")
vc = ctx.voice_client
if vc:
if ==
await vc.move_to(channel)
except asyncio.TimeoutError:
raise VoiceConnectionError(f'Moving to channel: <{channel}> timed out.')
await channel.connect()
except asyncio.TimeoutError:
raise VoiceConnectionError(f'Connecting to channel: <{channel}> timed out.')
await ctx.send(f":notes: Connected to channel: **{channel}**", delete_after=20)
@commands.command(name='play', aliases=['sing', 'p'])
async def play_(self, ctx, *, search: str):
"""Request a song and add it to the queue.
This command attempts to join a valid voice channel if the bot is not already in one.
Uses YTDL to automatically search and retrieve a song.
search: str [Required]
The song to search and retrieve using YTDL. This could be a simple search, an ID or URL.
await ctx.trigger_typing()
vc = ctx.voice_client
if not vc:
await ctx.invoke(self.connect_)
elif not in
return await ctx.send(":notes: Please join my voice channel to execute this command.", delete_after=20)
player = self.get_player(ctx)
# If download is False, source will be a dict which will be used later to regather the stream.
# If download is True, source will be a discord.FFmpegPCMAudio with a VolumeTransformer.
source = await YTDLSource.create_source(ctx, search,, download=False)
await player.queue.put(source)
@commands.command(name='now_playing', aliases=['np', 'current', 'currentsong', 'playing'])
async def now_playing_(self, ctx):
"""Display information about the currently playing song."""
vc = ctx.voice_client
if not vc or not vc.is_connected():
return await ctx.send(":notes: I am not connected to voice or playing anything. Join or specify one with command join.", delete_after=20)
elif not in
return await ctx.send(":notes: Please join my voice channel to execute this command.", delete_after=20)
player = self.get_player(ctx)
if not player.current:
return await ctx.send(":notes: I am not connected to voice or playing anything. Join or specify one with command join.", delete_after=20)
# Remove our previous now_playing message.
except discord.HTTPException:
embednp = discord.Embed(color=0x36393E)
embednp.add_field(name="Song title:", value=f"```fix\n{vc.source.title}```", inline=False)
embednp.add_field(name="Requested by:", value=f"**{vc.source.requester}**", inline=True)
embednp.add_field(name="Song URL:", value=f"**[URL]({vc.source.web_url})**", inline=True)
embednp.add_field(name="Uploader:", value=f"**{vc.source.uploader}**", inline=True)
embednp.add_field(name="Song duration:", value=f"**{datetime.timedelta(seconds=vc.source.duration)}**", inline=True)
embednp.set_thumbnail(url=f"{vc.source.thumbnail}") = await ctx.send(embed=embednp)
self.music_controller =,, vc.source,, ctx))
async def queue_info(self, ctx):
player = self.get_player(ctx)
if player.queue.empty():
return await ctx.send('**:notes: There are currently no more queued songs.**')
upcoming = list(itertools.islice(player.queue._queue, 0, 5))
fmt = '\n'.join(f'**`{_["title"]}`**' for _ in upcoming)
embed = discord.Embed(title=f'Queue - Next {len(upcoming)}', description=fmt, color=0x36393E)
await ctx.send(embed=embed)
def setup(bot):
print('Music module loaded.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.