Skip to content

Instantly share code, notes, and snippets.

@blacklight
Created September 20, 2022 08:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blacklight/3d50678fc1f256ee1d59d1016cc51798 to your computer and use it in GitHub Desktop.
Save blacklight/3d50678fc1f256ee1d59d1016cc51798 to your computer and use it in GitHub Desktop.
# ~/.config/platypush/scripts/music/suggestions.py
import logging
from sqlalchemy import tuple_
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.sql.expression import bindparam
from platypush.context import get_plugin, Variable
from platypush.cron import cron
from scripts.music.db import (
get_db_session, Track, TrackActivity, TrackSimilar
)
# Module-level logger for the music suggestions script
logger = logging.getLogger('music_suggestions')
# This stored variable keeps track of the latest activity ID for which the
# suggestions were calculated. The refresh job reads it when it starts and
# writes the updated value back when it finishes.
last_activity_id_var = Variable('LAST_PROCESSED_ACTIVITY_ID')
# A cronjob that runs every 5 minutes and updates the suggestions
@cron('*/5 * * * *')
def refresh_similar_tracks(**_):
    """
    Compute and store the suggestions for the recently played tracks.

    Retrieves the tracks played after the last processed activity ID that
    don't have any similar tracks stored yet, processes at most 10 of them
    per run, and finally stores the activity ID of the last processed track
    so that the next run resumes from there.

    :param _: Keyword arguments passed by the cron runner (ignored).
    """
    batch_size = 10
    last_activity_id = int(last_activity_id_var.get() or 0)

    # Retrieve all the tracks played since the latest synchronized activity ID
    # that don't have any similar tracks being calculated yet. The helper
    # manages its own database session, so none is opened here.
    recent_tracks_without_similars = \
        _get_recent_tracks_without_similars(last_activity_id)
    batch = recent_tracks_without_similars[:batch_size]

    if batch:
        # The rows are ordered by activity ID, so the last one in the batch
        # is the point the next run has to resume from.
        last_activity_id = batch[-1]['activity_id']
        logger.info(
            'Processing suggestions for %d/%d tracks',
            len(batch),
            len(recent_tracks_without_similars))
        _store_suggestions(batch)
    else:
        logger.info('All the recent tracks have processed suggestions')

    last_activity_id_var.set(last_activity_id)
    logger.info('Suggestions updated')


def _store_suggestions(batch):
    """
    Fetch the similar tracks for a batch of played tracks and persist them.

    :param batch: List of dicts with the keys ``track_id``, ``artist``,
        ``title`` and ``activity_id``.
    """
    # Build the track_id -> [similar_tracks] map. The same track shows up
    # once per activity, so it is looked up only once, unless the previous
    # lookup failed and returned None.
    similars_by_track = {}
    for track in batch:
        if similars_by_track.get(track['track_id']) is None:
            similars_by_track[track['track_id']] = _get_similar_tracks(
                track['artist'], track['title'])

    # Map all the similar tracks in an (artist, title) -> info data structure
    similar_tracks_by_artist_and_title = \
        _get_similar_tracks_by_artist_and_title(similars_by_track)
    if not similar_tracks_by_artist_and_title:
        logger.info('No new suggestions to process')
        return

    # Sync all the new similar tracks to the database
    similar_tracks = \
        _sync_missing_similar_tracks(similar_tracks_by_artist_and_title)

    # Link listened tracks to similar tracks
    links = [
        {
            'source_track_id': track_id,
            'target_track_id':
                similar_tracks[(similar['artist'], similar['title'])].id,
            'match_score': similar['score'],
        }
        for track_id, similars in similars_by_track.items()
        for similar in (similars or [])
        if (similar['artist'], similar['title']) in similar_tracks
    ]

    # Nothing to link (no suggestion could be matched to a stored track):
    # skip the INSERT, as there would be no values for its bind parameters.
    if not links:
        return

    with get_db_session() as session:
        stmt = insert(TrackSimilar).values({
            'source_track_id': bindparam('source_track_id'),
            'target_track_id': bindparam('target_track_id'),
            'match_score': bindparam('match_score'),
        }).on_conflict_do_nothing()

        session.execute(stmt, links)
        session.flush()
        session.commit()
def _get_similar_tracks(artist, title):
    """
    Query the last.fm plugin for up to 10 tracks similar to the given
    artist/title pair.

    Returns whatever the plugin returns, or ``None`` (after logging a
    warning) if pylast reports an error.
    """
    import pylast

    plugin = get_plugin('lastfm')
    try:
        suggestions = plugin.get_similar_tracks(
            artist=artist, title=title, limit=10)
    except pylast.PyLastError as err:
        logger.warning(
            'Could not find tracks similar to %s - %s: %s',
            artist, title, err
        )
        return None

    return suggestions
def _get_recent_tracks_without_similars(last_activity_id):
    """
    List the tracks played after ``last_activity_id`` that have no
    suggestions stored yet.

    Returns a list of dicts with the keys ``track_id``, ``artist``,
    ``title`` and ``activity_id``, ordered by activity ID.
    """
    # Track LEFT JOIN TrackSimilar keeps the tracks without suggestions
    # (their TrackSimilar side is NULL), then each one is joined with its
    # activities.
    tracks_with_activities = (
        Track.__table__
        .join(
            TrackSimilar,
            Track.id == TrackSimilar.source_track_id,
            isouter=True
        )
        .join(
            TrackActivity,
            Track.id == TrackActivity.track_id
        )
    )

    with get_db_session() as db:
        rows = (
            db.query(
                Track.id.label('track_id'),
                Track.artist,
                Track.title,
                TrackActivity.id.label('activity_id'),
            )
            .select_from(tracks_with_activities)
            .filter(
                TrackSimilar.source_track_id.is_(None),
                TrackActivity.id > last_activity_id
            )
            .order_by(TrackActivity.id)
            .all()
        )

        return [
            {
                'track_id': track_id,
                'artist': artist,
                'title': title,
                'activity_id': activity_id,
            }
            for track_id, artist, title, activity_id in rows
        ]
def _get_similar_tracks_by_artist_and_title(similars_by_track):
"""
Map similar tracks into an (artist, title) -> track dictionary
"""
similar_tracks_by_artist_and_title = {}
for similar in similars_by_track.values():
for track in (similar or []):
similar_tracks_by_artist_and_title[
(track['artist'], track['title'])
] = track
return similar_tracks_by_artist_and_title
def _sync_missing_similar_tracks(similar_tracks_by_artist_and_title):
    """
    Store the similar tracks that are not on the database yet and return
    the stored records for all of them.

    Takes the (artist, title) -> track info mapping and returns an
    (artist, title) -> ``Track`` mapping.
    """
    logger.info('Syncing missing similar tracks')

    # Parametrized insert that leaves conflicting rows untouched
    insert_track = (
        insert(Track)
        .values({
            'artist': bindparam('artist'),
            'title': bindparam('title'),
        })
        .on_conflict_do_nothing()
    )

    with get_db_session() as db:
        db.execute(
            insert_track, list(similar_tracks_by_artist_and_title.values()))
        db.flush()
        db.commit()

        # Read back every requested (artist, title) pair
        is_requested_pair = tuple_(Track.artist, Track.title).in_(
            similar_tracks_by_artist_and_title
        )

        stored_tracks = {}
        for record in db.query(Track).filter(is_requested_pair).all():
            stored_tracks[(record.artist, record.title)] = record

        return stored_tracks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment