-
-
Save aliencaocao/83690711ef4b6cec600f9a0d81f710e5 to your computer and use it in GitHub Desktop.
import os, sys | |
import re | |
import time | |
import datetime | |
import atexit | |
import random | |
from typing import Dict, List, Any | |
import logging | |
import asyncio | |
import requests | |
import urllib.parse, urllib.request | |
import nextcord | |
from nextcord.ext import commands, tasks, application_checks | |
import yt_dlp | |
# Add the current working directory to the module search path.
sys.path.append('.')
logging.basicConfig(level=logging.WARNING)

# Disable yt_dlp's bug-report suffix. The stub accepts and ignores any
# arguments so it keeps working if a caller passes e.g. ``before=...``.
yt_dlp.utils.bug_reports_message = lambda *args, **kwargs: ''

intents = nextcord.Intents.default()
# noinspection PyDunderSlots
intents.message_content = True

help_command = commands.DefaultHelpCommand(no_category='Commands')  # Change only the no_category default string
bot = commands.Bot(
    intents=intents,
    help_command=help_command,
    description='',
    activity=nextcord.Game(name='Listening to music'),
    status=nextcord.Status.online,
)
# Options handed to yt_dlp.YoutubeDL. The keys must use the Python API
# spelling; command-line style names with dashes are not recognised.
ytdl_format_options: dict[str, Any] = {
    'format': 'bestaudio',
    'outtmpl': '%(extractor)s-%(id)s-%(title)s.%(ext)s',
    'restrictfilenames': True,
    'noplaylist': True,  # API spelling of the --no-playlist flag
    'nocheckcertificate': True,
    'ignoreerrors': False,
    'logtostderr': False,
    'geo_bypass': True,  # API spelling of the --geo-bypass flag
    'quiet': True,
    'no_warnings': True,
    'default_search': 'auto',
    'source_address': '0.0.0.0',
    'no_color': True,
    'overwrites': True,
    'age_limit': 100,
    'live_from_start': True,
}

# Extra ffmpeg flags: -vn drops video streams, -sn drops subtitle streams.
ffmpeg_options = {'options': '-vn -sn'}

# Shared downloader instance used by YTDLSource.from_url.
ytdl = yt_dlp.YoutubeDL(ytdl_format_options)
class Source:
    """Base class shared by every music source.

    Bundles the audio object with the metadata mapping it was created from
    and exposes a display title and URL taken from that metadata.
    """

    def __init__(self, audio_source: nextcord.AudioSource, metadata):
        self.audio_source: nextcord.AudioSource = audio_source
        self.metadata = metadata
        # Placeholders are used when the metadata lacks the key.
        self.title: str = metadata.get('title', 'Unknown title')
        self.url: str = metadata.get('url', 'Unknown URL')

    def __str__(self):
        # Rendered as "<title> (<url>)".
        return '{} ({})'.format(self.title, self.url)
class YTDLSource(Source):
    """Subclass of YouTube sources"""

    def __init__(self, audio_source: nextcord.AudioSource, metadata):
        super().__init__(audio_source, metadata)
        self.url: str = metadata.get('webpage_url', 'Unknown URL')  # yt-dlp specific key name for original URL

    @classmethod
    async def from_url(cls, url, *, loop=None, stream=True):
        """Build a source from a URL or search query understood by yt-dlp.

        Args:
            url: URL (or search string) passed to ``ytdl.extract_info``.
            loop: Event loop whose default executor runs the blocking
                extraction; defaults to the running loop.
            stream: When true, play from the remote media URL; otherwise the
                media is downloaded first and played from the local file.

        Returns:
            A new instance wrapping a probed ``FFmpegOpusAudio`` source.

        Raises:
            ValueError: If yt-dlp returned no usable result for ``url``.
        """
        loop = loop or asyncio.get_running_loop()
        # extract_info blocks (network and possibly download), so keep it off the event loop.
        metadata = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=not stream))
        if not metadata:
            raise ValueError(f'No result found for {url!r}')
        if 'entries' in metadata:
            # Playlist or search result: use the first entry only.
            entries = metadata['entries']
            if not entries or entries[0] is None:
                raise ValueError(f'No result found for {url!r}')
            metadata = entries[0]
        filename = metadata['url'] if stream else ytdl.prepare_filename(metadata)
        audio_source = await nextcord.FFmpegOpusAudio.from_probe(filename, **ffmpeg_options)
        return cls(audio_source, metadata)
class ServerSession:
    """Playback state of one guild: its voice client and its song queue.

    ``queue[0]`` is the song currently playing (or about to be played); the
    remaining items are the upcoming songs in order.
    """

    def __init__(self, guild_id, voice_client):
        self.guild_id: int = guild_id
        self.voice_client: nextcord.VoiceClient = voice_client
        self.queue: List[Source] = []

    def display_queue(self) -> str:
        """Return the queue as numbered lines, the current song being number 0."""
        if not self.queue:
            return 'The queue is empty.'
        currently_playing = f'Currently playing: 0. {self.queue[0]}'
        upcoming = '\n'.join(f'{i}. {s}' for i, s in enumerate(self.queue[1:], start=1))
        return currently_playing + '\n' + upcoming

    async def add_to_queue(self, ctx, url):  # does not auto start playing the playlist
        """Download ``url`` and append it to the queue.

        An "Added to queue" notice is sent only while something is playing.
        """
        yt_source = await YTDLSource.from_url(url, loop=bot.loop, stream=False)  # stream=True has issues and cannot use Opus probing
        self.queue.append(yt_source)
        if self.voice_client.is_playing():
            await ctx.send(f'Added to queue: {yt_source.title}')

    @staticmethod
    def _log_future_exception(future):
        """Log an exception raised by the scheduled ``after_playing`` coroutine."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logging.error('Advancing the queue failed: %s', exc)

    def _on_track_end(self, ctx, error):
        """Plain callback given to ``voice_client.play(after=...)``.

        The callback is a normal function call, so a coroutine returned from
        it would never be awaited. Hand ``after_playing`` to the bot's event
        loop instead.
        """
        future = asyncio.run_coroutine_threadsafe(self.after_playing(ctx, error), bot.loop)
        future.add_done_callback(self._log_future_exception)

    async def start_playing(self, ctx):
        """Start playing ``queue[0]`` and announce it."""
        self.voice_client.play(self.queue[0].audio_source, after=lambda e=None: self._on_track_end(ctx, e))
        await ctx.send(f'Now playing: {self.queue[0].title}')

    async def after_playing(self, ctx, error):
        """Advance to the next song once the current one has ended.

        A playback error is logged and the queue still advances, so one
        failed track does not leave the session stuck.
        """
        if error:
            logging.error('Playback error in guild %s: %s', self.guild_id, error)
        if self.queue:
            await self.play_next(ctx)

    async def play_next(self, ctx):  # should be called only after making the first element of the queue the song to play
        """Drop the finished song and start the next one, if any."""
        self.queue.pop(0)
        if self.queue:
            # play() is not awaited: awaiting its result raised TypeError.
            self.voice_client.play(self.queue[0].audio_source, after=lambda e=None: self._on_track_end(ctx, e))
            await ctx.send(f'Now playing: {self.queue[0].title}')
# Registry of active sessions, one per guild: {guild_id: ServerSession}
server_sessions: Dict[int, ServerSession] = dict()
def clean_cache_files():
    """Delete stale cached media files from the current working directory.

    Nothing happens while any server session is registered. Otherwise every
    file with a cached-media extension that was last modified more than two
    hours (7200 seconds) ago is removed.
    """
    if server_sessions:  # only clean if no servers are connected
        return
    media_extensions = ['.webm', '.mp4', '.m4a', '.mp3', '.ogg']
    for entry in os.listdir():
        extension = os.path.splitext(entry)[1]
        if extension not in media_extensions:
            continue
        age_seconds = time.time() - os.path.getmtime(entry)
        if age_seconds > 7200:
            os.remove(entry)
def get_res_path(relative_path):
    """Get absolute path to resource, works for dev and for PyInstaller.

    Relative path will always get extracted into root!

    Args:
        relative_path: Path of the resource relative to the base directory,
            which is ``sys._MEIPASS`` when that attribute exists and this
            file's directory otherwise.

    Returns:
        The absolute path of the existing resource file.

    Raises:
        FileNotFoundError: If no file exists at the resolved location.
    """
    # abspath guarantees the documented "absolute" result even when
    # __file__ is relative (dirname may then be an empty string).
    base_path = os.path.abspath(getattr(sys, '_MEIPASS', os.path.dirname(__file__)))
    resource_path = os.path.join(base_path, relative_path)
    if not os.path.isfile(resource_path):
        raise FileNotFoundError(f'Embedded file {resource_path} is not found!')
    return resource_path
@atexit.register
def cleanup():
    """Release voice resources and stale cache files at interpreter exit.

    ``server_sessions`` stores ServerSession objects, so the voice client is
    reached through ``session.voice_client``. Only the synchronous
    ``cleanup()`` is called here: ``disconnect()`` is awaited elsewhere in
    this file, so calling it from this plain function would only create an
    un-awaited coroutine.
    """
    for session in list(server_sessions.values()):
        try:
            session.voice_client.cleanup()
        except Exception:
            # Best effort at exit: keep going so the other sessions and the
            # cache files are still cleaned up.
            logging.exception('Failed to clean up voice client for guild %s', session.guild_id)
    # Empty the registry in place so clean_cache_files() sees no sessions.
    server_sessions.clear()
    clean_cache_files()
@bot.event
async def on_ready():
    """Print which user the bot is logged in as."""
    login_notice = f'Logged in as {bot.user}'
    print(login_notice)
@bot.event
async def on_application_command_error(ctx, error):
    """Report an application-command error back to the invoking user.

    A slash-command interaction may have no message attached
    (``ctx.message`` is None). In that case the command name from the
    interaction data is reported instead of message content, so the handler
    no longer raises AttributeError itself.
    """
    message = getattr(ctx, 'message', None)
    if message is not None:
        trigger = f'message "{message.content}"'
    else:
        data = getattr(ctx, 'data', None) or {}
        command_name = data.get('name', 'unknown')
        trigger = f'command "/{command_name}"'
    await ctx.send(f"{ctx.user}'s {trigger} triggered error:\n{error}")
@bot.slash_command()
@application_checks.is_owner()
async def debug(ctx, code: str = nextcord.SlashOption(name='code', description='Code to execute', required=True)):
    """Only the bot owner can run this, executes arbitrary code"""
    # SECURITY: eval() runs arbitrary Python inside the bot process. The
    # is_owner() check above is the only safeguard; never loosen it.
    result = eval(code)
    # Always send non-empty text: a None or '' result would otherwise be an
    # empty message. Truncate to 2000 characters, the Discord message limit.
    text = str(result) or '(empty result)'
    await ctx.send(text[:2000])
async def connect_to_voice_channel(ctx, channel):
    """Join ``channel`` and register a new ServerSession for the guild.

    Args:
        ctx: Invocation context; used for the guild id and for replies.
        channel: Voice channel to connect to.

    Returns:
        The new ServerSession, or None when the connection failed (the
        failure has then already been reported through ``ctx``).
    """
    voice_client = await channel.connect()
    if not voice_client.is_connected():
        # Name the channel that was actually requested; ctx.user.voice is
        # not guaranteed to be set for every caller.
        await ctx.send(f'Failed to connect to voice channel {channel.name}.')
        return None
    session = ServerSession(ctx.guild.id, voice_client)
    server_sessions[ctx.guild.id] = session
    await ctx.send(f'Connected to {voice_client.channel.name}.')
    return session
@bot.slash_command(name='exit')
async def disconnect(ctx):
    """Disconnect from voice channel"""
    guild_id = ctx.guild.id
    if guild_id in server_sessions:
        voice_client = server_sessions[guild_id].voice_client
        # Read the name before tearing the connection down.
        channel_name = voice_client.channel.name
        try:
            await voice_client.disconnect()
            voice_client.cleanup()
        finally:
            # Always drop the session, even if disconnecting failed, so a
            # stale entry cannot block later commands.
            del server_sessions[guild_id]
        await ctx.send(f'Disconnected from {channel_name}.')
@bot.slash_command()
async def pause(ctx):
    """Pause the current song"""
    guild_id = ctx.guild.id
    # Nothing to do when the bot has no session in this guild.
    if guild_id not in server_sessions:
        return
    player = server_sessions[guild_id].voice_client
    if not player.is_playing():
        return
    player.pause()
    await ctx.send('Paused')
@bot.slash_command()
async def resume(ctx):
    """Resume the current song"""
    guild_id = ctx.guild.id
    # Nothing to do when the bot has no session in this guild.
    if guild_id not in server_sessions:
        return
    player = server_sessions[guild_id].voice_client
    if not player.is_paused():
        return
    player.resume()
    await ctx.send('Resumed')
@bot.slash_command()
async def skip(ctx):
    """Skip the current song"""
    guild_id = ctx.guild.id
    if guild_id in server_sessions:
        session = server_sessions[guild_id]
        voice_client = session.voice_client
        if voice_client.is_playing():
            if len(session.queue) > 1:
                voice_client.stop()  # this will trigger after_playing callback and in that will call play_next so here no need call play_next
                # Reply so the successful skip does not leave the
                # interaction unanswered.
                await ctx.send('Skipped.')
            else:
                await ctx.send('This is already the last item in the queue!')
@bot.slash_command(name='queue')
async def show_queue(ctx):
    """Show the current queue"""
    guild_id = ctx.guild.id
    if guild_id in server_sessions:
        session = server_sessions[guild_id]
        if not session.queue:
            # Formatting the queue reads queue[0], which does not exist here.
            await ctx.send('The queue is empty.')
        else:
            await ctx.send(f'{session.display_queue()}')
@bot.slash_command()
async def remove(ctx, i: int = nextcord.SlashOption(name='index', description='Index of item to remove (current playing = 0 but only can remove from 1 onwards)', required=True)):
    """Remove an item from queue by index (1, 2...)"""
    guild_id = ctx.guild.id
    if guild_id in server_sessions:
        queue = server_sessions[guild_id].queue
        if i == 0:
            # This bot only registers slash commands, so point at /skip.
            await ctx.send('Cannot remove current playing song, please use /skip instead.')
        elif i < 0:
            # A negative index would pop from the end of the list and could
            # even remove the song that is currently playing.
            await ctx.send('Index must be 1 or greater.')
        elif i >= len(queue):
            # max() keeps the count at 0 when the queue is empty.
            await ctx.send(f'The queue is not that long, there are only {max(len(queue) - 1, 0)} items in the queue.')
        else:
            removed = queue.pop(i)
            removed.audio_source.cleanup()
            await ctx.send(f'Removed {removed} from queue.')
@bot.slash_command()
async def clear(ctx):
    """Clear the queue and stop current song"""
    guild_id = ctx.guild.id
    # Nothing to do when the bot has no session in this guild.
    if guild_id not in server_sessions:
        return
    session = server_sessions[guild_id]
    player = session.voice_client
    # Empty the queue before stopping the player.
    session.queue = []
    if player.is_playing():
        player.stop()
    await ctx.send('Queue cleared.')
@bot.slash_command()
async def song(ctx):
    """Show the current song"""
    guild_id = ctx.guild.id
    if guild_id in server_sessions:
        queue = server_sessions[guild_id].queue
        if queue:
            await ctx.send(f'Now playing {queue[0]}')
        else:
            # queue[0] does not exist when nothing has been queued.
            await ctx.send('Nothing is playing right now.')
def _resolve_query_to_url(query):
    """Turn ``query`` into a video URL; blocking, so run it in an executor.

    An http(s) URL is returned unchanged. Anything else is searched on
    YouTube and the first result's watch URL is returned, or None when the
    results page contains no video id.
    """
    parsed = urllib.parse.urlparse(query)
    if parsed.scheme in ('http', 'https') and parsed.netloc:
        return query
    query_string = urllib.parse.urlencode({"search_query": query})
    with urllib.request.urlopen("https://www.youtube.com/results?" + query_string) as response:
        page = response.read().decode()
    search_results = re.findall(r"watch\?v=(\S{11})", page)
    if not search_results:
        return None
    return f'https://www.youtube.com/watch?v={search_results[0]}'


@bot.slash_command()
async def play(ctx: nextcord.Interaction, query: str = nextcord.SlashOption(name='query', description='URL or search query', required=True)):
    """Play a YouTube video by URL if given a URL, or search up the song and play the first video in search result"""
    # The caller must be in a voice channel in every branch below.
    if ctx.user.voice is None:
        await ctx.send('You are not connected to any voice channel!')
        return
    user_channel = ctx.user.voice.channel

    guild_id = ctx.guild.id
    if guild_id not in server_sessions:  # not connected to any VC
        session = await connect_to_voice_channel(ctx, user_channel)
        if session is None:  # connecting failed and was already reported
            return
    else:  # is connected to a VC
        session = server_sessions[guild_id]
        if session.voice_client.channel != user_channel:  # connected to a different VC than the command issuer (but within the same server)
            await session.voice_client.move_to(user_channel)
            await ctx.send(f'Connected to {user_channel}.')

    # Searching and downloading can take a while; acknowledge the
    # interaction first if no reply has been sent yet.
    if not ctx.response.is_done():
        await ctx.response.defer()

    # Run the blocking lookup off the event loop.
    url = await asyncio.get_running_loop().run_in_executor(None, _resolve_query_to_url, query)
    if url is None:
        await ctx.send(f'No results found for "{query}".')
        return

    await session.add_to_queue(ctx, url)  # will download file here
    if not session.voice_client.is_playing():
        if len(session.queue) <= 1:
            await session.start_playing(ctx)
        elif session.voice_client.is_paused():
            # add_to_queue announces only while playing; reply here so the
            # deferred interaction is completed.
            await ctx.send(f'Added to queue: {session.queue[-1].title}')
    clean_cache_files()
if __name__ == '__main__':
    # Take the token from the DISCORD_BOT_TOKEN environment variable so the
    # secret need not be written into the source. The original placeholder
    # remains as the fallback.
    bot.run(os.environ.get('DISCORD_BOT_TOKEN', 'your token here'))
To run this, install Python 3.9 or newer, then install the dependencies:
pip install nextcord requests yt-dlp
Download the FFmpeg and yt-dlp binaries and place them somewhere on your PATH, or right beside this Python script (the script appends its working directory to the path).
You can run it as a Python script, or compile it into an executable using PyInstaller.
I don't know what I'm doing wrong. I used the code as it is and the bot runs just fine, but the slash commands don't show up on Discord. I'm new to programming, so I'm confused about what's happening.
@krishna687 you may need to wait a while. Sometimes the slash commands appear after half an hour
hello, the slash commands are not working. What shall I do ?
Yes i am aware, trying to debug.
Yes i am aware, trying to debug.
it works now! looks like I had to kick the bot and reinvite it...But I have a new error now when I use the "play" cmd:
line 406, in on_application_command_error
await ctx.send(f'{ctx.user}\'s message "{ctx.message.content}" triggered error:\n{error}')
AttributeError: 'NoneType' object has no attribute 'content'
yea this is what i was referring to as the known issue.
Got it! I let the play methods be sync and make an async call to send the messages:
from
await ctx.send(f'Now playing: {self.queue[0].title}')
to
asyncio.run_coroutine_threadsafe(ctx.send('Now playing: ♪ {}'.format(self.current.title)), self.bot.loop)