Created
December 30, 2023 05:30
-
-
Save sjlongland/0b433d3653f19b1a1ce83ec0513e16f9 to your computer and use it in GitHub Desktop.
mpd music selection algorithm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf-8 | |
from mpd import MPDClient | |
## SETTINGS
##
# Address of the MPD server that connect() talks to.
HOST = 'localhost'
# TCP port of the MPD server; passed unchanged to MPDClient.connect().
PORT = '6600'
# MPD password; any falsy value means "do not send a password".
PASSWORD = False
###
def connect():
    """Open a connection to the MPD server and return the client.

    Connects to HOST:PORT and, when PASSWORD is truthy, authenticates with
    it. If authentication fails the connection is closed again before the
    exception propagates, so a failed login does not leak the socket.

    Returns:
        The connected ``MPDClient`` instance.
    """
    client = MPDClient()
    client.connect(host=HOST, port=PORT)
    if PASSWORD:
        try:
            client.password(PASSWORD)
        except Exception:
            # Do not leave a half-initialised connection behind.
            client.disconnect()
            raise
    return client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# vim: ai ts=4 sw=4 sts=4 expandtab fileencoding=utf-8 | |
import lmdb | |
import os | |
## SETTINGS
##
# Directory holding this module; made absolute so the database location does
# not depend on the working directory the script was started from.
MY_DIR = os.path.dirname(os.path.abspath(__file__))
# The LMDB environment lives next to the scripts.
DB_PATH = os.path.join(MY_DIR, "status.lmdb")
##

# A single LMDB environment with two named databases. Both are keyed by
# keyof(artist, title) and store a UNIX timestamp encoded as UTF-8 text:
#   lastqueued - written by randomtrack.py when a track is added to the queue
#   lastplayed - written by monitor.py when a player event is seen
env = lmdb.open(DB_PATH, max_dbs=3)
lastqueued = env.open_db(b'lastqueued')
lastplayed = env.open_db(b'lastplayed')
def keyof(artist, title):
    """Build the LMDB key for an artist/title pair.

    The key is ``artist``, a TAB, then ``title``, encoded as UTF-8. Literal
    TAB characters inside either field are replaced by the two characters
    backslash + ``t`` so the separating TAB stays unique.

    Args:
        artist: Artist name as text.
        title: Track title as text.

    Returns:
        The key as ``bytes``.
    """
    escaped_artist = artist.replace("\t", "\\t")
    escaped_title = title.replace("\t", "\\t")
    return ("%s\t%s" % (escaped_artist, escaped_title)).encode("UTF-8")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Music selection algorithm… | |
All my music files are arranged "by UUID": each file carries a UUID tag in its metadata, and the file name is derived from that tag.
This improves the efficiency of tools like `rsync` when synchronising music repositories, as it avoids re-transferring whole
files when a file is renamed.
This "by UUID" directory is what MPD sees.
`monitor.py` is run in the background to record what gets played, and `randomtrack.py` is run periodically from `cron` to keep adding new tracks to the MPD queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# vim: ai ts=4 sw=4 sts=4 expandtab | |
# IMPORTS | |
from mpd import CommandError | |
from db import env, lastplayed, keyof | |
from client import connect | |
import logging | |
import time | |
import os | |
# Background monitor: every time MPD reports a "player" event, stamp the
# current song with the current time, both as an MPD sticker on the file and
# in the `lastplayed` LMDB database keyed by artist/title.
logging.basicConfig(level=logging.DEBUG)
client = connect()
log = logging.getLogger(os.path.basename(__file__))


def _first(value):
    """Return the first element if *value* is a list, else *value* itself.

    python-mpd2 may hand back a list when a tag occurs more than once in a
    file; keyof() needs a plain string.
    """
    if isinstance(value, list):
        return value[0] if value else None
    return value


try:
    while True:
        # idle() returns the names of the MPD subsystems that changed.
        events = client.idle()
        if "player" not in events:
            continue

        current = client.currentsong()
        if not current:
            continue

        artist = _first(current.get("artist"))
        title = _first(current.get("title"))
        filename = current.get("file")
        if not (artist and title and filename):
            # Without all three we can build neither the key nor the sticker.
            continue

        # Update now playing
        now = str(time.time())
        key = keyof(artist, title)
        client.sticker_set("song", filename, "lastplayed_unixtime", now)
        with env.begin(db=lastplayed, write=True) as txn:
            txn.put(key, now.encode("utf-8"))
finally:
    # The loop only ends through an exception (including Ctrl-C); make sure
    # the connection is closed on the way out.
    client.disconnect()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# vim: ai ts=4 sw=4 sts=4 expandtab | |
# IMPORTS | |
from mpd import CommandError | |
from random import choice | |
from db import env, lastqueued, lastplayed, keyof | |
from client import connect | |
import logging | |
import time | |
import datetime | |
import mutagen | |
import os | |
import argparse | |
## SETTINGS
##
# NOTE: HOST, PORT and PASSWORD are not referenced anywhere below; the
# connection is made by connect() imported from the client module. They are
# kept only so the settings block stays backward-compatible.
HOST = 'localhost'
PORT = '6600'
PASSWORD = False
# Directory the file names reported by MPD are resolved against.
REPO_DIR = "/home/stuartl/music/.by-uuid"
# Minimum number of seconds before a track may be queued again.
MIN_DURATION = 86400 * 31   # ~ 1 month
# Consecutive rejected candidates tolerated, per library entry, before the
# script gives up instead of spinning forever.
MAX_REJECTS_PER_FILE = 10
###

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(os.path.basename(__file__))

# Parse the command line before connecting so that --help or a bad argument
# does not open (and abandon) an MPD connection.
ap = argparse.ArgumentParser()
ap.add_argument("count", type=int, default=100, nargs="?")
args = ap.parse_args()

TODAY = datetime.date.today()
NOW = time.time()
# Anything stamped at or after this moment counts as "too recent".
MAX_LAST = NOW - MIN_DURATION
IS_CHRISTMAS = (TODAY.month == 12) and (TODAY.day < 26)

STICKER_NAMES = ("lastplayed_unixtime", "lastqueued_unixtime")


def _sticker_time(filename, name):
    """Return sticker *name* of *filename* as a float, or 0.0 if not set."""
    try:
        return float(client.sticker_get("song", filename, name))
    except CommandError as e:
        if not str(e).endswith("no such sticker"):
            raise
        return 0.0


def _seen_recently(db, key):
    """Return True if *db* holds a timestamp >= MAX_LAST under *key*."""
    with env.begin(db=db) as txn:
        raw = txn.get(key)
    return raw is not None and float(raw.decode()) >= MAX_LAST


def _first_tag(tags, name):
    """Return the first value of tag *name*, or "N/A" when absent/empty."""
    values = tags.get(name)
    return values[0] if values else "N/A"


def _evaluate(entry):
    """Decide whether the library entry *entry* may be queued now.

    Returns a ``(filename, artist, title, album, key)`` tuple when the track
    is eligible, or ``None`` when it must be skipped.
    """
    filename = entry["file"]

    # Use the newest of the two stickers: a track queued recently but not
    # yet played must be rejected even if its last play is long ago.
    last = max(_sticker_time(filename, name) for name in STICKER_NAMES)
    if last >= MAX_LAST:
        log.debug("Played too recently")
        return None

    path = os.path.join(REPO_DIR, filename)
    if not os.path.exists(path):
        log.debug("No longer on disk")
        return None

    try:
        fileobj = mutagen.File(path)
    except mutagen.MutagenError as e:
        log.debug("Cannot read %r: %s", path, e)
        return None
    tags = fileobj.tags if fileobj is not None else None
    if tags is None:
        # Unknown file type or a file without any tags.
        log.debug("No tags in %r", path)
        return None

    if "GROUPING" in tags:
        grouping = tags["GROUPING"][0]
        log.debug("Special grouping: %r", grouping)
        if grouping.lower() == "christmas":
            # Is it December?
            if not IS_CHRISTMAS:
                # No
                log.debug("Christmas song not in December")
                return None
        else:
            # Any other special grouping is never queued automatically.
            return None

    # Extract artist/title metadata
    artist = _first_tag(tags, "ARTIST")
    title = _first_tag(tags, "TITLE")

    # Compute a key for LMDB, then look up the last queue/play time there.
    # This also catches another version of the same artist/title.
    key = keyof(artist, title)
    if _seen_recently(lastqueued, key):
        log.debug("%s - %s has been queued recently", artist, title)
        return None
    if _seen_recently(lastplayed, key):
        log.debug("%s - %s has been played recently", artist, title)
        return None

    return filename, artist, title, _first_tag(tags, "ALBUM"), key


client = connect()
try:
    files = client.list("file")
    refresh = NOW
    status = client.status()
    playlistlen = int(status['playlistlength'])
    log.info("Playlist has %d tracks, %d expected", playlistlen, args.count)

    max_rejects = MAX_REJECTS_PER_FILE * len(files)
    rejects = 0
    while playlistlen < args.count:
        if not files:
            log.warning("MPD reports no files; nothing to queue")
            break
        if rejects >= max_rejects:
            log.warning(
                "No eligible track after %d consecutive attempts; giving up",
                rejects
            )
            break

        entry = choice(files)
        log.debug("Checking %r", entry)
        candidate = _evaluate(entry)
        if candidate is None:
            rejects += 1
            continue
        rejects = 0
        filename, artist, title, album, key = candidate

        log.info("Adding %r: %s - %s [%s]", entry, artist, title, album)
        client.add(filename)

        # Mark as added
        client.sticker_set("song", filename, "lastqueued_unixtime", str(NOW))
        with env.begin(db=lastqueued, write=True) as txn:
            txn.put(key, str(NOW).encode("utf-8"))

        # Refresh length: ask MPD at most every 5 seconds, otherwise just
        # count the track we added ourselves.
        if refresh < (time.time() - 5):
            refresh = time.time()
            status = client.status()
            playlistlen = int(status['playlistlength'])
            log.info("Playlist has %d tracks, %d expected",
                     playlistlen, args.count)
        else:
            playlistlen += 1
finally:
    client.disconnect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment