Skip to content

Instantly share code, notes, and snippets.

@munishkhatri720
Forked from cloudwithax/README.md
Created May 15, 2023 04:47
Show Gist options
  • Save munishkhatri720/687125ecaefbf273cd7dfb837c4fe008 to your computer and use it in GitHub Desktop.
Save munishkhatri720/687125ecaefbf273cd7dfb837c4fe008 to your computer and use it in GitHub Desktop.
Spotify Music Cog for Wavelink

Spotify Music Cog for Wavelink

This is a drop-in Wavelink music cog with Spotify support. Queue up tracks, albums, and playlists with the power of Python.

This requires that you have the spotify.py library installed from its repository (linked in the original gist).

Feel free to add/edit any other commands and features, like pausing, skipping, queue management and more!

If you would like to revise anything here, feel free to leave a comment or open a pull request.

import discord
import wavelink
import spotify
import re
import asyncio
from discord.ext import commands
from contextlib import suppress
# Matches any http(s) URL. Queries that do not match are turned into a
# YouTube search by the play command.
URL_REG = re.compile(r'https?://(?:www\.)?.+')

# Matches Spotify album/playlist/track links and captures the resource kind
# (group "type") and its ID (group "id"). The dots in the host name are
# escaped so that only the literal host "open.spotify.com" is accepted;
# an unescaped "." would match any character.
SPOTIFY_URL_REG = re.compile(
    r'https?://open\.spotify\.com/(?P<type>album|playlist|track)/(?P<id>[a-zA-Z0-9]+)'
)
# Module-level Spotify clients shared by the music cog: `spotify_client` is
# used to fetch albums and single tracks, `spotify_http_client` to fetch raw
# playlist data.
# NOTE(review): "id" and "secret" look like placeholders -- presumably the
# Spotify application's client ID and client secret; replace before running.
spotify_client = spotify.Client("id", "secret")
spotify_http_client = spotify.HTTPClient("id", "secret")
class Player(wavelink.Player):
    """Wavelink player with its own track queue and "Now playing" messages.

    Queue entries whose ``id`` is the sentinel string ``"spotify"`` are
    placeholders built from Spotify metadata. They carry no playable audio
    and are resolved through a YouTube search right before playback.
    """

    def __init__(self, *args, **kwargs):
        """Create the player; an optional ``context`` keyword is remembered."""
        super().__init__(*args, **kwargs)
        # Pending tracks, consumed one at a time by do_next().
        self.queue = asyncio.Queue()
        # Command context used to post "Now playing" messages (None if absent).
        self.context: commands.Context = kwargs.get('context', None)
        # The last "Now playing" message, removed when the next track starts.
        self.now_playing = None

    async def teardown(self):
        """Destroy this player once there is nothing left to play.

        NOTE(review): the wavelink 0.x ``Player`` base class used here does
        not appear to provide ``teardown``; it is defined on this class so
        that the call in ``do_next`` always resolves.
        """
        # NOTE(review): destroy() presumably raises KeyError when the player
        # has already been removed from its node -- treated as already done.
        with suppress(KeyError):
            await self.destroy()

    async def do_next(self):
        """Start the next playable track, or tear the player down.

        Deletes the previous "Now playing" message, pops tracks from the
        queue until one can be played, plays it and announces it in the
        stored context. Spotify placeholders that yield no search result
        are skipped instead of aborting playback.
        """
        if self.now_playing:
            with suppress(discord.HTTPException):
                await self.now_playing.delete()
            self.now_playing = None

        while True:
            # Keep the try body minimal: only the queue pop can raise here.
            try:
                track = self.queue.get_nowait()
            except asyncio.QueueEmpty:
                return await self.teardown()

            if track.id != "spotify":
                break

            # Resolve the Spotify placeholder to real audio via YouTube search.
            results = await self.node.get_tracks(f"ytsearch:{track.title} {track.author} audio")
            if results:
                track = results[0]
                break
            # Nothing found for this placeholder: try the next queued track.

        await self.play(track)
        if self.context is not None:
            self.now_playing = await self.context.send(f'Now playing **{self.current.title}**')
class Music(commands.Cog, wavelink.WavelinkMixin):
    """Music cog that queues YouTube searches, URLs and Spotify links.

    Spotify tracks are queued as placeholder ``wavelink.Track`` objects with
    the ID ``'spotify'``; the player resolves them to playable tracks when
    their turn comes.
    """

    def __init__(self, bot):
        """Attach a wavelink client to *bot* if needed and start the nodes."""
        self.bot = bot
        if not hasattr(bot, 'wavelink'):
            bot.wavelink = wavelink.Client(bot=bot)
        bot.loop.create_task(self.start_nodes())

    async def start_nodes(self) -> None:
        """Wait for the bot to be ready, then (re)connect the Lavalink node."""
        await self.bot.wait_until_ready()

        # Destroy nodes left over from a previous load of this cog. Iterate
        # over a copy, since destroying a node may mutate the mapping.
        if self.bot.wavelink.nodes:
            previous = self.bot.wavelink.nodes.copy()
            for node in previous.values():
                await node.destroy()

        nodes = {
            'MAIN': {
                'host': '127.0.0.1',
                'port': 2333,
                'rest_uri': 'http://127.0.0.1:2333',
                'password': 'youshallnotpass',
                'identifier': 'MAIN',
                'region': 'us_central',
            },
        }
        for n in nodes.values():
            await self.bot.wavelink.initiate_node(**n)

    @wavelink.WavelinkMixin.listener()
    async def on_node_ready(self, node: wavelink.Node):
        # Log when a node reports that it is ready.
        print(f'Node {node.identifier} is ready!')

    @wavelink.WavelinkMixin.listener('on_track_stuck')
    @wavelink.WavelinkMixin.listener('on_track_end')
    @wavelink.WavelinkMixin.listener('on_track_exception')
    async def on_player_stop(self, node: wavelink.Node, payload):
        # Whatever stopped the current track, advance to the next one.
        await payload.player.do_next()

    @commands.command()
    async def connect(self, ctx):
        """Connect the bot to your current voice channel."""
        player: Player = self.bot.wavelink.get_player(guild_id=ctx.guild.id, cls=Player, context=ctx)
        # ctx.author.voice may be None when the user is not in voice; the
        # getattr default turns that case into the CommandError below
        # instead of an AttributeError.
        channel = getattr(ctx.author.voice, 'channel', None)
        if channel is None:
            raise commands.CommandError('You must be in a channel in order to use this command!')
        await player.connect(channel.id)

    async def _fetch_spotify_tracks(self, search_type, spotify_id):
        # Return the list of Spotify track objects behind a playlist, album
        # or track ID. Any lookup failure is reported as a CommandError.
        try:
            if search_type == "playlist":
                data = await spotify_http_client.get_playlist(spotify_id)
                playlist = spotify.Playlist(client=spotify_client, data=data)
                return await playlist.get_all_tracks()
            if search_type == "album":
                album = await spotify_client.get_album(spotify_id=spotify_id)
                return await album.get_all_tracks()
            # Neither a playlist nor an album: a single track.
            return [await spotify_client.get_track(spotify_id=spotify_id)]
        except Exception as exc:
            raise commands.CommandError(
                f"I was not able to find this {search_type}! Please try again or use a different link."
            ) from exc

    @staticmethod
    def _spotify_placeholder(track):
        # Build the "fake track" for a Spotify track. The id 'spotify' is the
        # marker the player looks for when it dequeues a track to play.
        return wavelink.Track(
            id_='spotify',
            info={
                'title': track.name or 'Unknown',
                'author': ', '.join(artist.name for artist in track.artists) or 'Unknown',
                'length': track.duration or 0,
                'identifier': track.id or 'Unknown',
                'uri': track.url or 'spotify',
                'isStream': False,
                'isSeekable': False,
                'position': 0,
                'thumbnail': track.images[0].url if track.images else None,
            },
        )

    @commands.command()
    async def play(self, ctx, *, query: str):
        """Play or queue a song from a search, a URL, or a Spotify link."""
        player: Player = self.bot.wavelink.get_player(guild_id=ctx.guild.id, cls=Player, context=ctx)
        if not player.is_connected:
            await ctx.invoke(self.connect)

        # Links wrapped in <...> (used to suppress embeds) are unwrapped first.
        query = query.strip('<>')

        spotify_match = SPOTIFY_URL_REG.match(query)
        if spotify_match:
            search_type = spotify_match.group('type')
            spotify_id = spotify_match.group('id')
            search_tracks = await self._fetch_spotify_tracks(search_type, spotify_id)

            tracks = [self._spotify_placeholder(track) for track in search_tracks]
            if not tracks:
                raise commands.CommandError("The URL you put is either not valid or doesn't exist!")

            if search_type in ("playlist", "album"):
                for track in tracks:
                    player.queue.put_nowait(track)
                await ctx.send(f"Queued **{len(tracks)}** tracks")
            else:
                # A single track is only announced when something is already
                # playing; otherwise the "Now playing" message covers it.
                if player.is_playing:
                    await ctx.send(f"Queued **{len(tracks)}** tracks")
                player.queue.put_nowait(tracks[0])
        else:
            # Plain text becomes a YouTube search; URLs are passed through.
            if not URL_REG.match(query):
                query = f'ytsearch:{query}'
            tracks = await self.bot.wavelink.get_tracks(query)
            if not tracks:
                return await ctx.send('No songs were found with that query. Please try again.', delete_after=15)

            if isinstance(tracks, wavelink.TrackPlaylist):
                for track in tracks.tracks:
                    await player.queue.put(track)
                await ctx.send(f'Queued **{len(tracks.tracks)}** songs', delete_after=15)
            else:
                if player.is_playing:
                    await ctx.send(f'Added {tracks[0].title} to the Queue', delete_after=15)
                await player.queue.put(tracks[0])

        # Kick off playback if the player is idle.
        if not player.is_playing:
            await player.do_next()
def setup(bot: commands.Bot):
    """Extension entry point: build the Music cog and register it on *bot*."""
    music_cog = Music(bot)
    bot.add_cog(music_cog)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment