Skip to content

Instantly share code, notes, and snippets.

@aabiji
Created April 7, 2024 15:39
Show Gist options
  • Save aabiji/c65254af440b1bb53149b2d6a9fafeca to your computer and use it in GitHub Desktop.
Save aabiji/c65254af440b1bb53149b2d6a9fafeca to your computer and use it in GitHub Desktop.
Youtube video downloader
"""
A tiny youtube video downloader.
usage: yt.py [-h] [--only-audio] [--num_retries NUM_RETRIES] [--output OUTPUT] video_url
Built by Abigail Adegbiji on April 7, 2024.
Inspired by this blog post: https://tyrrrz.me/blog/reverse-engineering-youtube-revisited
Note that the get_video_info function is subject to change whenever YouTube changes its API.
Also note that ffmpeg and ffmpeg-python are required dependencies.
"""
import argparse
import ffmpeg
import os
import requests
import signal
import sys
import time
def format_size(size):
    """Format a byte count as a human-readable string using decimal units.

    Args:
        size: Number of bytes (int or float).

    Returns:
        A string such as "512 B" or "1.5 MB". The value is divided by 1000
        per unit step and rounded to two decimals; anything of a terabyte
        or more is reported in TB.
    """
    units = ["B", "KB", "MB", "GB", "TB"]
    index = 0
    # Use >= so exactly 1000 bytes reads "1.0 KB", and stop at the largest
    # unit so huge values cannot index past the end of `units`.
    while size >= 1000 and index < len(units) - 1:
        size /= 1000
        index += 1
    return f"{round(size, 2)} {units[index]}"
def print_download_progress(bar_length, downloaded, total):
    """Redraw a single-line progress bar on stdout.

    Args:
        bar_length: Width of the bar in characters (between the brackets).
        downloaded: Number of bytes received so far.
        total: Expected total number of bytes; 0 when unknown.
    """
    if downloaded > 0 and total > 0:
        # Clamp so an over-long download can never draw past the bar's end.
        fraction = min(downloaded / total, 1.0)
    else:
        fraction = 0
    filled = int(fraction * bar_length)
    # Pad to the full width so the closing bracket stays in the same column
    # for every update, including the final 100% one.
    progress = ("=" * filled).ljust(bar_length)
    # ANSI "erase to end of line": clears leftovers from a longer previous line.
    sys.stdout.write("\033[K")
    message = f"[{progress}] {format_size(downloaded)}/{format_size(total)}"
    # Carriage return without newline so the next call overwrites this line.
    print(message, end="\r")
def download_byte_range(filename, url):
    """Download `url` into `filename`, resuming a partial file if one exists.

    Args:
        filename: Path of the file to write (or append to when resuming).
        url: Direct URL of the resource to fetch.

    Returns:
        True when the file is complete (freshly downloaded or already
        present), False when a network/HTTP error interrupted the transfer.
    """
    chunk_size = 1024 * 100
    try:
        # requests.head() does not follow redirects by default; follow them
        # so Content-Length describes the real resource. The request lives
        # inside the try so connection errors are reported, not raised.
        head = requests.head(url, allow_redirects=True)
        # 0 means "size unknown" when the server omits the header.
        total = int(head.headers.get("Content-Length", 0))

        headers = {}
        amount_read = 0
        # Continue from where we stopped
        if os.path.exists(filename):
            filesize = os.path.getsize(filename)
            if total and filesize >= total:
                print("Already downloaded")
                return True
            headers = {"Range": f"bytes={filesize}-"}
            amount_read = filesize

        with requests.get(url, headers=headers, stream=True) as req:
            req.raise_for_status()
            # Only append when the server actually honoured the Range header
            # (206 Partial Content). A plain 200 carries the whole body, so
            # appending it would corrupt the file; start over instead.
            resuming = bool(headers) and req.status_code == 206
            if not resuming:
                amount_read = 0
            with open(filename, "ab" if resuming else "wb") as file:
                for chunk in req.iter_content(chunk_size=chunk_size):
                    file.write(chunk)
                    # len() is the payload size; sys.getsizeof() would also
                    # count the bytes object's own overhead.
                    amount_read += len(chunk)
                    print_download_progress(30, amount_read, total)
        return True
    except requests.exceptions.RequestException as e:
        print(f"Download error: {e}")
        return False
def download_bytes(filename, url, max_retries):
    """Download `url` to `filename`, trying up to `max_retries` times.

    Each attempt resumes from whatever an earlier attempt already wrote.

    Args:
        filename: Destination file path.
        url: Direct URL of the resource.
        max_retries: Maximum number of download attempts.

    Raises:
        SystemExit: With status 1 when every attempt failed (or when
            `max_retries` allows no attempt at all).
    """
    attempts = 0
    while attempts < max_retries:
        success = download_byte_range(filename, url)
        # Move past the progress bar line, which ends in "\r" not "\n".
        print()
        if success:
            return
        attempts += 1
        if attempts < max_retries:
            # Only announce a retry when one is actually going to happen.
            print(f"Retrying {attempts}/{max_retries}")
            time.sleep(0.25)
    print("Max retries exceeded. Stopping")
    # sys.exit instead of the interactive-only `exit` helper, and a non-zero
    # status so callers/shells can tell the download failed.
    sys.exit(1)
def get_video_info(video_id):
    """Fetch the player metadata for a video from YouTube's `player` endpoint.

    Args:
        video_id: The YouTube video id to look up.

    Returns:
        The decoded JSON response as a dict.

    Raises:
        SystemExit: With status 1 when the endpoint does not answer 200.
    """
    # API key sent as the `key` query parameter of the request.
    key = "AIzaSyA8eiZmM1FaDVjRy-df2KTyQ_vz_yYM39w"
    endpoint = f"https://www.youtube.com/youtubei/v1/player?key={key}"
    # The request identifies itself as an Android client.
    payload = {
        "videoId": video_id,
        "context": {
            "client": {
                "clientName": "ANDROID_TESTSUITE",
                "clientVersion": "1.9",
                "androidSdkVersion": 30,
                "hl": "en",
                "gl": "US",
                "utcOffsetMinutes": 0,
            }
        },
    }
    headers = {
        "User-Agent": "com.google.android.youtube/17.36.4 (Linux; U; Android 12; GB) gzip"
    }
    response = requests.post(endpoint, json=payload, headers=headers)
    if response.status_code != 200:
        print("Error getting video info.")
        # sys.exit rather than the interactive-only `exit` helper; non-zero
        # status because this is a failure.
        sys.exit(1)
    return response.json()
def get_best_stream(streams, searchingAudio):
    """Pick the highest-bitrate audio or video stream from `streams`.

    Args:
        streams: Iterable of stream dicts, each with a "mimeType" string and
            an "averageBitrate" (or, failing that, "bitrate") value.
        searchingAudio: True to select among audio streams, False for video.

    Returns:
        The stream dict with the highest bitrate of the requested kind.

    Raises:
        ValueError: When no stream of the requested kind is present.
    """
    # Match on the MIME type's top-level part rather than a substring, so a
    # codec string can never be mistaken for the media kind.
    wanted_prefix = "audio/" if searchingAudio else "video/"
    candidates = [s for s in streams if s["mimeType"].startswith(wanted_prefix)]
    if not candidates:
        raise ValueError(f"No {wanted_prefix[:-1]} stream available")

    # The higher the bitrate the better the stream quality
    def bitrate_of(stream):
        # Fall back to "bitrate" for entries without "averageBitrate".
        return int(stream.get("averageBitrate", stream.get("bitrate", 0)))

    return max(candidates, key=bitrate_of)
def download_stream(streams, title, is_audio, max_retries):
    """Download the best audio or video stream to a file named after `title`.

    Args:
        streams: Stream dicts to choose from (see get_best_stream).
        title: Video title, used as the base of the output filename.
        is_audio: True to download the audio stream, False for video.
        max_retries: Maximum number of download attempts.

    Returns:
        The name of the file the stream was written to.
    """
    stream = get_best_stream(streams, is_audio)
    # 'video/mp4; codecs="..."' -> 'mp4'
    container = stream["mimeType"].split(";")[0].split("/")[-1].strip()
    stream_type = "audio" if is_audio else "video"
    url = stream["url"]
    # Titles can contain path separators and other characters that are not
    # valid in filenames; replace them so the file can be created.
    safe_title = "".join(
        "_" if ch in '<>:"/\\|?*' or ord(ch) < 32 else ch for ch in title
    )
    file = f"{safe_title} -- {stream_type}.{container}"
    download_bytes(file, url, max_retries)
    return file
def merge_streams(video_file, audio_file, output_file, only_audio):
    """Run ffmpeg to produce `output_file`, then delete the source files.

    Args:
        video_file: Path of the downloaded video stream (unused when
            `only_audio` is true).
        audio_file: Path of the downloaded audio stream.
        output_file: Path of the file ffmpeg should write (overwritten if
            it already exists).
        only_audio: When true, only the audio file is fed to ffmpeg;
            otherwise video and audio are combined.
    """
    # Video goes first so the ffmpeg inputs keep the same order as before.
    sources = [audio_file] if only_audio else [video_file, audio_file]
    inputs = [ffmpeg.input(path) for path in sources]
    job = ffmpeg.output(*inputs, output_file, loglevel="quiet")
    ffmpeg.run(job, overwrite_output=True)
    # The intermediate downloads are no longer needed once ffmpeg has run.
    for path in sources:
        os.remove(path)
def download_video(video_id, only_audio, max_retries, output_file):
    """Download a video (or just its audio) and write it to `output_file`.

    Args:
        video_id: The YouTube video id.
        only_audio: When true, download and convert only the audio stream.
        max_retries: Maximum number of download attempts per stream.
        output_file: Destination path, or None to derive "<title>.mp4" /
            "<title>.mp3" from the video title.

    Raises:
        SystemExit: With status 1 when the response contains no stream data.
    """
    info = get_video_info(video_id)

    streaming_data = info.get("streamingData") or {}
    if "videoDetails" not in info or "adaptiveFormats" not in streaming_data:
        # Report the server's explanation if the response carries one.
        reason = info.get("playabilityStatus", {}).get(
            "reason", "no stream data in response"
        )
        print(f"Cannot download video: {reason}")
        sys.exit(1)

    title = info["videoDetails"]["title"]
    print(f"Downloading '{title}'")

    # Titles can contain path separators and other characters that are not
    # valid in filenames; use a cleaned copy for every file we create.
    safe_title = "".join(
        "_" if ch in '<>:"/\\|?*' or ord(ch) < 32 else ch for ch in title
    )

    streams = streaming_data["adaptiveFormats"]
    if output_file is None:
        output_file = f"{safe_title}.{'mp3' if only_audio else 'mp4'}"

    video_file = ""
    if not only_audio:
        print("Downloading video ...")
        video_file = download_stream(streams, safe_title, False, max_retries)
    print("Downloading audio ...")
    audio_file = download_stream(streams, safe_title, True, max_retries)

    merge_streams(video_file, audio_file, output_file, only_audio)
def signal_handler(signal, frame):
    """Handle a signal by announcing shutdown and exiting with status 0.

    Args:
        signal: The signal number delivered to the process (unused).
        frame: The stack frame that was interrupted (unused).

    Raises:
        SystemExit: Always, with status 0.
    """
    print("Exiting ...")
    # sys.exit rather than `exit`: the latter is a helper installed by the
    # `site` module for interactive use and is not guaranteed to exist.
    sys.exit(0)
if __name__ == "__main__":
    # Imported here because only the command-line entry point needs it.
    from urllib.parse import parse_qs, urlparse

    # Exit cleanly on Ctrl-C instead of dumping a KeyboardInterrupt traceback.
    signal.signal(signal.SIGINT, signal_handler)

    parser = argparse.ArgumentParser(description="Youtube video downloader")
    parser.add_argument("video_url", type=str, help="Url to the youtube video")
    parser.add_argument("--only-audio", action="store_true",
                        help="Only download audio")
    parser.add_argument("--num_retries", type=int, default=3,
                        help="Set the number of retries.")
    parser.add_argument("--output", type=str, help="The file to download to")
    args = parser.parse_args()

    # Extract the video id. Supported shapes:
    #   ...watch?v=<id>         (the `v` query parameter, wherever it appears)
    #   youtu.be/<id>
    #   .../shorts/<id>, .../embed/<id>, .../live/<id>
    # Anything else falls back to the text following the first "v=".
    url = args.video_url
    # urlparse only recognises the host when a scheme is present.
    parsed_url = urlparse(url if "://" in url else f"https://{url}")
    query_ids = parse_qs(parsed_url.query).get("v")
    path_parts = [part for part in parsed_url.path.split("/") if part]

    video_id = ""
    if query_ids:
        video_id = query_ids[0]
    elif parsed_url.netloc.lower().endswith("youtu.be") and path_parts:
        video_id = path_parts[0]
    elif len(path_parts) >= 2 and path_parts[0] in ("shorts", "embed", "live"):
        video_id = path_parts[1]
    else:
        id_index = url.find("v=")
        if id_index != -1:
            video_id = url[id_index + 2:]

    # Keep at most 11 characters, the same length the id was cut to before.
    video_id = video_id[:11]
    if not video_id:
        print("Please supply a valid youtube video url.")
        sys.exit(1)

    download_video(video_id, args.only_audio, args.num_retries, args.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment