Skip to content

Instantly share code, notes, and snippets.

@rach-sharp
Created January 6, 2020 14:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rach-sharp/39c6926c1e5efd75a2f9cf0474a946c6 to your computer and use it in GitHub Desktop.
Save rach-sharp/39c6926c1e5efd75a2f9cf0474a946c6 to your computer and use it in GitHub Desktop.
New and Improved Spotify Connection
import json
from datetime import datetime
from math import ceil
import base64
import pytz
import requests
import spotipy
from io import BytesIO
from oauthlib.oauth2 import InvalidGrantError
from requests import HTTPError
from sentry_sdk import capture_exception
from spotipy import SpotifyException
class SpotifyConnection(spotipy.Spotify):
"""Modified version of the spotify.Spotipy class
Main changes are:
-implementing additional API endpoints (currently_playing, recently_played)
-updating the main internal call method to update the session and retry once on error,
due to an issue experienced when performing actions which require an extended time
connected.
"""
def currently_playing(self):
"""Gets whatever the authenticated user is currently listening to"""
return self._get("me/player/currently-playing")
def recently_played(self, limit=50):
"""Gets the last 50 songs the user has played.
This doesn't include whatever the user is currently listening to, and no more than the
last 50 songs are available.
"""
return self._get("me/player/recently-played", limit=limit)
def genre_songs(self, genre_name, limit=50):
query = 'genre:"{0}"'.format(genre_name)
response = self.search(query, limit=50, offset=0, type="track")
results = response["tracks"]["items"]
# subsequently runs until it hits the user-defined limit or has read all songs in the library
while len(results) < limit:
response = self.search(query, limit=50, offset=len(results), type="track")
results.extend(response["tracks"]["items"])
return results[:limit]
def tracks_full(self, tracks, market=None):
results = []
for i in range(int(ceil(len(tracks) / 50))):
response = self.tracks(tracks[50 * i: 50 * (i + 1)], market)
results.extend(response["tracks"])
return results
def artists_full(self, artists):
results = []
for i in range(int(ceil(len(artists) / 50))):
response = self.artists(artists[50 * i: 50 * (i + 1)])
results.extend(response["artists"])
return results
@staticmethod
def parse_spotify_datetime(timestamp):
try:
dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
return dt.replace(tzinfo=pytz.UTC, microsecond=0)
def current_user_saved_tracks_full(self, until=None):
""" Gets a list of the tracks saved in the current authorized user's
"Your Music" library
Parameters:
- limit - the number of tracks to return
- offset - the index of the first track to return
"""
# first run through also retrieves total no of songs in library
response = self.current_user_saved_tracks(limit=50, offset=0)
results = response["items"]
# subsequently runs until it hits the user-defined limit or has read all songs in the library
while len(results) < response["total"] or (
until and (until and self.parse_spotify_datetime(response["items"][-1]["added_at"]) > until)
):
response = self.current_user_saved_tracks(limit=50, offset=len(results))
results.extend(response["items"])
if len(response["items"]) == 0:
break
if until and len(results) > 0:
while results and self.parse_spotify_datetime(results[-1]["added_at"]) <= until:
results.pop()
return results
def link_tracks(self, track_ids, country_code):
response = self.tracks_full(track_ids, country_code)
linked_tracks = []
reverse_track_link_mapping = {}
for track_rs in response:
linked_tracks.append(track_rs["id"])
if "linked_from" in track_rs:
reverse_track_link_mapping[track_rs["id"]] = track_rs["linked_from"]["id"]
return linked_tracks, reverse_track_link_mapping
@staticmethod
def unlink_tracks(track_ids, reverse_track_link_mapping):
return [reverse_track_link_mapping.get(t, t) for t in track_ids]
def user_playlist_tracks_full(self, user, playlist_id=None, fields=None, market=None):
""" Get full details of the tracks of a playlist owned by a user.
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
- market - an ISO 3166-1 alpha-2 country code.
"""
# first run through also retrieves total no of songs in library
response = self.user_playlist_tracks(user, playlist_id, fields=fields, limit=100, market=market)
results = response["items"]
# subsequently runs until it hits the user-defined limit or has read all songs in the library
while len(results) < response["total"]:
response = self.user_playlist_tracks(
user, playlist_id, fields=fields, limit=100, offset=len(results), market=market
)
results.extend(response["items"])
return results
def playlist(self, playlist_id, fields=None):
''' Gets playlist of a user
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
'''
plid = self._get_id('playlist', playlist_id)
return self._get("playlists/%s" % plid, fields=fields)
def playlist_tracks(self, playlist_id=None, fields=None,
limit=100, offset=0, market=None):
''' Get full details of the tracks of a playlist owned by a user.
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
- limit - the maximum number of tracks to return
- offset - the index of the first track to return
- market - an ISO 3166-1 alpha-2 country code.
'''
plid = self._get_id('playlist', playlist_id)
return self._get("playlists/%s/tracks" % plid,
limit=limit, offset=offset, fields=fields,
market=market)
def playlist_tracks_full(self, playlist_id=None, fields=None, market=None):
""" Get full details of the tracks of a playlist owned by a user.
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
- market - an ISO 3166-1 alpha-2 country code.
"""
# first run through also retrieves total no of songs in library
response = self.playlist_tracks(playlist_id, fields=fields, limit=100, market=market)
results = response["items"]
# subsequently runs until it hits the user-defined limit or has read all songs in the library
while len(results) < response["total"]:
response = self.playlist_tracks(
playlist_id, fields=fields, limit=100, offset=len(results), market=market
)
results.extend(response["items"])
return results
def user_playlists_full(self, user):
""" Get full details of the tracks of a playlist owned by a user.
Parameters:
- user - the id of the user
- playlist_id - the id of the playlist
- fields - which fields to return
- market - an ISO 3166-1 alpha-2 country code.
"""
# first run through also retrieves total no of songs in library
response = self.user_playlists(user, limit=50)
results = response["items"]
while len(results) < response["total"]:
response = self.user_playlists(
user, limit=50, offset=len(results)
)
results.extend(response["items"])
return results
def user_playlist_replace_tracks_full(self, user, playlist_id, tracks):
self.user_playlist_replace_tracks(user, playlist_id, tracks[0:50])
if len(tracks) > 50:
self.user_playlist_add_tracks_full(user, playlist_id, tracks[50:])
def user_playlist_add_tracks_full(self, user, playlist_id, tracks):
for i in range(int(ceil(len(tracks) / 50))):
self.user_playlist_add_tracks(user, playlist_id, tracks[50 * i: 50 * (i + 1)])
def user_playlist_create(self, user, name, public=True, description=None):
""" Creates a playlist for a user
Parameters:
- user - the id of the user
- name - the name of the playlist
- public - is the created playlist public
- description - description of the playlist
"""
if description is None:
return super().user_playlist_create(user, name, public)
data = {"name": name, "public": public, "description": description}
return self._post("users/%s/playlists" % (user,), payload=data)
def user_playlist_change_details(self, user, playlist_id, name=None,
public=None, collaborative=None, description=None):
data = {}
# Add Python2 and Python3 compatibility checking string types
try:
basestring
except NameError:
basestring = str
if isinstance(name, basestring):
data["name"] = name
if isinstance(public, bool):
data["public"] = public
if isinstance(collaborative, bool):
data["collaborative"] = collaborative
if isinstance(description, basestring):
data["description"] = description
return self._put("playlists/%s" % playlist_id, payload=data)
def user_playlist_remove_all_occurrences_of_tracks_full(self, user, playlist_id, tracks, snapshot_id=None):
if snapshot_id is None:
response = self.user_playlist(user, playlist_id)
snapshot_id = response["snapshot_id"]
for i in range(int(ceil(len(tracks) / 100))):
response = self.user_playlist_remove_all_occurrences_of_tracks(
user, playlist_id, tracks[100 * i: 100 * (i + 1)], snapshot_id
)
snapshot_id = response["snapshot_id"]
def user_playlist_change_image(self, user, playlist_id, jpeg_image):
byte_buffer = BytesIO()
jpeg_image.save(byte_buffer, format="JPEG")
image_string = base64.b64encode(byte_buffer.getvalue())
return self._put("playlists/%s/images" % playlist_id, payload=image_string, content_type="image/jpeg")
def _put(self, url, args=None, payload=None, content_type="application/json", **kwargs):
if args:
kwargs.update(args)
return self._internal_call("PUT", url, payload, kwargs, content_type=content_type)
def _internal_call(self, method, url, payload, params, content_type="application/json"):
args = dict(params=params)
args["timeout"] = self.requests_timeout
if not url.startswith("http"):
url = self.prefix + url
headers = self._auth_headers()
headers["Content-Type"] = content_type
if payload:
if content_type == "image/jpeg":
args["data"] = payload
else:
args["data"] = json.dumps(payload)
if self.trace_out:
print(url)
try:
r = self._session.request(method, url, headers=headers, proxies=self.proxies, **args)
except InvalidGrantError:
raise
except Exception as e:
capture_exception(e)
self._session = requests.Session()
r = self._session.request(method, url, headers=headers, proxies=self.proxies, **args)
if self.trace: # pragma: no cover
print()
print("headers", headers)
print("http status", r.status_code)
print(method, r.url)
if payload:
print("DATA", json.dumps(payload))
try:
r.raise_for_status()
except HTTPError:
message = ""
if r.text and len(r.text) > 0 and r.text != "null":
message += r.text
try:
r_json = r.json()
if "error" in r_json and "message" in r_json["error"]:
message += r_json["error"]["message"]
except ValueError:
message += " Failed to decode JSON"
finally:
raise SpotifyException(r.status_code, -1, "{0}:\n {1}".format(r.url, message), headers=r.headers)
finally:
r.connection.close()
if r.text and len(r.text) > 0 and r.text != "null":
results = r.json()
return results
else:
return None
@arsaboo
Copy link

arsaboo commented Aug 3, 2022

How do we specify the fields to be returned in the user_playlist_tracks_full function? For example, I am interested in 'artist', 'album', 'track_name', 'track_id' and 'release_date'.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment