Skip to content

Instantly share code, notes, and snippets.

@ChuckMac
Created May 4, 2023 14:08
Show Gist options
  • Save ChuckMac/788e837ff0c4b90e2fe46047d0713e05 to your computer and use it in GitHub Desktop.
Watch and auto-download new episodes of a YouTube channel and rename to Plex friendly format
"""
This script can be used to attempt to download new videos from a
YouTube channel and rename them to a Plex friendly format.
ex: s01e01 - Video Title[ID].mp4
s01e01 - Video Title.json
s01e01 - Video Title.webp
It will skip any videos that are shorter than the specified
minimum length (in seconds) to avoid downloading shorts.
It will also skip any videos that already exist in the local archive directory.
The script will keep track of the last downloaded episode and use that to
calculate the next episode number.
Note: the public channel feed only lists the last 15 videos
"""
import http.client
import json
import xml.etree.ElementTree as ET
import glob
import os
import yt_dlp
# User config values - change these to meet your needs
yt_channel_id = "UC6n8I1UDTKP1IWjQMg6_TwA"  # channel ID taken from the YouTube channel URL
local_archive = r"H:\Seagate\Media\YouTube\B1M"  # final Plex-friendly library destination
local_tmp = r"H:\Seagate\Media\Downloads\youtube"  # staging directory for in-progress downloads
video_format = "bestvideo[vcodec!^=av01][ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]"  # yt-dlp format selector (skips AV1 video)
min_vid_length = 120  # skip videos at or under this many seconds (filters out Shorts)
default_start = "s01e01"  # episode label used when the local archive is empty
# Global variables
next_string = ""  # next episode label (sXXeYY), advanced as episodes are downloaded
existing_episodes = []  # YouTube video IDs already present in local_archive
def parse_local_yt_files():
    """
    Scan the local YouTube archive directory for existing episode files.

    Builds the list of already-downloaded video IDs from the *.json
    sidecar files and derives the next episode label from the newest
    file found, falling back to default_start when the archive is empty.
    """
    global next_string, existing_episodes
    print("Checking local archive - " + local_archive)
    os.chdir(local_archive)
    json_files = sorted(glob.glob("*.json"))
    # Work out the next episode label from the last (lexicographically
    # greatest) sidecar filename, e.g. "s01e07 - Title.json".
    if not json_files:
        print("No existing Episodes found, using default start: " + default_start)
        next_string = default_start
    else:
        last_parts = json_files[-1].split(" ")
        print("Last episode found: " + last_parts[0])
        update_next_string(last_parts[0])
    # Record the YouTube ID stored in every archived sidecar so the feed
    # pass can skip videos that are already present.
    for sidecar in json_files:
        with open(sidecar, encoding="utf8") as handle:
            existing_episodes.append(json.load(handle)['id'])
def update_next_string(last_string):
    """
    Calculate the next episode string based on the last episode in the archive.

    Expected format: sXXeYY. The episode number is zero-padded back to the
    original width (minimum 2) because the rest of the script relies on
    lexicographic filename sorting to find the newest episode — without
    padding, "s01e2" would sort after "s01e10" and break that lookup.

    Parameters:
        last_string (str): The string representing the last episode in the
            archive, e.g. "s01e07".

    Returns:
        str: The next episode string; also stored in the global next_string.
    """
    global next_string
    # Split at the first "e" after the season prefix: "s01e07" -> "s01", "07".
    season, _, episode = last_string.partition("e")
    width = max(len(episode), 2)  # preserve padding so sorted() stays correct
    next_string = f"{season}e{int(episode) + 1:0{width}d}"
    return next_string
def get_channel_feed():
    """
    Fetch the channel's public Atom XML feed from YouTube over HTTPS and
    pass the raw bytes to parse_feed.

    Exits the script via SystemExit when the request cannot be made or
    when YouTube answers with a non-200 status.
    """
    print("Getting YouTube Feed...")
    conn = http.client.HTTPSConnection("www.youtube.com")
    feed_path = "/feeds/videos.xml?channel_id=" + yt_channel_id
    try:
        conn.request("GET", feed_path)
    except Exception as err:
        conn.close()
        print(">> Request failed, Unable to get YouTube Feed: {}".format(err))
        raise SystemExit(err)
    resp = conn.getresponse()
    if resp.status != 200:
        conn.close()
        print(">> Request failed, Non-200 received getting YouTube Feed: {}".format(resp.status))
        raise SystemExit(resp.status)
    feed_xml = resp.read()
    conn.close()
    parse_feed(feed_xml)
def parse_feed(data):
    """
    Parse the channel XML feed and download any new, full-length episodes.

    For each feed entry whose video ID is not already in existing_episodes,
    probes the video metadata first (so Shorts are skipped without a
    download), downloads qualifying videos to local_tmp via yt-dlp, then
    moves the video, .info.json and thumbnail files into local_archive
    with the current "sXXeYY - " episode prefix and advances next_string.

    Parameters:
        data (bytes or str): The raw Atom XML feed returned by YouTube.
    """
    ATOM = '{http://www.w3.org/2005/Atom}'  # Atom namespace used by the feed
    tree = ET.fromstring(data)
    entries = tree.findall(ATOM + 'entry')
    ydl_opts = {
        'format': video_format,
        'outtmpl': f'{local_tmp}/%(title)s[%(id)s].%(ext)s',
        'quiet': True,
        'addmetadata': True,
        'writeinfojson': True,
        'writethumbnail': True,
        'ignoreerrors': True,
        'external_downloader': 'aria2c'
    }
    for entry in entries:
        # Feed IDs look like "yt:video:XXXXXXXXXXX"; keep the last segment.
        video_id = entry.find(ATOM + 'id').text.split(':')[-1]
        print("--------------------")
        print("Checking entry: " + video_id)
        if video_id in existing_episodes:
            print("Episode already exists, skipping: " + video_id)
            continue
        video_url = "https://www.youtube.com/watch?v=" + video_id
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Metadata-only probe first; with ignoreerrors=True a failed
            # extraction returns None instead of raising.
            video_info = ydl.extract_info(video_url, download=False)
            if video_info is None:
                print("Could not fetch video info, skipping: " + video_id)
                continue
            duration = video_info['duration']
            if duration > min_vid_length:  # check its not a short
                print("Downloading: " + video_id + " - " + video_info['title'])
                video_info = ydl.extract_info(video_url, download=True)
                print("Downloading Complete")
                # Rename the downloaded files (video + json + thumbnail).
                # NOTE(review): matching on the raw title assumes yt-dlp did
                # not sanitize any characters out of the filename — confirm.
                for file in os.listdir(local_tmp):
                    if file.startswith(video_info['title']):
                        print("Moving to: " + local_archive + "/" + next_string + " - " + file)
                        os.rename(f'{local_tmp}/{file}', f'{local_archive}/{next_string} - {file}')
                # Advance once per episode, after all its files are moved.
                update_next_string(next_string)
            else:
                print("Episode too short [min: " + str(min_vid_length) + "s / ep: "
                      + str(duration) + "s], skipping: "
                      + video_id + " - " + video_info['title'])
    print("--------------------")
# Entry point: guard so importing this module (e.g. for testing) does not
# immediately start scanning the archive and hitting the network.
if __name__ == "__main__":
    print("-- STARTING --")
    parse_local_yt_files()
    get_channel_feed()
    print("-- FINISHED --")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment