Skip to content

Instantly share code, notes, and snippets.

@MostAwesomeDude
Last active July 30, 2023 02:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MostAwesomeDude/13f9a286dbd847179d11481c9e03b80d to your computer and use it in GitHub Desktop.
Save MostAwesomeDude/13f9a286dbd847179d11481c9e03b80d to your computer and use it in GitHub Desktop.
Play a Video
div .button {
display: inline-block;
}
div .flash {
}
{% set mdash = "—" %}
{% set currentGeneration = log.currentGeneration() %}
{% macro makeButton(endpoint, index, label) %}
{% set url = url_for(endpoint, generation=currentGeneration, index=index) %}
<div class="button">
<form action="{{ url }}" method="POST">
<input type="submit" value="{{ label }}" />
</form>
</div>
{% endmacro %}
{% macro videoTable(videos) %}
<table>
<tr>
<th>Title</th>
<th>Author</th>
<th>Provider</th>
<th>Actions</th>
</tr>
{% for video in videos %}
<tr>
<td><a href="{{ video.url }}">{{ video.title | default(mdash) }}</a></td>
<td>
{{ video.author | default(mdash) }}
{% if video.rss %}
(<a href="{{ video.rss }}">RSS/Atom</a>)
{% endif %}
</td>
<td>{{ video.provider | default(mdash) }}</td>
<td>{{ caller(video, loop.index0) }}</td>
</tr>
{% endfor %}
</table>
{% endmacro %}
<!doctype html>
<head>
<title>{{ title }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
<link rel="stylesheet" href="/static/play.css" />
</head>
<body>
<div class="flashes">
{% for message in get_flashed_messages() %}
<div class="flash">
{{ message }}
</div>
{% endfor %}
</div>
<h1>{{ title }}</h1>
<form action="/" method="POST">
<label for="url">URL</label>
<input type="url" id="url" name="url" />
<input type="submit" value="Play!" />
</form>
<h2>Currently playing</h2>
{% if log.currentlyPlaying %}
{% call(video, index) videoTable([log.currentlyPlaying]) %}
<td>&mdash;</td>
{% endcall %}
{% else %}
<em>No video currently playing</em>
{% endif %}
<h2>Upcoming</h2>
{% call(video, index) videoTable(log.upcoming | reverse) %}
{{ makeButton("playNext", index, "Play next") }}
{{ makeButton("playLater", index, "Play later") }}
{{ makeButton("cancel", index, "Cancel") }}
{% endcall %}
<h2>History</h2>
{% call(video, index) videoTable(log.history) %}
{{ makeButton("replay", index, "Replay") }}
{% endcall %}
</body>
from collections import deque
from hashlib import sha256
from html.parser import HTMLParser
import io
import re
import secrets
from threading import Lock
from flask import Flask, abort, flash, redirect, render_template, request, url_for
import mutagen
import requests
from twisted.internet.defer import DeferredLock
from twisted.internet.utils import getProcessOutputAndValue
app = Flask(__name__,
static_folder=".", static_url_path="/static",
template_folder=".")
app.secret_key = secrets.token_hex()
def fixupURL(url):
# Youtube V3 URLs, e.g. from RSS feeds. YT's oEmbed endpoint is broken and
# fails to handle them.
url = re.sub(r"^https://www.youtube.com/v/(.{11})\?version=3$",
lambda m: "https://youtube.com/watch?v=" + m.group(1), url)
# Youtube short URLs.
url = re.sub(r"^https://youtu.be/(.{11})",
lambda m: "https://youtube.com/watch?v=" + m.group(1), url)
return url
oEmbedProviders = {
"reddit.com": ("Reddit", "https://www.reddit.com/oembed"),
"soundcloud.com": ("Soundcloud", "https://soundcloud.com/oembed"),
"tiktok.com": ("TikTok", "https://www.tiktok.com/oembed"),
"twitter.com": ("Twitter", "https://publish.twitter.com/oembed"),
"vimeo.com": ("Vimeo", "https://vimeo.com/api/oembed.json"),
"youtube.com": ("YouTube", "https://www.youtube.com/oembed"),
}
YT_CHANNEL_PATT = re.compile(r"https://www.youtube.com/channel/([A-Za-z0-9_]{24})")
class YTParser(HTMLParser):
channel = None
def handle_starttag(self, tag, attrs):
if tag != "link": return
for (label, attr) in attrs:
m = YT_CHANNEL_PATT.match(attr)
if m:
self.channel = m[1]
def buildRSSForYT(author_url):
parser = YTParser()
parser.feed(requests.get(author_url, timeout=5.0).content.decode("utf-8"))
if parser.channel:
return "https://www.youtube.com/feeds/videos.xml?channel_id=" + parser.channel
def flattenTags(tags, k):
for tag in tags.getall(k): yield from tag.text
def maybeTags(tags, k):
if k in tags: return " / ".join(flattenTags(tags, k))
class Video:
author = None
title = None
provider = None
rss = None
def __init__(self, url):
self.url = fixupURL(url)
def fetchMetadata(self):
try:
# Search for an oEmbed provider first
provider, endpoint = next(v for (k, v) in oEmbedProviders.items() if k in self.url)
self.provider = provider
params = {"format": "json", "url": self.url}
try:
response = requests.get(endpoint, params=params, timeout=5.0).json()
self.updateMetadata(response)
except Exception as err:
print("oEmbed request to", provider, "failed:", err)
pass
except StopIteration:
# Look for ID3 tags in the first MiB; maybe it's a podcast
try:
chunk = next(requests.get(self.url, stream=True).iter_content(chunk_size=1024**2))
f = mutagen.File(fileobj=io.BytesIO(chunk))
if not f: return
self.author = maybeTags(f.tags, "TALB")
self.title = maybeTags(f.tags, "TIT2")
self.provider = maybeTags(f.tags, "TCON")
except StopIteration: pass
except Exception as e:
print("Error while harvesting podcast metadata:", e)
def updateMetadata(self, md):
self.title = md.get("title")
self.author = md.get("author_name")
if md.get("provider_name") == "YouTube" and "author_url" in md:
self.rss = buildRSSForYT(md["author_url"])
class VideoLog:
generation = 0
def __init__(self):
self.history = deque(maxlen=10)
self.currentlyPlaying = None
self.upcoming = deque()
self.lock = Lock()
def currentGeneration(self):
return sha256(str(self.generation).encode("utf-8")).hexdigest()
def advance(self):
with self.lock:
self.generation += 1
if self.currentlyPlaying:
self.history.appendleft(self.currentlyPlaying)
try:
self.currentlyPlaying = self.upcoming.pop()
except IndexError:
self.currentlyPlaying = None
return self.currentlyPlaying
def enqueue(self, video):
video.fetchMetadata()
self.upcoming.appendleft(video)
def replay(self, index):
if not (0 <= index < len(self.history)):
return False
with self.lock:
self.generation += 1
self.upcoming.appendleft(self.history[index])
return True
def popUpcoming(self, index):
# NB: self.lock must be held
# NB: indices are from the "wrong" end, but deques don't care
self.upcoming.rotate(index)
video = self.upcoming.pop()
self.upcoming.rotate(-index)
return video
def playLater(self, index):
if not (0 <= index <= len(self.upcoming)):
return False
with self.lock:
self.generation += 1
self.upcoming.appendleft(self.popUpcoming(index))
return True
def playNext(self, index):
if not (0 <= index <= len(self.upcoming)):
return False
with self.lock:
self.generation += 1
self.upcoming.append(self.popUpcoming(index))
return True
def cancel(self, index):
if not (0 <= index <= len(self.upcoming)):
return False
with self.lock:
self.generation += 1
# NB: Push canceled video to history, so that it can be replayed if
# canceled by accident.
self.history.appendleft(self.popUpcoming(index))
return True
def refreshMetadata(self, index):
if not (0 <= index <= len(self.upcoming)): return False
with self.lock: self.upcoming[index].fetchMetadata()
log = VideoLog()
ssh = "/run/current-system/sw/bin/ssh"
ssh_key = "/home/tv/.ssh/id_ed25519_mpv"
mpvLock = DeferredLock()
def play(video):
d = getProcessOutputAndValue(ssh,
["-i", ssh_key, "tv@localhost",
"DISPLAY=:0", "mpv",
"--fullscreen", "--slang=en,eng,en-US,en-GB", "--cursor-autohide=always",
"--cache=yes", "--demuxer-max-bytes=2GiB", "--cache-pause-initial=yes",
"--ytdl-format=best[width\\<=1920][height\\<=1080]",
video.url])
# (out, err, code)
d.addCallback(lambda rv:
print("ssh/mpv returned an error:", rv) if rv[2] else None)
return d
def playIfNotPlaying(_none):
log.advance()
if log.currentlyPlaying is not None:
d = mpvLock.run(play, log.currentlyPlaying)
d.addCallback(playIfNotPlaying)
return d
@app.route("/", methods=("GET", "POST"))
def frontpage():
if request.method == "POST":
log.enqueue(Video(request.form["url"].strip()))
if log.currentlyPlaying is None: playIfNotPlaying(None)
flash("Appended to the queue!")
return redirect(url_for("frontpage"))
return render_template("play.html", title="Play a Video", log=log)
generationMessage = """
Your click happened while the queue was changing, so your action was
canceled to avoid regret. Sorry about that. Try again in a moment.
"""
@app.route("/replay/<generation>/<int:index>", methods=("POST",))
def replay(generation, index):
if generation == log.currentGeneration():
if not log.replay(index): abort(400, "Shenanigans!")
if log.currentlyPlaying is None: playIfNotPlaying(None)
flash("Added video to queue!")
else: flash(generationMessage)
return redirect(url_for("frontpage"))
@app.route("/next/<generation>/<int:index>", methods=("POST",))
def playNext(generation, index):
if generation == log.currentGeneration():
if not log.playNext(index): abort(400, "Shenanigans!")
flash("Moved to front of queue!")
else: flash(generationMessage)
return redirect(url_for("frontpage"))
@app.route("/later/<generation>/<int:index>", methods=("POST",))
def playLater(generation, index):
if generation == log.currentGeneration():
if not log.playLater(index): abort(400, "Shenanigans!")
flash("Moved to end of queue!")
else: flash(generationMessage)
return redirect(url_for("frontpage"))
@app.route("/cancel/<generation>/<int:index>", methods=("POST",))
def cancel(generation, index):
if generation == log.currentGeneration():
if not log.cancel(index): abort(400, "Shenanigans!")
flash("Removed video from queue!")
else: flash(generationMessage)
return redirect(url_for("frontpage"))
@app.route("/metadata/<generation>/<int:index>", methods=("POST",))
def metadata(generation, index):
if generation == log.currentGeneration():
if not log.refreshMetadata(index): abort(400, "Shenanigans!")
flash("Updated metadata!")
else: flash(generationMessage)
return redirect(url_for("frontpage"))
#!/usr/bin/env nix-shell
#! nix-shell -i bash -p python3Packages.flask python3Packages.mutagen python3Packages.requests python3Packages.twisted
flask -A play:app run
{ nixpkgs ? import <nixpkgs> {} }:
let
inherit (nixpkgs) pkgs;
in pkgs.stdenv.mkDerivation {
name = "play-env";
buildInputs = with pkgs; [
python3Packages.pyflakes
];
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment