Created
August 11, 2019 22:29
-
-
Save astoeckel/96aee38015678fae2ef739761f1cf6b5 to your computer and use it in GitHub Desktop.
Python script that transfers the ratings from multiple Rhythmbox databases into a single Rhythmbox database. If the same song exists multiple times in the target database, only the one with the highest quality is assigned a rating.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import sys | |
import urllib.parse | |
import urllib.request | |
import lxml.etree as ET | |
import icu | |
import collections | |
import math | |
class RatingDB:
    """Fuzzy index of rated songs used to carry ratings between Rhythmbox databases.

    Songs are read from one or more ``rhythmdb.xml`` files with
    :meth:`read_rhythmdb`. Title, artist and album are broken into character
    n-grams ("shingles") so that slightly different spellings of the same song
    can still be matched. :meth:`transfer_ratings` then walks a target
    database and re-assigns the stored ratings to the best matching entries.
    """

    # Lightweight record of the metadata that is relevant for matching.
    # ``rating`` is kept as the raw XML text (or None if absent).
    Song = collections.namedtuple('Song', 'title artist album duration rating filesize')

    def __init__(self):
        """Create an empty database and the ICU transliterator used by :meth:`shingle`."""
        # All rated songs; the list index is the "song index" used below.
        self.songs = []
        # Converts arbitrary scripts to plain ASCII before shingling.
        self.tl = icu.Transliterator.createInstance('Any-Latin; Latin-ASCII')
        # Attributes that take part in the fuzzy comparison ...
        self.attrs = ('title', 'artist', 'album')
        # ... and how much each of them contributes to the overall score.
        self.attrs_importance = {
            'title': 0.6,
            'artist': 0.2,
            'album': 0.2,
        }
        # shingle -> set of song indices containing it (shared by all attributes).
        self.shingles_to_songs = {}
        # attribute -> list (indexed by song index) of that song's shingle set.
        self.songs_to_shingles = {attr: [] for attr in self.attrs}

    def shingle(self, s, l=3):
        """Return the set of length-``l`` character n-grams of ``s``.

        The string is transliterated to ASCII, lower-cased and stripped of all
        non-alphanumeric characters first. A non-empty normalised string that
        is shorter than ``l`` is returned as a single shingle so that very
        short names (e.g. two-letter artists) remain comparable. An empty
        normalised string yields an empty set.
        """
        t = ''.join(c for c in self.tl.transliterate(s).lower() if c.isalnum())
        if 0 < len(t) < l:
            return {t}
        # "+ 1" so that the n-gram ending at the last character is included.
        return {t[i:i + l] for i in range(len(t) - l + 1)}

    def read_song(self, x):
        """Convert an ``<entry>`` XML element into a :class:`RatingDB.Song`.

        Missing or empty child elements fall back to ``None`` (text fields and
        rating) or zero (duration, file size). A title, artist or album equal
        to "unknown" (case-insensitive) is mapped to ``None``.

        Raises:
            ValueError: if ``duration`` or ``file-size`` contain non-numeric text.
        """
        def text_or_default(key, default=None):
            elem = x.find(key)
            # An empty element (<title/>) has ``text is None``; treat it like
            # a missing element instead of handing None to the caller.
            if elem is None or elem.text is None:
                return default
            return elem.text

        def name_or_none(key):
            # Map "unknown" onto nothing
            value = text_or_default(key)
            if value is None or value.lower() == "unknown":
                return None
            return value

        # Fetch the important metadata
        title = name_or_none("title")
        artist = name_or_none("artist")
        album = name_or_none("album")
        duration = float(text_or_default("duration", 0.0))
        rating = text_or_default("rating")
        filesize = int(text_or_default("file-size", 0))

        return RatingDB.Song(title, artist, album, duration, rating, filesize)

    def _index_song(self, song):
        """Append ``song`` to the database, register its shingles, return its index."""
        song_idx = len(self.songs)
        self.songs.append(song)
        for attr in self.attrs:
            value = getattr(song, attr)
            shingles = set() if value is None else self.shingle(value)
            # Every attribute list gets exactly one entry per song so that it
            # can be addressed with the song index.
            self.songs_to_shingles[attr].append(shingles)
            for shingle in shingles:
                self.shingles_to_songs.setdefault(shingle, set()).add(song_idx)
        return song_idx

    def read_rhythmdb(self, filename):
        """Load all rated songs from the Rhythmbox database ``filename``.

        Only ``<entry type="song">`` elements that carry a rating are added to
        the index. May be called repeatedly to merge several databases.
        """
        x_root = ET.parse(filename).getroot()
        for x_child in x_root:
            # ``get`` instead of ``attrib[...]``: entries without a "type"
            # attribute are skipped rather than raising KeyError.
            if x_child.tag != 'entry' or x_child.get('type') != 'song':
                continue

            song = self.read_song(x_child)
            if song.rating is None:
                continue

            self._index_song(song)

    def infer_rating(self, song):
        """Return the index of the stored song that best matches ``song``.

        Returns ``None`` if no stored song reaches the similarity threshold.
        Among the songs above the threshold the one with the highest rating
        wins; ties are broken by the higher match score.
        """
        # For relevant attributes, compute the similarity to other songs in the
        # local database
        matched_songs = {}
        for attr in self.attrs:
            # Convert the attribute to shingles
            value = getattr(song, attr)
            if value is None:
                continue
            shingles = self.shingle(value)

            # Fetch all songs sharing the same shingles
            candidates = set()
            for shingle in shingles:
                candidates |= self.shingles_to_songs.get(shingle, set())

            # For each song, compute the Jaccard similarity between the song
            # shingles and the shingles for this attribute. ``candidates`` is
            # only non-empty if ``shingles`` is, so the union is never empty.
            for song_idx in candidates:
                song_shingles = self.songs_to_shingles[attr][song_idx]
                sim = len(song_shingles & shingles) / len(song_shingles | shingles)
                if sim > 0.5:
                    if song_idx not in matched_songs:
                        matched_songs[song_idx] = {a: 0.0 for a in self.attrs}
                    matched_songs[song_idx][attr] = sim

        # Compute the overall score, i.e. the likelihood that the given song is
        # actually one of the matched_songs. Compare the song durations. Return
        # the above-threshold song with the highest rating.
        best_rating, best_song_idx, best_p = 0, None, 0.5
        for song_idx, sims in matched_songs.items():
            matched_song = self.songs[song_idx]
            p = 0.0
            for attr, w in self.attrs_importance.items():
                p += w * sims[attr]
            # Penalise differing durations with a Gaussian-shaped factor; only
            # applied when both durations are known (presumably in seconds --
            # verify against the Rhythmbox file format).
            if song.duration > 0.0 and matched_song.duration > 0.0:
                p *= math.exp(-(song.duration - matched_song.duration)**2 / 1000)
            rating = 0 if matched_song.rating is None else int(matched_song.rating)
            if p > 0.5 and rating > best_rating:
                best_rating = rating
                best_song_idx = song_idx
                best_p = p
            elif p > best_p and rating == best_rating:
                best_song_idx = song_idx
                best_p = p
        return best_song_idx

    def transfer_ratings(self, filename):
        """Apply the stored ratings to the Rhythmbox database ``filename``.

        Every song entry first loses its existing ``<rating>`` element. Each
        entry is then matched against the index; if several entries map to the
        same stored song, only the one with the largest file size receives the
        rating. A line per match is written to ``sys.stderr``.

        Returns:
            The root element of the modified XML tree (the file itself is not
            written).
        """
        matched_songs = {}
        x_root = ET.parse(filename).getroot()
        for x_child in x_root:
            if x_child.tag != 'entry' or x_child.get('type') != 'song':
                continue

            # Convert the element into a "song" element
            song = self.read_song(x_child)

            # Remove the "rating" element; there can be only one song matched
            # to the same logical song.
            # NOTE: this also drops the rating of entries that end up without
            # any match below.
            x_rating = x_child.find("rating")
            if x_rating is not None:
                x_child.remove(x_rating)

            # Try to infer the rating and the logical song
            song_idx = self.infer_rating(song)
            if song_idx is None:
                continue
            sys.stderr.write(str(song) + ' --> ' + str(self.songs[song_idx]) + '\n')

            # Remember that this song was matched to the given song index
            matched_songs.setdefault(song_idx, []).append((song, x_child))

        # For each matched logical song, find the song in the XML file with the
        # best quality. Add the "rating" tag with the inferred rating to exactly
        # one song.
        for song_idx, songs in matched_songs.items():
            # ``max`` returns the first entry with the largest file size, and
            # still picks an entry when all file sizes are unknown (0), so a
            # matched song never ends up without a rating.
            _, x_tar = max(songs, key=lambda entry: entry[0].filesize)
            x_rating = ET.SubElement(x_tar, "rating")
            x_rating.text = str(self.songs[song_idx].rating)

        return x_root
# Collect the ratings from the current database and its backup, then apply
# them to the current database and print the resulting XML to stdout.
db = RatingDB()
for source in (
        '/home/andreas/.local/share/rhythmbox/rhythmdb.xml',
        '/home/andreas/.local/share/rhythmbox/rhythmdb.bck.xml'):
    db.read_rhythmdb(source)

x_new = db.transfer_ratings('/home/andreas/.local/share/rhythmbox/rhythmdb.xml')
# Write bytes directly so the serialized XML is emitted exactly as produced.
sys.stdout.buffer.write(ET.tostring(x_new, pretty_print=True))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment