Skip to content

Instantly share code, notes, and snippets.

@phw
Last active May 18, 2023 00:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phw/33a009938141605780ca55bdf9203dc7 to your computer and use it in GitHub Desktop.
Save phw/33a009938141605780ca55bdf9203dc7 to your computer and use it in GitHub Desktop.
ListenBrainz import
#!/usr/bin/env python
import json
from listenbrainz import ListenBrainzClient, Track
# LISTENS_EXPORT = '/home/phw/Sync/listens-outsidecontext-2020-08-10.json'
LISTENS_EXPORT = '/home/phw/Sync/output.json'
LISTENBRAINZ_TOKEN = 'YOUR-TOKEN'
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
def clean_additional_info(info):
for (k, v) in dict(info).items():
if not v:
del info[k]
return info
with open(LISTENS_EXPORT, mode='r', encoding="utf-8") as f:
listens = json.load(f)
# listens = listens[:1]
listens = [
(l['listened_at'],
Track(l['track_metadata']['artist_name'],
l['track_metadata']['track_name'],
l['track_metadata'].get('release_name'),
clean_additional_info(l['track_metadata'].get('additional_info')),
)
)
for l in listens
]
total_count = len(listens)
submitted_count = 0
print("Read %i listens" % total_count)
client = ListenBrainzClient()
client.user_token = LISTENBRAINZ_TOKEN
listens = sorted(listens, key=lambda k: k[0], reverse=True)[:10]
print(listens)
for submission in chunks(listens, 200):
# client.import_tracks(submission)
submitted_count += len(submission)
print("Submitted %i/%i listens" % (submitted_count, total_count))
print("Submitted all listens")
# Copyright (c) 2018 Philipp Wolfer <ph.wolfer@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import json
import logging
import ssl
import time
from http.client import HTTPSConnection
HOST_NAME = "api.listenbrainz.org"
PATH_SUBMIT = "/1/submit-listens"
SSL_CONTEXT = ssl.create_default_context()
class Track:
"""
Represents a single track to submit.
See https://listenbrainz.readthedocs.io/en/latest/dev/json.html
"""
def __init__(self, artist_name, track_name,
release_name=None, additional_info={}):
"""
Create a new Track instance
@param artist_name as str
@param track_name as str
@param release_name as str
@param additional_info as dict
"""
self.artist_name = artist_name
self.track_name = track_name
self.release_name = release_name
self.additional_info = additional_info
@staticmethod
def from_dict(data):
return Track(
data["artist_name"],
data["track_name"],
data.get("release_name", None),
data.get("additional_info", {})
)
def to_dict(self):
data = {
"artist_name": self.artist_name,
"track_name": self.track_name,
"additional_info": self.additional_info
}
if self.release_name:
data['release_name'] = self.release_name
return data
def __repr__(self):
return "Track(%s, %s)" % (self.artist_name, self.track_name)
class ListenBrainzClient:
"""
Submit listens to ListenBrainz.org.
See https://listenbrainz.readthedocs.io/en/latest/dev/api.html
"""
def __init__(self, logger=logging.getLogger(__name__)):
self.__next_request_time = 0
self.user_token = None
self.logger = logger
def listen(self, listened_at, track):
"""
Submit a listen for a track
@param listened_at as int
@param entry as Track
"""
payload = _get_payload(track, listened_at)
return self._submit("single", [payload])
def playing_now(self, track):
"""
Submit a playing now notification for a track
@param track as Track
"""
payload = _get_payload(track)
return self._submit("playing_now", [payload])
def import_tracks(self, tracks):
"""
Import a list of tracks as (listened_at, Track) pairs
@param track as [(int, Track)]
"""
payload = _get_payload_many(tracks)
return self._submit("import", payload)
def _submit(self, listen_type, payload, retry=0):
self._wait_for_ratelimit()
self.logger.debug("ListenBrainz %s: %r", listen_type, payload)
data = {
"listen_type": listen_type,
"payload": payload
}
headers = {
"Authorization": "Token %s" % self.user_token,
"Content-Type": "application/json"
}
body = json.dumps(data)
conn = HTTPSConnection(HOST_NAME, context=SSL_CONTEXT)
conn.request("POST", PATH_SUBMIT, body, headers)
response = conn.getresponse()
response_text = response.read()
try:
response_data = json.loads(response_text)
except json.decoder.JSONDecodeError:
response_data = response_text
self._handle_ratelimit(response)
log_msg = "Response %s: %r" % (response.status, response_data)
if response.status == 429 and retry < 5: # Too Many Requests
self.logger.warning(log_msg)
return self._submit(listen_type, payload, retry + 1)
elif response.status == 200:
self.logger.debug(log_msg)
else:
self.logger.error(log_msg)
self.logger.error(payload)
return response
def _wait_for_ratelimit(self):
now = time.time()
if self.__next_request_time > now:
delay = self.__next_request_time - now
self.logger.debug("Rate limit applies, delay %d", delay)
time.sleep(delay)
def _handle_ratelimit(self, response):
remaining = int(response.getheader("X-RateLimit-Remaining", 0))
reset_in = int(response.getheader("X-RateLimit-Reset-In", 0))
self.logger.debug("X-RateLimit-Remaining: %i", remaining)
self.logger.debug("X-RateLimit-Reset-In: %i", reset_in)
if remaining == 0:
self.__next_request_time = time.time() + reset_in
def _get_payload_many(tracks):
payload = []
for (listened_at, track) in tracks:
data = _get_payload(track, listened_at)
payload.append(data)
return payload
def _get_payload(track, listened_at=None):
data = {
"track_metadata": track.to_dict()
}
if listened_at is not None:
data["listened_at"] = listened_at
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment