@rebane2001
Created July 17, 2021 19:56
Somewhat hastily written script to pull metadata for every video in an archive (recommended to run as a daily cron job).
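For the daily run, a cron entry along these lines should work (hypothetical paths and script name, adjust to your setup; the script writes to daily_data/ relative to its working directory, hence the cd):

0 6 * * * cd /path/to/archive && python3 fetch_metadata.py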
import requests
import re
import os
import time
import json
from datetime import datetime, timezone
import random
# Put your YouTube API keys here, the more the merrier (one key can do 500k vids a day)
youtube_api_keys = ["YOUR_API_KEYS_HERE"]
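# (By default the Data API allows 10,000 quota units per key per day; a
# videos.list call costs 1 unit and returns up to 50 videos, hence ~500k.)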
# Get today's date as a string
datestr = datetime.today().strftime('%Y-%m-%d')
# Output paths
jsonlpath = f"daily_data/{datestr}/{datestr}.jsonl"
logpath = f"daily_data/{datestr}/{datestr}.log"
# Text files to scan for video and playlist links
video_id_sources = ["/path/to/links.txt"]
error_retries = 10
def extractVids(link):
    # Every URL shape a YouTube video ID shows up in, plus the
    # youtube-dl/yt-dlp archive.txt format ("youtube <id>")
    patterns = [
        r'/watch\?v=([A-Za-z0-9_\-]{11})',
        r'&v=([A-Za-z0-9_\-]{11})',
        r'youtu\.be/([A-Za-z0-9_\-]{11})',
        r'/shorts/([A-Za-z0-9_\-]{11})',
        r'/embed/([A-Za-z0-9_\-]{11})',
        r'youtube ([A-Za-z0-9_\-]{11})',  # archive.txt format
    ]
    matches = []
    for pattern in patterns:
        matches.extend(re.findall(pattern, link))
    return matches
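# e.g. extractVids("https://youtu.be/dQw4w9WgXcQ") -> ["dQw4w9WgXcQ"]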
def extractPlaylists(link):
    # Playlist IDs are 16-64 chars; match both ?list= and &list= forms
    patterns = [
        r'/playlist\?list=([A-Za-z0-9_\-]{16,64})',
        r'/playlist\?.*?&list=([A-Za-z0-9_\-]{16,64})',
    ]
    matches = []
    for pattern in patterns:
        matches.extend(re.findall(pattern, link))
    return matches
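# e.g. extractPlaylists("https://www.youtube.com/playlist?list=PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
# -> ["PLxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"] (made-up 34-char playlist ID)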
# Some snippets borrowed from https://github.com/itallreturnstonothing/panicpony/
def get_playlists_page(playlist_id, page_token=None):
    # Fetch one page (up to 50 items) of a playlist's contents
    response = requests.get(
        f'https://www.googleapis.com/youtube/v3/playlistItems?'
        f'playlistId={playlist_id}'
        f'&part=status,snippet,contentDetails'
        f'&maxResults=50'
        f'{"&pageToken=" + page_token if page_token else ""}'
        f'&key={random.choice(youtube_api_keys)}'
    )
    if response.status_code != 200:
        print(f"Something not right! (HTTP {response.status_code})")
        print(playlist_id)
        return (None, None)
    precious_data = response.json()
    return (
        precious_data["items"],
        precious_data.get("nextPageToken")
    )
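# Each playlistItems item carries its video ID at
# item["snippet"]["resourceId"]["videoId"], which getVideoIds() relies on below.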
def get_videos_page(video_ids):
    # Fetch full metadata for up to 50 videos in one request
    response = requests.get(
        f'https://www.googleapis.com/youtube/v3/videos?'
        f'id={",".join(video_ids)}'
        f'&part=contentDetails,id,liveStreamingDetails,localizations,player,recordingDetails,snippet,statistics,status,topicDetails'
        f'&maxResults=50'
        f'&key={random.choice(youtube_api_keys)}'
    )
    if response.status_code != 200:
        print(f"Something not right! (HTTP {response.status_code})")
        return None
    return response.json()["items"]
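# Note: the videos endpoint accepts at most 50 comma-separated IDs per call,
# which is why downloadMetadata() works in slices of 50.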
pl_page = 0
def get_all_videos_from_playlist(playlist_id):
    global pl_page
    pl_page = 1
    writeLog(f"Fetching playlist {playlist_id}")
    (first_videos, next_page) = get_playlists_page(playlist_id)
    def amazing(next_page):
        # Generator that keeps following nextPageToken until the playlist runs out
        global pl_page
        while next_page:
            pl_page += 1
            writeLog(f"Fetching playlist (page {pl_page}, {pl_page*50} videos)")
            next_videos, next_page = get_playlists_page(playlist_id, next_page)
            yield next_videos
    # Flatten the list of pages into one list of playlist items
    return [x for page in [first_videos] + list(amazing(next_page)) for x in page]
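# Note: playlistItems pages are capped at 50 results, so e.g. a 1,000-video
# playlist takes 20 requests (and 20 quota units) to enumerate.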
def getVideoIds():
    videoids = set()
    for filename in video_id_sources:
        print(filename)
        with open(filename, "r") as f:
            text = f.read()
        for playlist in extractPlaylists(text):
            for i in range(error_retries):
                try:
                    videos = get_all_videos_from_playlist(playlist)
                    for vid in videos:
                        videoids.add(vid["snippet"]["resourceId"]["videoId"])
                    break
                except Exception as e:
                    if i < error_retries - 1:
                        print(repr(e))
                        print(f"Retry attempt {i+2} of {error_retries}...")
                    else:
                        # Out of retries; log the failure and move on to the next playlist
                        writeLog(f"Playlist {playlist} failed with error {repr(e)}")
        for videoid in extractVids(text):
            videoids.add(videoid)
    return videoids
# Write a timestamped log line to both file and console
def writeLog(message):
    msg = f"[{datetime.now(timezone.utc).replace(microsecond=0).isoformat()}] {message}"
    with open(logpath, "a") as f:
        f.write(f"{msg}\n")
    print(msg)
# Main stuff
def downloadMetadata(videoids):
    orig_len = len(videoids)
    while len(videoids) > 0:
        # Progress as a percentage with one decimal place
        print(f"{round(((orig_len-len(videoids))/orig_len)*1000)/10}% ({orig_len-len(videoids)}/{orig_len})")
        for i in range(error_retries):
            try:
                with open(jsonlpath, "a") as f:
                    # Request metadata in batches of 50 (the API maximum) and
                    # append each video's JSON as one line
                    for video in get_videos_page(videoids[:50]):
                        f.write(f"{json.dumps(video)}\n")
                videoids = videoids[50:]
                break
            except Exception as e:
                if i < error_retries - 1:
                    print(repr(e))
                    print(f"Retry attempt {i+2} of {error_retries}...")
                else:
                    # Out of retries; log the failed batch and skip it
                    writeLog(f"Batch {videoids[:50]} failed with error {repr(e)}")
                    videoids = videoids[50:]
    print(f"100% ({orig_len}/{orig_len})")
def main():
    if os.path.isdir(f"daily_data/{datestr}"):
        print("Path already exists :(")
        return
    os.makedirs(f"daily_data/{datestr}")
    writeLog(f"Good morning Ponyville - {datestr}")
    writeLog("Getting video IDs...")
    videoids = getVideoIds()
    writeLog(f"Got {len(videoids)} IDs!")
    if len(videoids) > 450000 * len(youtube_api_keys):
        print(f"Warning! Over {450000 * len(youtube_api_keys)} videos, we may run out of API requests.")
        print("Press Enter to continue anyway...")
        input()
    writeLog("Sorting...")
    videoids = sorted(videoids)
    writeLog("Downloading...")
    downloadMetadata(videoids)
    writeLog("Done for today!")

if __name__ == "__main__":
    main()