Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Created December 30, 2023 05:30
Show Gist options
  • Save sjlongland/0b433d3653f19b1a1ce83ec0513e16f9 to your computer and use it in GitHub Desktop.
Save sjlongland/0b433d3653f19b1a1ce83ec0513e16f9 to your computer and use it in GitHub Desktop.
mpd music selection algorithm
#!/usr/bin/env python
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf-8
from mpd import MPDClient
## SETTINGS
## Connection parameters read by connect() below.
# Host name handed to MPDClient.connect()
HOST = 'localhost'
# Port handed to MPDClient.connect(); note that it is kept as a string here
PORT = '6600'
# Password sent right after connecting; a falsy value skips that step
PASSWORD = False
###
def connect():
    """Return an MPDClient connected to HOST:PORT.

    When PASSWORD is set to a truthy value it is sent to the server
    straight after the connection is opened; otherwise no password
    command is issued.
    """
    mpd_client = MPDClient()
    mpd_client.connect(host=HOST, port=PORT)
    if not PASSWORD:
        # No password configured: the connection is usable as-is
        return mpd_client
    mpd_client.password(PASSWORD)
    return mpd_client
#!/usr/bin/env python
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf-8
import lmdb
import os
## SETTINGS
##
# Directory containing this module; the database is kept alongside it
MY_DIR = os.path.dirname(__file__)
# Location passed to lmdb.open()
DB_PATH = os.path.join(MY_DIR, "status.lmdb")
##
# LMDB environment opened at import time, with room for up to three
# named sub-databases (two are opened below)
env = lmdb.open(DB_PATH, max_dbs=3)
# Named sub-databases, both keyed by keyof(artist, title).  The scripts that
# write to them store a UNIX timestamp encoded as UTF-8 text.
lastqueued = env.open_db(b'lastqueued')
lastplayed = env.open_db(b'lastplayed')
def keyof(artist, title):
    """Build the LMDB key for an artist/title pair.

    Any literal tab character inside either field is rewritten as the two
    characters backslash and "t"; the two fields are then joined with a
    single tab and the result is returned UTF-8 encoded.
    """
    escaped = [field.replace("\t", "\\t") for field in (artist, title)]
    return "\t".join(escaped).encode("UTF-8")
Music selection algorithm…
All my music files are arranged "by UUID": each file carries a UUID tag in its metadata, and the file name is derived from that tag.
This improves the efficiency of tools like `rsync` when synchronising music repositories, as it avoids re-transferring whole
files when a file is renamed.
This by-UUID directory is what MPD sees.
`monitor.py` runs continuously in the background, and `randomtrack.py` is run periodically from `cron` to keep topping up the playlist with new tracks.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ai ts=4 sw=4 sts=4 expandtab
# IMPORTS
from mpd import CommandError
from db import env, lastplayed, keyof
from client import connect
import logging
import time
import os
logging.basicConfig(level=logging.DEBUG)
client = connect()
log = logging.getLogger(os.path.basename(__file__))

# Record when each track was last played: every time the "player" subsystem
# shows up in the events returned by client.idle(), stamp the current song
# both as an MPD sticker on the file and in the `lastplayed` LMDB database
# (keyed by artist/title).
try:
    while True:
        events = client.idle()
        if "player" not in events:
            # Some other subsystem changed; nothing to record
            continue

        current = client.currentsong()
        # Only songs that have a title, an artist and a file can be recorded
        if current \
                and current.get("title") \
                and current.get("artist") \
                and current.get("file"):
            # Update now playing
            now = str(time.time())
            key = keyof(current["artist"], current["title"])
            client.sticker_set(
                "song", current["file"], "lastplayed_unixtime", now
            )
            with env.begin(db=lastplayed, write=True) as txn:
                txn.put(key, now.encode("utf-8"))
finally:
    # The loop above has no normal exit, so it only ends through an
    # exception (Ctrl-C, lost connection, ...).  Disconnecting here makes
    # the clean-up reachable; the exception itself still propagates.
    client.disconnect()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ai ts=4 sw=4 sts=4 expandtab
# IMPORTS
from mpd import CommandError
from random import choice
from db import env, lastqueued, lastplayed, keyof
from client import connect
import logging
import time
import datetime
import mutagen
import os
import argparse
## SETTINGS
##
# NOTE(review): HOST, PORT and PASSWORD are not referenced anywhere in the
# visible part of this script, which connects through client.connect()
# instead -- they look like leftovers; confirm before relying on them.
HOST = 'localhost'
PORT = '6600'
PASSWORD = False
# Joined with the file names reported by MPD to locate each track on disk
REPO_DIR = "/home/stuartl/music/.by-uuid"
# Minimum gap, in seconds, before a track (or another track with the same
# artist/title) may be queued again
MIN_DURATION = 86400 * 31 # ~ 1 month
###
# Log at DEBUG level and connect to MPD before doing anything else
logging.basicConfig(level=logging.DEBUG)
client = connect()
log = logging.getLogger(os.path.basename(__file__))
# Optional positional argument: the target playlist length (default 100)
ap = argparse.ArgumentParser()
ap.add_argument("count", type=int, default=100, nargs="?")
args = ap.parse_args()
# Candidate pool for random selection; each entry is later indexed with
# ["file"] to obtain the file name
files = client.list("file")
# Date and time are captured once, so the whole run shares a single "now"
TODAY = datetime.date.today()
NOW = time.time()
# Anything last played/queued at or after this UNIX time is too recent
MAX_LAST = NOW - MIN_DURATION
# True from 1 to 25 December inclusive
IS_CHRISTMAS = (TODAY.month == 12) and (TODAY.day < 26)
# Time at which the playlist length was last read back from MPD
refresh = NOW
status = client.status()
playlistlen = int(status['playlistlength'])
log.info("Playlist has %d tracks, %d expected", playlistlen, args.count)
# Top the playlist up to `args.count` tracks.  Each pass of the outer loop
# adds exactly one track; the inner loop keeps drawing random candidates
# until one passes every filter below.
#
# NOTE(review): the inner loop never gives up.  If no track in the library
# is eligible (everything was played/queued within MIN_DURATION, or the
# files are missing on disk) it spins forever -- consider bounding the
# number of attempts.
if not files and playlistlen < args.count:
    # random.choice() raises IndexError on an empty sequence
    log.warning("MPD reported no files; nothing to queue")

while files and playlistlen < args.count:
    while True:
        file = choice(files)
        log.debug("Checking %r", file)
        filename = file["file"]

        # Most recent activity recorded as a sticker on this exact file:
        # the "lastplayed" sticker wins, "lastqueued" is the fallback, and
        # 0.0 is used when neither sticker exists.
        last = 0.0
        for name in ("lastplayed_unixtime", "lastqueued_unixtime"):
            try:
                last = float(
                    client.sticker_get("song", filename, name)
                )
                break
            except CommandError as e:
                # A missing sticker is expected; anything else is fatal
                if not str(e).endswith("no such sticker"):
                    raise

        if last >= MAX_LAST:
            log.debug("Played too recently")
            continue

        path = os.path.join(REPO_DIR, filename)
        if not os.path.exists(path):
            log.debug("No longer on disk")
            continue

        # mutagen.File() raises MutagenError for files it cannot parse and
        # returns None when it does not recognise the type; skip the
        # candidate in both cases instead of aborting the whole run.
        try:
            fileobj = mutagen.File(path)
        except mutagen.MutagenError as e:
            log.debug("Cannot read metadata: %s", e)
            continue
        if fileobj is None:
            log.debug("Unrecognised file type")
            continue

        # An untagged file has `tags` set to None; treat it as having no
        # tags at all so the "N/A" defaults below apply.
        tags = fileobj.tags if fileobj.tags is not None else {}

        if "GROUPING" in tags:
            grouping = tags["GROUPING"][0]
            log.debug("Special grouping: %r", grouping)
            if grouping.lower() == "christmas":
                # Is it December?
                if not IS_CHRISTMAS:
                    # No
                    log.debug("Christmas song not in December")
                    continue
            else:
                # Any other grouping is never queued automatically
                continue

        # Extract artist/title metadata
        artist = tags.get("ARTIST", ["N/A"])[0]
        title = tags.get("TITLE", ["N/A"])[0]

        # Compute a key for LMDB, then look up the last queue/play time
        # there.  This catches the same artist/title stored under another
        # file (maybe another version).
        key = keyof(artist, title)
        recent = None
        for db, what in ((lastqueued, "queued"), (lastplayed, "played")):
            with env.begin(db=db) as txn:
                at_last = txn.get(key)
            if at_last is not None and float(at_last.decode()) >= MAX_LAST:
                recent = what
                break
        if recent is not None:
            log.debug("%s - %s has been %s recently", artist, title, recent)
            continue

        log.info(
            "Adding %r: %s - %s [%s]",
            file, artist, title,
            tags.get("ALBUM", ["N/A"])[0]
        )
        client.add(filename)

        # Mark as added
        client.sticker_set("song", filename, "lastqueued_unixtime", str(NOW))
        with env.begin(db=lastqueued, write=True) as txn:
            txn.put(key, str(NOW).encode("utf-8"))

        # Refresh length: ask MPD for the real playlist length at most
        # once every 5 seconds, otherwise just count the track just added.
        if refresh < (time.time() - 5):
            refresh = time.time()
            status = client.status()
            playlistlen = int(status['playlistlength'])
            log.info("Playlist has %d tracks, %d expected",
                     playlistlen, args.count)
        else:
            playlistlen += 1
        break

client.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment