Skip to content

Instantly share code, notes, and snippets.

@AstraLuma
Created February 16, 2023 05:58
Show Gist options
  • Save AstraLuma/192db7493238919ffeeb0e1df3716d83 to your computer and use it in GitHub Desktop.
Save AstraLuma/192db7493238919ffeeb0e1df3716d83 to your computer and use it in GitHub Desktop.
StreamBeats OBS web page
#!/usr/bin/env python3
"""
This looks in its directory for files. It looks in the format
`playlist/artist/album/1. title.ext`, allowing components to be omitted.
Requires ffmpeg.
"""
import os
from pathlib import Path
import re
import readline
import shutil
import subprocess
import threading
import tempfile
DEST_ROOT = Path.home() / 'Music' / 'StreamBeats-flac'
encoders = threading.Semaphore(os.cpu_count() * 2)
def encode(infile, outfile, /, metadata=None):
"""
Calls flac and encodes the file.
"""
with encoders:
print(infile)
with tempfile.NamedTemporaryFile('wt') as ntf:
ntf.write(';FFMETADATA1\n')
for k, v in (metadata or {}).items():
if k == 'TRACKNUMBER':
k = 'track'
ntf.write(f"{k}={v}\n")
ntf.flush()
ntf.seek(0)
subprocess.run([
'ffmpeg', '-hide_banner',
'-y',
'-i', infile,
'-i', ntf.name,
'--', outfile,
], check=True, stdin=None)
def prompt(prompt, value=""):
"""
Prompts the user for input, allowing an editable default.
"""
readline.set_startup_hook(lambda: readline.insert_text(value))
try:
return input(prompt) # or raw_input in Python 2
finally:
readline.set_startup_hook()
NUM = re.compile(r'(\d+)\.?\s+(.+)')
def split_num(text):
if m := NUM.match(text):
return m.group(1), m.group(2)
else:
return '', text
def iter_nodes():
root = Path(__file__).absolute().parent
yield root, {}
for node in root.iterdir():
if node.is_file():
yield from iter_track(node)
elif node.is_dir():
yield from iter_artists(node)
def iter_artists(plpath):
yield plpath, {
'PLAYLISTS': [plpath.name.strip()],
}
for node in plpath.iterdir():
if node.is_file():
yield from iter_track(node)
elif node.is_dir():
yield from iter_albums(node)
def iter_albums(apath):
yield apath, {
'ARTIST': apath.name.strip(),
}
for node in apath.iterdir():
if node.is_file():
yield from iter_track(node)
elif node.is_dir():
yield from iter_tracks(node)
def iter_tracks(apath):
yield apath, {
'ALBUM': apath.name.strip(),
}
for node in apath.iterdir():
if node.is_file():
yield from iter_track(node)
def iter_track(path):
num, title = split_num(path.stem)
if path.suffix in ('.wav', '.m4a'):
yield path, {
'TITLE': title.strip(),
'TRACKNUMBER': num.strip(),
}
def clean_name(txt):
return re.sub('[^-a-zA-Z0-9_]', '', txt)
def destination(path, meta):
if meta.get('ALBUM'):
dir = DEST_ROOT / clean_name(meta['ALBUM'])
else:
dir = DEST_ROOT
dir.mkdir(parents=True, exist_ok=True)
ext = '.flac' if path.suffix == '.wav' else path.suffix
if meta.get('TRACKNUMBER'):
base = f"{meta['TRACKNUMBER']} - {meta['TITLE']}"
else:
base = f"{meta['TITLE']}"
return dir / f"{clean_name(base)}{ext}"
metadata = dict(iter_nodes())
def compile_metadata(path):
meta = metadata[path]
while path.parent != path:
meta = metadata.get(path, {}) | meta
path = path.parent
return meta
def append_playlist(playlist, item, meta):
fresh = not playlist.exists()
with playlist.open('at') as plf:
if fresh:
print('#EXTM3U', file=plf)
# TODO: Escape commas
print(f"#EXTINF:0,{meta['TITLE'].replace(',', '')}", file=plf)
print(item.relative_to(playlist.parent), file=plf)
for path in sorted(metadata.keys()):
if path.is_file():
meta = compile_metadata(path)
dest = destination(path, meta)
playlists = meta.pop('PLAYLISTS')
for playlist in playlists:
append_playlist(DEST_ROOT / f"{clean_name(playlist)}.m3u", dest, meta)
if path.suffix in ('.wav',):
threading.Thread(
target=encode,
args=(path, dest),
kwargs={'metadata': meta},
).start()
else:
shutil.copyfile(path, dest)
#!/usr/bin/python3
import pathlib
root = pathlib.Path(__file__).absolute().parent
with open('index.html', 'wt') as f:
print("""<!doctype html>
<html>
<head>
<title>StreamBeats</title>
</head>
<body><ul>""", file=f)
for plpath in root.glob('*.m3u'):
print(f"""<li><a href="player.html#{plpath.stem}">{plpath.stem}</a></li>""", file=f)
print("""</ul>
<p><a href="https://www.senpai.tv/streambeats/">StreamBeats Website</a></p>
<p>You can add a <code>font</code> parameter and the page will load it from
<a href="https://fonts.google.com/">Google fonts</a>. It is assumed this is
being used from OBS and you can inject other style customizations. (Tip: Apply
styles to <code>.title</code>)</p>
</body></html>""", file=f)
<!DOCTYPE html>
<html>
<head>
<title>StreamBeats</title>
<style>
body {
font-family: sans-serif;
font-weight: bold;
}
#title {
text-align: right;
color: white;
text-shadow: 0 0 0.5em black, 0 0 0.2em black, 0 0 0.1em black;
font-size: 40pt;
}
</style>
</head>
<body>
<audio id=player autoplay></audio>
<div id=title>(Title)</div>
<script>
const params = new URLSearchParams(document.location.search);
const fonturl = new URL("https://fonts.googleapis.com/css2");
var chosenfont;
for (const font of params.getAll('font')) {
fonturl.searchParams.append('family', font);
chosenfont = font;
}
const fontstyle = document.createElement('link');
fontstyle.href = fonturl;
fontstyle.rel = 'stylesheet';
document.querySelectorAll('head')[0].appendChild(fontstyle);
if (chosenfont) {
document.body.style['font-family'] = chosenfont;
}
</script>
<script>
var playlist = Array();
const player = document.getElementById('player');
const title_elem = document.getElementById('title');
player.volume = 0.1;
function* parse_playlist(text) {
let enable_ext = false;
let cur_entry = {};
for (let line of text.split('\n')) {
line = line.trim();
if (line.startsWith('#EXTM3U')) {
enable_ext = true;
} else if (enable_ext && line.startsWith('#EXTINF:')) {
// FIXME: Commas in quotes
const bits = line.match(/#EXTINF:(\d+),(?:(.+),)*(.*)/);
cur_entry.time = parseInt(bits[1]);
cur_entry.title = bits[bits.length - 1];
cur_entry.extras = {};
for (const bit of bits.slice(2, -1)) {
if (bit) {
const metabit = bit.match(/(.+)="?(.+)"?/);
if (metabit) {
cur_entry.extras[metabit[1]] = metabit[2];
}
}
}
} else if (line.startsWith('#')) {
// Unknown extension, skip
} else if (line) {
cur_entry.path = line;
yield cur_entry;
cur_entry = {};
}
}
}
async function set_hash(hash) {
console.log("set_hash", hash);
let plname = hash.at(0) == '#' ? hash.substring(1) : hash;
let m3u_resp = await fetch(`/${plname}.m3u`)
if (m3u_resp.ok) {
let text = await m3u_resp.text();
playlist = Array.from(parse_playlist(text));
console.log(playlist);
play_next();
}
}
function play_next() {
const track = playlist[Math.floor(Math.random() * playlist.length)];
console.log("Now Playing", track);
player.src = new URL(track.path, document.location); // FIXME: relative to the M3U
player.play();
title_elem.innerText = track.title || track.path;
}
addEventListener('hashchange', (event) => {
let addr = new URL(event.newURL);
set_hash(addr.hash);
});
set_hash(location.hash);
player.addEventListener('ended', (event) => {
play_next();
});
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment