Skip to content

Instantly share code, notes, and snippets.

@1oonie
Last active December 4, 2021 17:09
Show Gist options
  • Save 1oonie/4bc683e3cd58c60048a2a48fcd169642 to your computer and use it in GitHub Desktop.
Save 1oonie/4bc683e3cd58c60048a2a48fcd169642 to your computer and use it in GitHub Desktop.
music

music bot

Instructions

Download both of the Python files below and put them in your project's root directory (you may have to rename them, because I have prefixed them with numbers so that they appear in the order I want).

Download slash_util from here and put that in your project's root directory.

Then you can install the requirements which are:

  • youtube_dl
  • PyNaCl

(both available using pip)

You should also set GUILD_ID to the ID of the guild that you want to use your bot in, and put your bot token in the TOKEN variable.

To run it, just run python3 main.py; after a short while it should print that it is logged in (it has to register all the slash commands first!)

import discord # type: ignore
import slash_util
from player import PlayerMixin, Player, Song
from io import BytesIO
import re
import asyncio
# ID of the guild every slash command in this file is registered for
# (passed as guild_id=GUILD_ID to the command decorators). Placeholder value:
# replace it with your own guild's ID.
GUILD_ID = 123
# Bot token handed to bot.run() at the bottom of this file. Placeholder value:
# replace it with your own token.
TOKEN = "your.bot.token"
class _Context(slash_util.Context):
    """Command context that also exposes the invoking guild's music player."""

    @property
    def player(self):
        """Return the player registered for this interaction's guild.

        The lookup is delegated to the bot's ``get_player``; the result is
        ``None`` when no player has been registered for the guild.
        """
        guild = self.interaction.guild
        return self.bot.get_player(guild)


# Swap slash_util's Context for the subclass above.
# NOTE(review): presumably slash_util then builds command contexts from this
# class so that ``ctx.player`` is available in commands — confirm.
slash_util.Context = _Context
# Matches the YouTube links the `play` command accepts: a `watch?v=<id>` URL
# on youtube.com (bare or with a `www.`, `m.` or `music.` subdomain) or a
# `youtu.be/<id>` short link, over http or https. It is applied with
# re.match, so the pattern is anchored at the start of the string.
# The previous pattern only allowed https and the optional `www.` prefix, so
# valid mobile/music links were rejected as "not a valid YouTube URL".
youtube_regex = re.compile(
    r"https?://(?:(?:www|m|music)\.)?youtu(?:be\.com/watch\?v=.+$|\.be/.+$)"
)
def progress_bar(percent: float) -> str:
    """Render a 20-character text progress bar with a knob at *percent*.

    The bar has 20 slots, each worth 5 percentage points. The knob ("🔘")
    occupies exactly one slot and the rest are filled with "―".

    Args:
        percent: Progress as a percentage. Values outside 0-100 are clamped
            so the bar always stays exactly 20 characters long (previously
            they produced bars of the wrong length).

    Returns:
        The rendered bar.
    """
    # Floor-divide directly instead of subtracting a float modulo first,
    # which could land just below a multiple of 5 and lose a slot.
    position = int(percent // 5)
    # Slot 0 does not exist (the knob always takes a slot), and anything past
    # the end is pinned to the last slot.
    position = min(max(position, 1), 20)
    return ("―" * (position - 1)) + "🔘" + ("―" * (20 - position))
def to_duration(duration: int) -> str:
    """Format a number of seconds as ``M:SS`` or ``H:MM:SS``.

    The previous implementation dropped any zero component and never
    zero-padded minutes, so 3600 seconds rendered as "1:00" (indistinguishable
    from one minute) and 3665 seconds as "1:1:05". Minutes are now always
    shown, and minutes/seconds are padded whenever a larger unit precedes
    them.

    Args:
        duration: Length in seconds. Fractional values are truncated and
            negative values are treated as zero.

    Returns:
        The formatted duration, e.g. "0:05", "1:05" or "1:01:05".
    """
    total_seconds = max(int(duration), 0)
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{seconds:02d}"
    return f"{minutes}:{seconds:02d}"
class MusicCog(slash_util.ApplicationCog):
    """Slash commands that drive the per-guild music player.

    Every command answers with an ephemeral message. After answering, a
    command hands its context to the guild's player (when one exists) via
    ``update_interaction`` so the player's later follow-up messages go
    through the most recent interaction.
    """

    @slash_util.slash_command(guild_id=GUILD_ID, name="play", description="Plays a song")
    @slash_util.describe(url="The YouTube url", volume="The volume that the song should be (%)")
    async def _play(self, ctx, url: str, volume: int = 100):
        """Queue the song at *url*, creating or moving the player as needed."""
        interaction = ctx.interaction
        await interaction.response.defer(ephemeral=True)
        message = await interaction.original_message()

        if not re.match(youtube_regex, url):
            await message.edit(content="That is not a valid YouTube URL.")
            return

        player = ctx.player
        if player is None:
            player = await Player.create(ctx)
            if player is None:
                # Player.create refuses when the invoker is not a guild member
                # in a voice channel. Tell them instead of leaving the
                # deferred response hanging forever.
                await message.edit(content="You need to be in a voice channel to play music.")
                return
        else:
            voice = getattr(interaction.user, "voice", None)
            if voice is None or voice.channel is None:
                # Previously this path raised AttributeError on `voice.channel`.
                await message.edit(content="You need to be in a voice channel to play music.")
                return
            if player.voice_client.channel.id != voice.channel.id:  # type: ignore
                player.voice_client.stop()
                await player.voice_client.move_to(voice.channel)  # type: ignore

        player.update_interaction(ctx)

        if player.queue.full():
            # Check before resolving the URL so no FFmpeg source is created
            # for a song that cannot be queued.
            await message.edit(content="The queue is full!")
            return

        song = await Song.from_url(player, url, volume/100)  # type: ignore

        try:
            player.queue.put_nowait(song)
        except asyncio.QueueFull:
            # The queue filled up while the URL was being resolved; release
            # the audio source that will never be played.
            song.source.cleanup()
            await message.edit(content="The queue is full!")
        else:
            await message.edit(content=f"🎵 Added `{song.title}` to the queue.")

    @slash_util.slash_command(guild_id=GUILD_ID, name="current", description="See what song is currently playing")
    async def _current(self, ctx):
        """Show the current song together with a progress bar."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        else:
            song = player.current
            # A zero/unknown duration would otherwise raise ZeroDivisionError.
            percent = (song.seconds_played / song.duration) * 100 if song.duration else 0
            progress = progress_bar(percent)
            content = f"🎵 Currently playing `{song.title}`.\n`{to_duration(song.seconds_played)}` {progress} `{to_duration(song.duration)}`"

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        # Guarded: there may be no player at all for this guild.
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="volume", description="View or change the volume for the current song")
    @slash_util.describe(volume="The volume you want to set it to (%)")
    async def _volume(self, ctx, volume: int = None):
        """Report the current song's volume, or set it when *volume* is given."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif volume is None:
            content = f"🔊 Currently the volume is `{int(player.current.source.volume*100)}%`"
        else:
            player.set_volume(volume/100)  # type: ignore
            content = f"🔊 I have set the volume to `{int(player.current.source.volume*100)}%`"

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="pause", description="Pauses the current song.")
    async def _pause(self, ctx):
        """Pause the current song unless it is already paused."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif player.voice_client.is_paused():
            content = "The current song is already paused!"
        else:
            player.voice_client.pause()
            content = f"⏸️ Paused `{player.current.title}`"

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="resume", description="Resumes the current song.")
    async def _resume(self, ctx):
        """Resume the current song if it is paused."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        elif not player.voice_client.is_paused():
            content = "The current song is not paused!"
        else:
            player.voice_client.resume()
            content = f"⏸️ resumed `{player.current.title}`"

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="skip", description="Skips the current song.")
    async def _skip(self, ctx):
        """Skip the current song by stopping the voice client."""
        player = ctx.player
        if player is None or player.current is None:
            content = "I am not playing anything!"
        else:
            # Build the reply before stopping, while `current` is still set.
            content = f"⏭️ Skipped `{player.current.title}`."
            player.voice_client.stop()

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="queue", description="Look at your queue")
    async def _queue(self, ctx):
        """List the queued songs, numbered from 1."""
        player = ctx.player
        if player is None:
            content = "I am not connected to a voice channel."
        elif player.queue.qsize() == 0:
            content = "The queue is empty!"
        else:
            content = "📋 Here is your queue:\n" + "\n".join(
                f"{n}. `{song.title}`" for n, song in enumerate(player.get_queue(), start=1)
            )

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="remove", description="Removes an item from the queue by its index")
    @slash_util.describe(index="The index of the song you want to remove.")
    async def _remove(self, ctx, index: int):
        """Remove the song at 1-based *index* from the queue."""
        player = ctx.player
        if player is None:
            content = "I am not connected to a voice channel."
        elif player.queue.qsize() == 0:
            content = "The queue is empty!"
        else:
            try:
                item = player.remove_item(index)
            except IndexError:
                content = "That index does not exist."
            else:
                content = f"❌ Removed `{item.title}`."

        await ctx.interaction.response.send_message(content=content, ephemeral=True)
        if player is not None:
            player.update_interaction(ctx)

    @slash_util.slash_command(guild_id=GUILD_ID, name="stop", description="Stops the player.")
    async def _stop(self, ctx):
        """Acknowledge the request, then destroy the guild's player."""
        player = ctx.player
        if player is None or player.current is None:
            await ctx.interaction.response.send_message(content="I am not playing anything!", ephemeral=True)
            return

        await ctx.interaction.response.send_message(content="Stopping the player", ephemeral=True)
        # Update first so destroy() sends its follow-up through this interaction.
        player.update_interaction(ctx)
        await player.destroy("invokation of `stop`.")
class Bot(slash_util.Bot, PlayerMixin):
    """Bot that carries the player registry and loads the music cog."""

    def __init__(self, **kwargs) -> None:
        """Initialise both bases, then register ``MusicCog``."""
        super().__init__(**kwargs)
        # PlayerMixin is initialised explicitly after the super() call.
        PlayerMixin.__init__(self)
        cog = MusicCog(self)
        self.add_cog(cog)

    async def start(self, token: str, *, reconnect: bool = True) -> None:
        """Log in, re-register the guild's slash commands, then connect.

        The application id is stored on the connection state before the
        commands for ``GUILD_ID`` are deleted and synced again.
        """
        await self.login(token)

        application = await self.application_info()
        self._connection.application_id = application.id

        await self.delete_all_commands(guild_id=GUILD_ID)
        await self.sync_commands()

        await self.connect(reconnect=reconnect)

    async def on_ready(self):
        """Print the account the bot is logged in as."""
        print(f"logged in as {self.user!s}")

    async def close(self):
        """Destroy every player, then run the regular bot shutdown."""
        await PlayerMixin.close(self)
        outcome = await super().close()
        return outcome
# Create the bot with "!" as its command prefix and start it with TOKEN.
# Both statements execute at module level, i.e. as soon as this file is run.
bot = Bot(command_prefix="!")
bot.run(TOKEN)
import discord # type: ignore
import asyncio
import youtube_dl
# Shared youtube_dl extractor. Song.from_url calls ytdl.extract_info with
# download=False, so it is only used to look up metadata and the stream URL.
# NOTE(review): the per-option remarks below follow youtube_dl's YoutubeDL
# option documentation — confirm against the installed version.
ytdl = youtube_dl.YoutubeDL(
    {
        "format": "bestaudio/best",  # prefer the best audio format
        "outtmpl": "%(extractor)s-%(id)s-%(title)s.%(ext)s",  # output filename template
        "restrictfilenames": True,
        "noplaylist": True,  # presumably: resolve a single video, not its playlist
        "nocheckcertificate": True,
        "ignoreerrors": False,
        "logtostderr": False,
        "quiet": True,
        "no_warnings": True,
        "default_search": "auto",
        "source_address": "0.0.0.0",  # presumably binds outgoing connections to IPv4 — verify
    }
)
# Keyword arguments unpacked into discord.FFmpegPCMAudio by Song.from_url.
# NOTE(review): the -reconnect* flags presumably make ffmpeg retry a dropped
# stream and -vn presumably drops any video track — confirm with ffmpeg docs.
ffmpeg_options = {
    "before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 5 -nostdin",
    "options": "-vn",
}
class Song:
    """A queued track: its audio source plus the metadata shown to users."""

    def __init__(self, player, source, title, duration) -> None:
        """Store the track's owner, audio source, title and duration.

        ``seconds_played`` starts at 0; the player loop advances it while the
        track is playing.
        """
        self.player, self.source = player, source
        self.title, self.duration = title, duration
        self.seconds_played = 0

    @classmethod
    async def from_url(cls, player, url, volume=1.0):
        """Build a ``Song`` from *url* without downloading the media.

        The youtube_dl lookup is run in the loop's default executor, and the
        stream URL it reports is wrapped in a volume-adjustable FFmpeg source.
        """

        def _extract():
            # Metadata lookup only; nothing is written to disk.
            return ytdl.extract_info(url, download=False)

        info = await player.loop.run_in_executor(None, _extract)

        stream_url = info["url"]
        audio = discord.FFmpegPCMAudio(stream_url, **ffmpeg_options)
        source = discord.PCMVolumeTransformer(audio, volume=volume)

        return cls(player, source, info["title"], info["duration"])
async def player_loop(player) -> None:
    """Play songs from ``player.queue`` one after another until idle.

    Each iteration waits up to 300 seconds for the next song. If none
    arrives, the player is destroyed for inactivity and the loop ends.
    Otherwise the song is announced, played, and its ``seconds_played``
    counter is advanced once per second while it is not paused.

    Args:
        player: The ``Player`` whose queue and voice client are driven.
    """
    while True:
        player.current = None
        try:
            song = await asyncio.wait_for(player.queue.get(), timeout=300)
        except asyncio.TimeoutError:
            await player.destroy("inactivity")
            break

        player.current = song

        try:
            await player.interaction.followup.send(f"⏯️ Playing `{song.title}`.", ephemeral=True)
        except discord.HTTPException:
            # The announcement is best-effort: the stored interaction may no
            # longer accept follow-ups (e.g. its token expired). Letting the
            # error escape would end this task and stall the whole queue.
            pass

        player.voice_client.play(song.source)

        # Poll once per second until the voice client is neither playing nor
        # paused, i.e. the song finished or was stopped/skipped.
        while player.voice_client.is_playing() or player.voice_client.is_paused():
            if not player.voice_client.is_paused():
                song.seconds_played += 1
            await asyncio.sleep(1)
class Player:
    """Per-guild playback state: voice client, song queue and loop task."""

    def __init__(self, voice_client, context):
        """Set up the queue (at most 25 songs) and start the player loop.

        Args:
            voice_client: The connected voice client used for playback.
            context: The command context that created the player; its
                interaction is used for follow-up messages.
        """
        self.voice_client = voice_client
        self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
        self.context = context
        self.interaction = context.interaction
        self.queue = asyncio.Queue(maxsize=25)
        self.current = None
        # Started last so every attribute the loop reads already exists.
        self.player_loop = self.loop.create_task(player_loop(self))

    @classmethod
    async def create(cls, context):
        """Connect to the invoker's voice channel and register a new player.

        Returns:
            The new player, or ``None`` when the invoker is a plain
            ``discord.User``, is not in a voice channel, or the interaction
            has no guild.
        """
        interaction = context.interaction

        if isinstance(interaction.user, discord.User):
            return
        if interaction.user.voice is None or interaction.user.voice.channel is None:
            return
        if interaction.guild is None:
            return

        voice_client: discord.VoiceClient = await interaction.user.voice.channel.connect()
        player = cls(voice_client, context)
        context.bot.add_player(interaction.guild, player)
        return player

    async def destroy(self, reason) -> None:
        """Announce the shutdown, disconnect, unregister and stop the loop.

        Args:
            reason: Text appended to the shutdown message.
        """
        try:
            await self.interaction.followup.send(f"😵‍💫 Destroying the player due to {reason}", ephemeral=True)
        except discord.HTTPException:
            # The notice is best-effort: a stale interaction must not prevent
            # the voice connection from being torn down.
            pass

        try:
            self.voice_client.stop()
            await self.voice_client.disconnect()
        finally:
            # Always unregister and cancel, even if disconnecting failed, so
            # no dead player stays in the bot's registry.
            self.context.bot.remove_player(self.context.guild)  # type: ignore
            self.player_loop.cancel()

    def update_interaction(self, context):
        """Use *context* (and its interaction) for later follow-up messages."""
        self.context = context
        self.interaction = context.interaction

    def set_volume(self, volume: float) -> None:
        """Set the volume of the voice client's current source, if any."""
        if self.voice_client.source is None:
            return
        self.voice_client.source.volume = volume  # type: ignore

    def get_queue(self):
        """Return a snapshot list of the queued songs, next song first."""
        # asyncio.Queue has no public way to peek, hence the private deque.
        return list(self.queue._queue)  # type: ignore

    def remove_item(self, index):
        """Remove and return the queued song at 1-based *index*.

        Raises:
            IndexError: If *index* is not between 1 and the queue length.
                Previously 0 and negative values wrapped around and removed
                a song from the end of the queue.
        """
        pending = self.queue._queue  # type: ignore
        if not 1 <= index <= len(pending):
            raise IndexError("queue index out of range")

        item = pending[index - 1]
        del pending[index - 1]
        # A slot was freed, so wake one producer blocked in put(), if any.
        self.queue._wakeup_next(self.queue._putters)  # type: ignore
        return item

    def add_item(self, song):
        """Queue *song*; raises ``asyncio.QueueFull`` when the queue is full."""
        self.queue.put_nowait(song)
class PlayerMixin:
    """Registry of players keyed by guild id, mixed into the bot class."""

    def __init__(self) -> None:
        """Start with an empty registry."""
        self._players = {}

    def get_player(self, guild):
        """Return the player registered for *guild*, or ``None``."""
        key = guild.id
        return self._players.get(key)

    def add_player(self, guild, player):
        """Register *player* for *guild* unless one is already registered.

        The *player* argument is returned either way, even when an existing
        registration was kept.
        """
        self._players.setdefault(guild.id, player)
        return player

    def remove_player(self, guild):
        """Forget the player registered for *guild*, if there is one."""
        self._players.pop(guild.id, None)

    async def close(self):
        """Destroy every registered player."""
        # Iterate over a copy: the registry may change while players are
        # being destroyed.
        for active in list(self._players.values()):
            await active.destroy("the bot closing.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment