Skip to content

Instantly share code, notes, and snippets.

@crides
Last active January 10, 2020 23:27
Show Gist options
  • Save crides/a6a8f46ecf4b6434957b25a8b2fd0d21 to your computer and use it in GitHub Desktop.
Save crides/a6a8f46ecf4b6434957b25a8b2fd0d21 to your computer and use it in GitHub Desktop.
A simple YouTube play queue reading from RSS feed from Youtube with sync features
#!/usr/bin/python3
import json, os, time
QUEUE_FILE = "youtube.json"
def dump_queue(Q):
json.dump(Q, open(QUEUE_FILE, "w"), indent=2)
def read_queue():
return json.load(open(QUEUE_FILE))
def get_sub_urls():
from xml.etree import ElementTree
root = ElementTree.parse("subscription_manager.xml").getroot()
return [sub.get("xmlUrl") for sub in root[0][0]]
def get_vids_from_sub(url, from_time):
import feedparser, time
feed = feedparser.parse(url)
new_vids = []
for entry in feed.entries:
unix_time = time.mktime(entry["published_parsed"])
if unix_time > from_time:
new_vids.append({
"channel": entry["author"],
"title": entry["title"],
"link": entry["link"],
"unix_time": unix_time,
})
new_vids.sort(key=lambda v:v["unix_time"])
return new_vids
def get_duration(url):
# Naive processing without BS
from urllib import request
body = request.urlopen(url).read().decode("utf8")
lines = body.split("\n")
for line in lines:
if 'itemprop="duration"' in line:
start = line.find("PT") + 2
end = line.find("S")
time = line[start:end].split("M")
time = int(time[0]) * 60 + int(time[1])
return time
def id_from_url(url):
id_ind = url.find("v=") + 2
return url[id_ind:]
def download_thumbnail(url):
from urllib import request
import os
destdir = f"{os.environ['HOME']}/.config/youtube/"
id = id_from_url(url)
request.urlretrieve(f"https://i1.ytimg.com/vi/{id}/hqdefault.jpg", f"{destdir}{id}.jpg")
def renew_queue():
Q = read_queue()
last_fetch = Q["fetch_time"]
new_fetch = time.time() + time.timezone
new_vids = []
sub_urls = get_sub_urls()
for i, sub_url in enumerate(sub_urls):
print(f"Fetching subscribers {i+1}/{len(sub_urls)}", end="\r")
new_vids.extend(get_vids_from_sub(sub_url, last_fetch))
print()
Q["fetch_time"] = new_fetch
Q["videos"] += new_vids
print(f"Added {len(new_vids)} videos to queue")
dump_queue(Q)
for i, video in enumerate(new_vids):
print(f"Downloading thumbnails {i+1}/{len(new_vids)}", end="\r")
link = video["link"]
video["duration"] = get_duration(link)
dump_queue(Q)
download_thumbnail(link)
print()
def push_queue():
from github import Github, InputFileContent
from getpass import getpass
queue_content = open(QUEUE_FILE).read()
Q = json.loads(queue_content)
username = input("Username: ")
passwd = getpass("Password: ")
g = Github(username, passwd)
user = g.get_user()
try:
if "gist_id" in Q:
gist_id = Q["gist_id"]
gist = g.get_gist(gist_id)
gist_files = {QUEUE_FILE: InputFileContent(queue_content, QUEUE_FILE)}
gist.edit("Youtube Queue", gist_files)
else:
gist_files = {QUEUE_FILE: InputFileContent("Nothing", QUEUE_FILE)}
gist = user.create_gist(False, gist_files, "Youtube Queue")
print(f"Created new gist: {gist_id}")
Q["gist_id"] = gist.id
queue_content = json.dumps(Q, indent=2)
dump_queue(Q)
gist_files = {QUEUE_FILE: InputFileContent(queue_content, QUEUE_FILE)}
gist.edit("Youtube Queue", gist_files)
except Exception as e:
print("Github error:", e)
def pull_queue():
from getpass import getpass
from github import Github
Q = read_queue()
username = input("Username: ")
passwd = getpass("Password: ")
g = Github(username, passwd)
user = g.get_user()
try:
if "gist_id" in Q:
gist_id = Q["gist_id"]
gist = g.get_gist(gist_id)
open(QUEUE_FILE, "w").write(gist.files[QUEUE_FILE].content)
else:
print("No Gist ID")
except Exception as e:
print("Github error:", e)
def list_videos():
print("{:20}{:80}{}".format("Channel", "Title", "Time"))
for vid in json.load(open(QUEUE_FILE))["videos"]:
print("{:20}{:80}{}".format(vid["channel"], vid["title"], time.ctime(vid["unix_time"])))
def play_queue(resume=False):
import textwrap, itertools
from subprocess import Popen, PIPE
if resume:
Q = read_queue()
if "last_played" in Q:
Popen("mpv " + Q["last_played"], shell=True)
while True:
Q = read_queue()
videos = Q["videos"]
if len(videos) == 0:
print("No videos in queue.")
break
channel_width = max(map(lambda v:len(v["channel"]), videos)) + 2
fzf_lines = [f"{'Channel':{channel_width}}Title"]
fzf_lines.extend( \
[f"{v['channel']:{channel_width}}{v['title']}\xa0{v['link']}" \
for v in videos])
fzf_input = "\n".join(fzf_lines)
fzf_binds = [
("ctrl-f", "page-down"),
("ctrl-b", "page-up"),
]
fzf_opts = [
"-e", # --exact
# "--no-clear",
"-m", # --multi
"--reverse",
"--header-lines=1",
"-d \xa0", # --delimiter
"--with-nth=1",
"--expect=del,enter",
]
fzf_opts.append("--bind=" + ",".join(f"{b[0]}:{b[1]}" for b in fzf_binds))
fzf_cmd = f"fzf {' '.join(fzf_opts)}"
fzf = Popen(fzf_cmd, stdin=PIPE, stdout=PIPE, shell=True)
fzf.stdin.write(bytes(fzf_input, "utf8"))
fzf.stdin.close()
results = fzf.stdout.readlines()
if len(results) == 0:
break
key = results.pop(0)[:-1].decode("utf8")
for i, line in enumerate(results):
results[i] = line[:-1].split(b"\xa0")[1].decode("utf8")
if key == "del":
Q["videos"] = list(filter(lambda v:v["link"] not in results, videos))
dump_queue(Q)
elif key == "enter":
# os.system("tput rmcup")
mpv_out = os.popen("mpv --quiet --msg-level=all=no,cplayer=info " + " ".join(results))
while True:
line = mpv_out.readline()
if line == "\n":
break
if line.startswith("Playing: "):
link = line.split(": ")[1][:-1]
Q["videos"] = list(filter(lambda v:v["link"] != link, videos))
Q["last_played"] = link
dump_queue(Q)
# os.system("tput rmcup")
def usage():
print(f"Usage: {sys.argv[0]} play | resume | fetch | list | push | pull")
exit(1)
if __name__ == "__main__":
import sys
if len(sys.argv) != 2:
usage()
if sys.argv[1] == "play":
play_queue()
elif sys.argv[1] == "resume":
play_queue(True)
elif sys.argv[1] == "fetch":
renew_queue()
elif sys.argv[1] == "list":
list_videos()
elif sys.argv[1] == "push":
push_queue()
elif sys.argv[1] == "pull":
pull_queue()
else:
usage()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment