Play a Video
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Action buttons are rendered as <div class="button"> elements, each wrapping
 * its own <form>. Select the div itself (no descendant combinator) so the
 * rule applies even when the button has no <div> ancestor, e.g. inside a
 * table cell.
 */
div.button {
    display: inline-block;
}

/* Flashed status messages; intentionally unstyled for now. */
div.flash {
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{# Placeholder text used by the table cells when a value is missing. #}
{% set mdash = "—" %}
{# Queue-state token that is embedded in every action URL below. #}
{% set currentGeneration = log.currentGeneration() %}

{#
  Render a single-button POST form.
  endpoint: name of the view to post to
  index:    position of the affected entry, passed through to the URL
  label:    text shown on the submit button
#}
{% macro makeButton(endpoint, index, label) %}
  {% set target = url_for(endpoint, generation=currentGeneration, index=index) %}
  <div class="button">
    <form method="POST" action="{{ target }}">
      <input type="submit" value="{{ label }}" />
    </form>
  </div>
{% endmacro %}
{#
  Render `videos` as a table with title, author (plus feed link when one is
  known), provider, and an Actions column.

  Must be used via {% call(video, index) %}: the call body is rendered into
  the Actions cell of each row and receives the video and its zero-based row
  index.

  The second argument to `default` is `true` so that values which are defined
  but empty or None also fall back to the dash instead of printing "None".
#}
{% macro videoTable(videos) %}
  <table>
    <tr>
      <th>Title</th>
      <th>Author</th>
      <th>Provider</th>
      <th>Actions</th>
    </tr>
    {% for video in videos %}
      <tr>
        <td><a href="{{ video.url }}">{{ video.title | default(mdash, true) }}</a></td>
        <td>
          {{ video.author | default(mdash, true) }}
          {% if video.rss %}
            (<a href="{{ video.rss }}">RSS/Atom</a>)
          {% endif %}
        </td>
        <td>{{ video.provider | default(mdash, true) }}</td>
        <td>{{ caller(video, loop.index0) }}</td>
      </tr>
    {% endfor %}
  </table>
{% endmacro %}
<!doctype html>
<head>
  <title>{{ title }}</title>
  <meta name="viewport" content="width=device-width, initial-scale=1" />
  <link rel="stylesheet" href="/static/play.css" />
</head>
<body>
  {# Status messages queued by the views via flash(). #}
  <div class="flashes">
    {% for message in get_flashed_messages() %}
      <div class="flash">
        {{ message }}
      </div>
    {% endfor %}
  </div>

  <h1>{{ title }}</h1>

  {# Submitting a URL posts back to the front page, which queues it. #}
  <form action="/" method="POST">
    <label for="url">URL</label>
    <input type="url" id="url" name="url" />
    <input type="submit" value="Play!" />
  </form>

  <h2>Currently playing</h2>
  {% if log.currentlyPlaying %}
    {#
      The call body is placed inside the Actions <td> by videoTable, so it
      must be plain cell content; emitting another <td> here would nest
      table cells.
    #}
    {% call(video, index) videoTable([log.currentlyPlaying]) %}
      —
    {% endcall %}
  {% else %}
    <em>No video currently playing</em>
  {% endif %}

  <h2>Upcoming</h2>
  {# Reversed for display, so row index 0 is the next video to play. #}
  {% call(video, index) videoTable(log.upcoming | reverse) %}
    {{ makeButton("playNext", index, "Play next") }}
    {{ makeButton("playLater", index, "Play later") }}
    {{ makeButton("cancel", index, "Cancel") }}
  {% endcall %}

  <h2>History</h2>
  {% call(video, index) videoTable(log.history) %}
    {{ makeButton("replay", index, "Replay") }}
  {% endcall %}
</body>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from hashlib import sha256 | |
from html.parser import HTMLParser | |
import io | |
import re | |
import secrets | |
from threading import Lock | |
from flask import Flask, abort, flash, redirect, render_template, request, url_for | |
import mutagen | |
import requests | |
from twisted.internet.defer import DeferredLock | |
from twisted.internet.utils import getProcessOutputAndValue | |
# Static files and templates are both looked up in the current directory;
# static files are served under the /static URL prefix.
app = Flask(
    __name__,
    static_folder=".",
    static_url_path="/static",
    template_folder=".",
)

# A fresh random signing key is generated every time the module is loaded.
app.secret_key = secrets.token_hex()
def fixupURL(url):
    """Rewrite alternative YouTube URL forms into a plain watch URL.

    Two forms are normalised to ``https://youtube.com/watch?v=<id>``:

    * V3 embed URLs (``https://www.youtube.com/v/<id>?version=3``), and
    * short URLs (``https://youtu.be/<id>``), keeping any trailing query
      parameters such as ``?t=30`` as ``&t=30``.

    Any other URL is returned unchanged.
    """
    # Youtube V3 URLs, e.g. from RSS feeds. YT's oEmbed endpoint is broken and
    # fails to handle them.
    url = re.sub(r"^https://www\.youtube\.com/v/(.{11})\?version=3$",
                 lambda m: "https://youtube.com/watch?v=" + m.group(1), url)

    # Youtube short URLs.
    short = re.match(r"https://youtu\.be/(.{11})", url)
    if short:
        rest = url[short.end():]
        # The rewritten URL already has a query string (?v=...), so a query
        # that followed the short ID must be appended with "&", not "?".
        if rest.startswith("?"):
            rest = "&" + rest[1:]
        url = "https://youtube.com/watch?v=" + short.group(1) + rest
    return url
# Maps a domain fragment to (display name, oEmbed endpoint URL).
# Entries are consulted in insertion order.
oEmbedProviders = {
    "reddit.com": (
        "Reddit",
        "https://www.reddit.com/oembed",
    ),
    "soundcloud.com": (
        "Soundcloud",
        "https://soundcloud.com/oembed",
    ),
    "tiktok.com": (
        "TikTok",
        "https://www.tiktok.com/oembed",
    ),
    "twitter.com": (
        "Twitter",
        "https://publish.twitter.com/oembed",
    ),
    "vimeo.com": (
        "Vimeo",
        "https://vimeo.com/api/oembed.json",
    ),
    "youtube.com": (
        "YouTube",
        "https://www.youtube.com/oembed",
    ),
}
# Channel page URL with a 24-character channel ID. IDs may contain "-" as
# well as "_", so both are accepted; the dots in the host name are literal.
YT_CHANNEL_PATT = re.compile(r"https://www\.youtube\.com/channel/([A-Za-z0-9_-]{24})")


class YTParser(HTMLParser):
    """Scan an HTML page for a <link> whose attribute value is a channel URL.

    After ``feed()``, ``channel`` holds the channel ID captured from the last
    matching <link> attribute, or None when no attribute matched.
    """

    channel = None

    def handle_starttag(self, tag, attrs):
        """Record the channel ID from any matching attribute of a <link> tag."""
        if tag != "link":
            return
        for _name, value in attrs:
            # HTMLParser reports attributes without a value (for example
            # <link crossorigin>) as None, which re.match cannot accept.
            if value is None:
                continue
            m = YT_CHANNEL_PATT.match(value)
            if m:
                self.channel = m[1]
def buildRSSForYT(author_url):
    """Return the channel feed URL for the page at *author_url*, or None.

    The page is downloaded (5 second timeout), decoded as UTF-8 and scanned
    with YTParser; None is returned when no channel ID was found.
    """
    page = requests.get(author_url, timeout=5.0).content.decode("utf-8")
    scanner = YTParser()
    scanner.feed(page)
    if not scanner.channel:
        return None
    return "https://www.youtube.com/feeds/videos.xml?channel_id=" + scanner.channel
def flattenTags(tags, k):
    """Yield each entry of ``.text`` for every frame in ``tags.getall(k)``."""
    for frame in tags.getall(k):
        for text in frame.text:
            yield text
def maybeTags(tags, k):
    """Join all text values stored under *k* with " / ".

    Returns None when *k* is not present in *tags*.
    """
    if k not in tags:
        return None
    return " / ".join(flattenTags(tags, k))
class Video:
    """A URL to play, plus whatever display metadata could be discovered."""

    # Metadata defaults; replaced per instance when a lookup succeeds.
    author = None
    title = None
    provider = None
    rss = None

    def __init__(self, url):
        self.url = fixupURL(url)

    def fetchMetadata(self):
        """Best-effort metadata lookup; failures are printed, never raised.

        If the URL contains the key of a known oEmbed provider, that provider
        is queried. Otherwise the start of the resource is inspected for ID3
        tags.
        """
        # Search for an oEmbed provider first
        known = next((v for (k, v) in oEmbedProviders.items() if k in self.url), None)
        if known is not None:
            provider, endpoint = known
            self._fetchOEmbed(provider, endpoint)
        else:
            self._fetchID3()

    def _fetchOEmbed(self, provider, endpoint):
        """Query *endpoint* for this URL and apply the JSON response."""
        self.provider = provider
        params = {"format": "json", "url": self.url}
        try:
            response = requests.get(endpoint, params=params, timeout=5.0).json()
            self.updateMetadata(response)
        except Exception as err:
            print("oEmbed request to", provider, "failed:", err)

    def _fetchID3(self):
        """Look for ID3 tags in the first MiB; maybe it's a podcast."""
        try:
            # Only the first chunk is needed. The context manager releases the
            # streamed connection, and the timeout keeps an unresponsive host
            # from blocking the caller indefinitely.
            with requests.get(self.url, stream=True, timeout=5.0) as response:
                chunk = next(response.iter_content(chunk_size=1024**2), None)
            if chunk is None:
                # Empty body: nothing to inspect.
                return
            f = mutagen.File(fileobj=io.BytesIO(chunk))
            if not f:
                return
            self.author = maybeTags(f.tags, "TALB")
            self.title = maybeTags(f.tags, "TIT2")
            self.provider = maybeTags(f.tags, "TCON")
        except Exception as e:
            print("Error while harvesting podcast metadata:", e)

    def updateMetadata(self, md):
        """Copy title and author from an oEmbed response dict *md*.

        For YouTube responses that carry an ``author_url``, also derive the
        channel feed URL.
        """
        self.title = md.get("title")
        self.author = md.get("author_name")
        if md.get("provider_name") == "YouTube" and "author_url" in md:
            self.rss = buildRSSForYT(md["author_url"])
class VideoLog:
    """The play queue: upcoming videos, the current one, and recent history.

    ``upcoming`` receives new videos on the left and the next video to play
    is taken from the right. Index arguments that refer to ``upcoming``
    therefore count from the right end: 0 is the next video to play.
    ``history`` keeps at most 10 entries, newest first.

    ``generation`` is bumped by every mutating method except ``enqueue``;
    ``currentGeneration`` exposes it as a hash that callers compare before
    acting on an index.
    """

    generation = 0

    def __init__(self):
        self.history = deque(maxlen=10)
        self.currentlyPlaying = None
        self.upcoming = deque()
        self.lock = Lock()

    def currentGeneration(self):
        """Return the SHA-256 hex digest of the current generation number."""
        return sha256(str(self.generation).encode("utf-8")).hexdigest()

    def advance(self):
        """Move the current video to history and start the next one.

        Returns the new current video, or None when the queue is empty.
        """
        with self.lock:
            self.generation += 1
            if self.currentlyPlaying:
                self.history.appendleft(self.currentlyPlaying)
            try:
                self.currentlyPlaying = self.upcoming.pop()
            except IndexError:
                self.currentlyPlaying = None
            return self.currentlyPlaying

    def enqueue(self, video):
        """Fetch metadata for *video* and add it to the end of the queue."""
        # Metadata lookup can be slow, so it runs before taking the lock.
        video.fetchMetadata()
        # Hold the lock so the append cannot interleave with the
        # rotate/pop/rotate sequence in popUpcoming.
        with self.lock:
            self.upcoming.appendleft(video)

    def replay(self, index):
        """Queue ``history[index]`` again; return False if out of range."""
        with self.lock:
            if not (0 <= index < len(self.history)):
                return False
            self.generation += 1
            self.upcoming.appendleft(self.history[index])
            return True

    def popUpcoming(self, index):
        """Remove and return the upcoming video at *index* (0 = next)."""
        # NB: self.lock must be held
        # NB: indices are from the "wrong" end, but deques don't care
        self.upcoming.rotate(index)
        video = self.upcoming.pop()
        self.upcoming.rotate(-index)
        return video

    def playLater(self, index):
        """Move an upcoming video to the end of the queue.

        Returns False (and changes nothing) when *index* is out of range.
        """
        with self.lock:
            if not (0 <= index < len(self.upcoming)):
                return False
            self.generation += 1
            self.upcoming.appendleft(self.popUpcoming(index))
            return True

    def playNext(self, index):
        """Move an upcoming video to the front of the queue.

        Returns False (and changes nothing) when *index* is out of range.
        """
        with self.lock:
            if not (0 <= index < len(self.upcoming)):
                return False
            self.generation += 1
            self.upcoming.append(self.popUpcoming(index))
            return True

    def cancel(self, index):
        """Remove an upcoming video from the queue.

        Returns False (and changes nothing) when *index* is out of range.
        """
        with self.lock:
            if not (0 <= index < len(self.upcoming)):
                return False
            self.generation += 1
            # NB: Push canceled video to history, so that it can be replayed if
            # canceled by accident.
            self.history.appendleft(self.popUpcoming(index))
            return True

    def refreshMetadata(self, index):
        """Re-fetch metadata for an upcoming video.

        *index* counts from the next video to play, like the other methods
        that address ``upcoming``. Returns True on success and False when
        *index* is out of range.
        """
        with self.lock:
            if not (0 <= index < len(self.upcoming)):
                return False
            video = self.upcoming[-1 - index]
        # The lookup may block on the network; do it without holding the lock.
        video.fetchMetadata()
        return True
# Absolute path of the ssh client, and the private key passed to it via -i.
ssh = "/run/current-system/sw/bin/ssh"
ssh_key = "/home/tv/.ssh/id_ed25519_mpv"

# The module-wide queue shared by all views.
log = VideoLog()

# Lock through which player launches are run.
mpvLock = DeferredLock()
def play(video):
    """Run mpv on *video*'s URL via ssh and return the process Deferred.

    A non-zero exit code is reported on stdout; the Deferred's callback
    result is None either way.
    """
    # Imported locally so this function carries its own dependency.
    import shlex

    d = getProcessOutputAndValue(ssh,
        ["-i", ssh_key, "tv@localhost",
         "DISPLAY=:0", "mpv",
         "--fullscreen", "--slang=en,eng,en-US,en-GB", "--cursor-autohide=always",
         "--cache=yes", "--demuxer-max-bytes=2GiB", "--cache-pause-initial=yes",
         "--ytdl-format=best[width\\<=1920][height\\<=1080]",
         # ssh joins the remote command words and passes them to the remote
         # shell, so the user-supplied URL must be shell-quoted or characters
         # such as ";" or "$(...)" would be executed. The preceding "--" stops
         # mpv from treating a URL that starts with "-" as an option.
         "--", shlex.quote(video.url)])

    def report(rv):
        # rv is (out, err, code)
        if rv[2]:
            print("ssh/mpv returned an error:", rv)

    d.addCallback(report)
    return d
def playIfNotPlaying(_none):
    """Advance the queue and, if that produced a video, play it.

    Registers itself as the callback of the playback Deferred, so the next
    video is started once the current one finishes. The argument is ignored;
    it only exists so the function can be used as a Deferred callback.
    Returns the playback Deferred, or None when the queue is empty.
    """
    log.advance()
    current = log.currentlyPlaying
    if current is None:
        return None
    d = mpvLock.run(play, current)
    d.addCallback(playIfNotPlaying)
    return d
@app.route("/", methods=("GET", "POST"))
def frontpage():
    """Show the queue page (GET) or queue the submitted URL (POST)."""
    if request.method != "POST":
        return render_template("play.html", title="Play a Video", log=log)

    submitted = request.form["url"].strip()
    log.enqueue(Video(submitted))
    # Start playback if nothing is playing right now.
    if log.currentlyPlaying is None:
        playIfNotPlaying(None)
    flash("Appended to the queue!")
    return redirect(url_for("frontpage"))
# Flashed when the generation in a request URL no longer matches the log's.
generationMessage = (
    "\n"
    "Your click happened while the queue was changing, so your action was\n"
    "canceled to avoid regret. Sorry about that. Try again in a moment.\n"
)
@app.route("/replay/<generation>/<int:index>", methods=("POST",))
def replay(generation, index):
    """Queue the history entry at *index* again, then return to the front page.

    A stale *generation* only flashes a notice; a rejected index aborts
    with 400.
    """
    if generation != log.currentGeneration():
        flash(generationMessage)
        return redirect(url_for("frontpage"))

    if not log.replay(index):
        abort(400, "Shenanigans!")
    # Start playback if nothing is playing right now.
    if log.currentlyPlaying is None:
        playIfNotPlaying(None)
    flash("Added video to queue!")
    return redirect(url_for("frontpage"))
@app.route("/next/<generation>/<int:index>", methods=("POST",))
def playNext(generation, index):
    """Move the upcoming entry at *index* to the front of the queue.

    A stale *generation* only flashes a notice; a rejected index aborts
    with 400.
    """
    if generation != log.currentGeneration():
        flash(generationMessage)
        return redirect(url_for("frontpage"))

    if not log.playNext(index):
        abort(400, "Shenanigans!")
    flash("Moved to front of queue!")
    return redirect(url_for("frontpage"))
@app.route("/later/<generation>/<int:index>", methods=("POST",))
def playLater(generation, index):
    """Move the upcoming entry at *index* to the end of the queue.

    A stale *generation* only flashes a notice; a rejected index aborts
    with 400.
    """
    if generation != log.currentGeneration():
        flash(generationMessage)
        return redirect(url_for("frontpage"))

    if not log.playLater(index):
        abort(400, "Shenanigans!")
    flash("Moved to end of queue!")
    return redirect(url_for("frontpage"))
@app.route("/cancel/<generation>/<int:index>", methods=("POST",))
def cancel(generation, index):
    """Remove the upcoming entry at *index* from the queue.

    A stale *generation* only flashes a notice; a rejected index aborts
    with 400.
    """
    if generation != log.currentGeneration():
        flash(generationMessage)
        return redirect(url_for("frontpage"))

    if not log.cancel(index):
        abort(400, "Shenanigans!")
    flash("Removed video from queue!")
    return redirect(url_for("frontpage"))
@app.route("/metadata/<generation>/<int:index>", methods=("POST",))
def metadata(generation, index):
    """Ask the log to refresh metadata for the upcoming entry at *index*.

    A stale *generation* only flashes a notice; a falsy result from
    ``log.refreshMetadata`` aborts with 400.
    """
    if generation != log.currentGeneration():
        flash(generationMessage)
        return redirect(url_for("frontpage"))

    if not log.refreshMetadata(index):
        abort(400, "Shenanigans!")
    flash("Updated metadata!")
    return redirect(url_for("frontpage"))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env nix-shell
#! nix-shell -i bash -p python3Packages.flask python3Packages.mutagen python3Packages.requests python3Packages.twisted

# Run the Flask application object `app` from the `play` module.
flask --app play:app run
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Environment derivation whose only build input is pyflakes.
{ nixpkgs ? import <nixpkgs> {} }:

nixpkgs.pkgs.stdenv.mkDerivation {
  name = "play-env";
  buildInputs = [
    nixpkgs.pkgs.python3Packages.pyflakes
  ];
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment