Created
January 6, 2020 14:42
-
-
Save rach-sharp/39c6926c1e5efd75a2f9cf0474a946c6 to your computer and use it in GitHub Desktop.
New and Improved Spotify Connection
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from datetime import datetime | |
from math import ceil | |
import base64 | |
import pytz | |
import requests | |
import spotipy | |
from io import BytesIO | |
from oauthlib.oauth2 import InvalidGrantError | |
from requests import HTTPError | |
from sentry_sdk import capture_exception | |
from spotipy import SpotifyException | |
class SpotifyConnection(spotipy.Spotify):
    """Modified version of the spotipy.Spotify class

    Main changes are:
        - implementing additional API endpoints (currently_playing, recently_played,
          playlist, playlist_tracks)
        - "*_full" helpers which page or batch through the API so that callers can
          work with lists longer than a single request allows
        - updating the main internal call method to update the session and retry once
          on error, due to an issue experienced when performing actions which require
          an extended time connected.
    """

    @staticmethod
    def _batches(items, size):
        """Yields consecutive slices of `items` holding at most `size` elements each."""
        for start in range(0, len(items), size):
            yield items[start:start + size]

    @staticmethod
    def _collect_pages(first_page, fetch_page):
        """Accumulates the "items" of a paged response.

        Parameters:
            - first_page - the already fetched first page; its "items" list is extended
              in place and returned
            - fetch_page - callable taking an offset and returning the page which starts
              at that offset
        """
        results = first_page["items"]
        if not results:
            return results
        # "total" is missing when the caller's `fields` filter leaves it out; in that
        # case keep paging until an empty page comes back.
        total = first_page.get("total")
        while total is None or len(results) < total:
            page = fetch_page(len(results))
            items = page["items"]
            if not items:
                # Nothing more to read, whatever "total" claims; without this check a
                # stale or too large total would keep the loop running forever.
                break
            results.extend(items)
            total = page.get("total", total)
        return results

    def currently_playing(self):
        """Gets whatever the authenticated user is currently listening to"""
        return self._get("me/player/currently-playing")

    def recently_played(self, limit=50):
        """Gets the songs the authenticated user has played most recently.

        This doesn't include whatever the user is currently listening to, and no more
        than the last 50 songs are available.

        Parameters:
            - limit - the maximum number of songs to return
        """
        return self._get("me/player/recently-played", limit=limit)

    def genre_songs(self, genre_name, limit=50):
        """Searches for tracks of a genre.

        Parameters:
            - genre_name - the genre to search for
            - limit - the maximum number of tracks to return

        Returns a list of at most `limit` tracks; fewer when the search has fewer results.
        """
        query = 'genre:"{0}"'.format(genre_name)
        response = self.search(query, limit=50, offset=0, type="track")
        results = response["tracks"]["items"]
        # subsequently runs until it hits the user-defined limit or the search is exhausted
        while len(results) < limit:
            response = self.search(query, limit=50, offset=len(results), type="track")
            items = response["tracks"]["items"]
            if not items:
                # no more matches: stop instead of requesting the same offset forever
                break
            results.extend(items)
        return results[:limit]

    def tracks_full(self, tracks, market=None):
        """Gets the details of any number of tracks, requesting them 50 at a time.

        Parameters:
            - tracks - a list of track ids, URIs or URLs (passed on to `tracks`)
            - market - an ISO 3166-1 alpha-2 country code.
        """
        results = []
        for batch in self._batches(tracks, 50):
            results.extend(self.tracks(batch, market)["tracks"])
        return results

    def artists_full(self, artists):
        """Gets the details of any number of artists, requesting them 50 at a time.

        Parameters:
            - artists - a list of artist ids, URIs or URLs (passed on to `artists`)
        """
        results = []
        for batch in self._batches(artists, 50):
            results.extend(self.artists(batch)["artists"])
        return results

    @staticmethod
    def parse_spotify_datetime(timestamp):
        """Parses a timestamp string into a UTC datetime with the microseconds dropped.

        Parameters:
            - timestamp - a string such as "2020-01-06T14:42:00Z", with or without
              fractional seconds
        """
        try:
            dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            # no fractional seconds in the timestamp
            dt = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
        return dt.replace(tzinfo=pytz.UTC, microsecond=0)

    def current_user_saved_tracks_full(self, until=None):
        """ Gets a list of the tracks saved in the current authorized user's
            "Your Music" library

            Parameters:
                - until - optional timezone-aware datetime; when given, only the tracks
                  added after that moment are returned
        """
        # first run through also retrieves total no of songs in library
        response = self.current_user_saved_tracks(limit=50, offset=0)
        results = response["items"]
        # subsequently runs until it has read all songs in the library or reached `until`
        while len(results) < response["total"]:
            # The trimming below only drops entries from the tail, i.e. the items are
            # taken to be ordered newest first; on the same assumption nothing newer
            # than `until` can follow once the last item read is old enough.
            if until and results and self.parse_spotify_datetime(results[-1]["added_at"]) <= until:
                break
            response = self.current_user_saved_tracks(limit=50, offset=len(results))
            if not response["items"]:
                break
            results.extend(response["items"])
        if until:
            while results and self.parse_spotify_datetime(results[-1]["added_at"]) <= until:
                results.pop()
        return results

    def link_tracks(self, track_ids, country_code):
        """Looks up tracks for a market and records which of them came back relinked.

        Parameters:
            - track_ids - the ids of the tracks to look up
            - country_code - passed to `tracks_full` as the market

        Returns a tuple of (ids of the returned tracks, mapping from a returned id to
        the id found in its "linked_from" entry).
        """
        linked_tracks = []
        reverse_track_link_mapping = {}
        for track_rs in self.tracks_full(track_ids, country_code):
            linked_tracks.append(track_rs["id"])
            if "linked_from" in track_rs:
                reverse_track_link_mapping[track_rs["id"]] = track_rs["linked_from"]["id"]
        return linked_tracks, reverse_track_link_mapping

    @staticmethod
    def unlink_tracks(track_ids, reverse_track_link_mapping):
        """Maps track ids back through the mapping produced by `link_tracks`.

        Ids which are not in the mapping are returned unchanged.
        """
        return [reverse_track_link_mapping.get(t, t) for t in track_ids]

    def user_playlist_tracks_full(self, user, playlist_id=None, fields=None, market=None):
        """ Get full details of all the tracks of a playlist owned by a user.

            Parameters:
                - user - the id of the user
                - playlist_id - the id of the playlist
                - fields - which fields to return; "items" has to be among them, and
                  when "total" is left out pages are read until an empty one comes back
                - market - an ISO 3166-1 alpha-2 country code.
        """
        # first run through also retrieves total no of songs in the playlist
        first_page = self.user_playlist_tracks(user, playlist_id, fields=fields, limit=100, market=market)
        return self._collect_pages(
            first_page,
            lambda offset: self.user_playlist_tracks(
                user, playlist_id, fields=fields, limit=100, offset=offset, market=market
            ),
        )

    def playlist(self, playlist_id, fields=None):
        """ Gets a playlist

            Parameters:
                - playlist_id - the id of the playlist
                - fields - which fields to return
        """
        plid = self._get_id('playlist', playlist_id)
        return self._get("playlists/%s" % plid, fields=fields)

    def playlist_tracks(self, playlist_id=None, fields=None,
                        limit=100, offset=0, market=None):
        """ Get full details of one page of the tracks of a playlist.

            Parameters:
                - playlist_id - the id of the playlist
                - fields - which fields to return
                - limit - the maximum number of tracks to return
                - offset - the index of the first track to return
                - market - an ISO 3166-1 alpha-2 country code.
        """
        plid = self._get_id('playlist', playlist_id)
        return self._get("playlists/%s/tracks" % plid,
                         limit=limit, offset=offset, fields=fields,
                         market=market)

    def playlist_tracks_full(self, playlist_id=None, fields=None, market=None):
        """ Get full details of all the tracks of a playlist.

            Parameters:
                - playlist_id - the id of the playlist
                - fields - which fields to return; "items" has to be among them, and
                  when "total" is left out pages are read until an empty one comes back
                - market - an ISO 3166-1 alpha-2 country code.
        """
        # first run through also retrieves total no of songs in the playlist
        first_page = self.playlist_tracks(playlist_id, fields=fields, limit=100, market=market)
        return self._collect_pages(
            first_page,
            lambda offset: self.playlist_tracks(
                playlist_id, fields=fields, limit=100, offset=offset, market=market
            ),
        )

    def user_playlists_full(self, user):
        """ Gets all the playlists of a user.

            Parameters:
                - user - the id of the user
        """
        # first run through also retrieves total no of playlists
        first_page = self.user_playlists(user, limit=50)
        return self._collect_pages(
            first_page,
            lambda offset: self.user_playlists(user, limit=50, offset=offset),
        )

    def user_playlist_replace_tracks_full(self, user, playlist_id, tracks):
        """ Replaces the tracks of a playlist with any number of tracks.

            Parameters:
                - user - the id of the user
                - playlist_id - the id of the playlist
                - tracks - the list of track ids to put in the playlist
        """
        # the first 50 replace the current content, the remainder is appended in batches
        self.user_playlist_replace_tracks(user, playlist_id, tracks[0:50])
        if len(tracks) > 50:
            self.user_playlist_add_tracks_full(user, playlist_id, tracks[50:])

    def user_playlist_add_tracks_full(self, user, playlist_id, tracks):
        """ Adds any number of tracks to a playlist, 50 per request.

            Parameters:
                - user - the id of the user
                - playlist_id - the id of the playlist
                - tracks - the list of track ids to add
        """
        for batch in self._batches(tracks, 50):
            self.user_playlist_add_tracks(user, playlist_id, batch)

    def user_playlist_create(self, user, name, public=True, description=None):
        """ Creates a playlist for a user

            Parameters:
                - user - the id of the user
                - name - the name of the playlist
                - public - is the created playlist public
                - description - description of the playlist
        """
        if description is None:
            return super().user_playlist_create(user, name, public)
        data = {"name": name, "public": public, "description": description}
        return self._post("users/%s/playlists" % (user,), payload=data)

    def user_playlist_change_details(self, user, playlist_id, name=None,
                                     public=None, collaborative=None, description=None):
        """ Changes a playlist's name, visibility and/or description

            Only the arguments which are given with the expected type are sent.

            Parameters:
                - user - the id of the user (not used to build the request)
                - playlist_id - the id of the playlist
                - name - optional new name of the playlist (a string)
                - public - optional is the playlist public (a bool)
                - collaborative - optional is the playlist collaborative (a bool)
                - description - optional new description of the playlist (a string)
        """
        data = {}
        if isinstance(name, str):
            data["name"] = name
        if isinstance(public, bool):
            data["public"] = public
        if isinstance(collaborative, bool):
            data["collaborative"] = collaborative
        if isinstance(description, str):
            data["description"] = description
        return self._put("playlists/%s" % playlist_id, payload=data)

    def user_playlist_remove_all_occurrences_of_tracks_full(self, user, playlist_id, tracks, snapshot_id=None):
        """ Removes all occurrences of any number of tracks from a playlist, 100 per request.

            Parameters:
                - user - the id of the user
                - playlist_id - the id of the playlist
                - tracks - the list of track ids to remove
                - snapshot_id - optional id of the playlist snapshot; looked up from the
                  playlist when not given
        """
        if snapshot_id is None:
            snapshot_id = self.user_playlist(user, playlist_id)["snapshot_id"]
        for batch in self._batches(tracks, 100):
            response = self.user_playlist_remove_all_occurrences_of_tracks(
                user, playlist_id, batch, snapshot_id
            )
            # every removal yields a new snapshot, which the next request has to refer to
            snapshot_id = response["snapshot_id"]

    def user_playlist_change_image(self, user, playlist_id, jpeg_image):
        """ Replaces the cover image of a playlist

            Parameters:
                - user - the id of the user (not used to build the request)
                - playlist_id - the id of the playlist
                - jpeg_image - an object with a `save(buffer, format="JPEG")` method;
                  presumably a PIL image, TODO confirm against the callers
        """
        byte_buffer = BytesIO()
        jpeg_image.save(byte_buffer, format="JPEG")
        image_string = base64.b64encode(byte_buffer.getvalue())
        return self._put("playlists/%s/images" % playlist_id, payload=image_string, content_type="image/jpeg")

    def _put(self, url, args=None, payload=None, content_type="application/json", **kwargs):
        """Sends a PUT request; `args` and `kwargs` are merged into the query parameters."""
        if args:
            kwargs.update(args)
        return self._internal_call("PUT", url, payload, kwargs, content_type=content_type)

    def _internal_call(self, method, url, payload, params, content_type="application/json"):
        """Performs a request against the API and decodes the response.

        Parameters:
            - method - the HTTP method
            - url - a full URL, or a path which is appended to `self.prefix`
            - payload - the request body; JSON encoded unless content_type is "image/jpeg"
            - params - the query parameters
            - content_type - value of the Content-Type header

        Returns the decoded JSON body, or None when the body is empty or "null".
        Raises SpotifyException when the response has an error status.
        """
        args = dict(params=params)
        args["timeout"] = self.requests_timeout
        if not url.startswith("http"):
            url = self.prefix + url
        headers = self._auth_headers()
        headers["Content-Type"] = content_type

        if payload:
            if content_type == "image/jpeg":
                # image data is sent exactly as the caller prepared it
                args["data"] = payload
            else:
                args["data"] = json.dumps(payload)

        if self.trace_out:
            print(url)

        try:
            r = self._session.request(method, url, headers=headers, proxies=self.proxies, **args)
        except InvalidGrantError:
            raise
        except Exception as e:
            # Deliberate best effort: report the failure, start over with a fresh
            # session and retry once. A failure of the retry propagates to the caller.
            capture_exception(e)
            self._session = requests.Session()
            r = self._session.request(method, url, headers=headers, proxies=self.proxies, **args)

        if self.trace:  # pragma: no cover
            print()
            print("headers", headers)
            print("http status", r.status_code)
            print(method, r.url)
            if payload:
                # print the body as it was sent; a bytes payload cannot be JSON encoded
                print("DATA", args["data"])

        try:
            r.raise_for_status()
        except HTTPError:
            message = ""
            if r.text and r.text != "null":
                message += r.text
            try:
                r_json = r.json()
            except ValueError:
                message += " Failed to decode JSON"
            else:
                # only a {"error": {"message": "..."}} body contributes to the message
                error = r_json.get("error") if isinstance(r_json, dict) else None
                detail = error.get("message") if isinstance(error, dict) else None
                if isinstance(detail, str):
                    message += detail
            raise SpotifyException(r.status_code, -1, "{0}:\n {1}".format(r.url, message), headers=r.headers)
        finally:
            # runs on success and on error alike
            r.connection.close()

        if r.text and r.text != "null":
            return r.json()
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How do we specify the fields to be returned by the `user_playlist_tracks_full` function? For example, I am interested in 'artist', 'album', 'track_name', 'track_id' and 'release_date'.