Created
January 9, 2019 09:36
-
-
Save christopher-dG/0b03cfcdb00602e6edf66c3341a18ba2 to your computer and use it in GitHub Desktop.
Downloads YouTube channel metadata (video data, comments, playlists)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Dependencies:
# - Python >= 3.6
# - google-api-python-client
# - requests
from argparse import ArgumentParser | |
from base64 import b64encode | |
from datetime import date, timedelta | |
from googleapiclient.discovery import build | |
from json import dump | |
from os import getenv, mkdir | |
from os.path import exists, join | |
from requests import get | |
def get_videos_playlists(client, channel, months, start_offset_days=0):
    """
    Generate videos and playlists by a channel.

    The search window is walked backwards from today in week-long slices,
    beginning ``start_offset_days`` days ago and ending once ``months * 31``
    days ago has been reached. The offset of each slice is appended to
    ``offsets.txt`` before the slice is searched, so an interrupted run can
    be resumed by passing the last recorded offset as ``start_offset_days``.

    Yields ``(kind, data)`` tuples, where ``kind`` is ``"video"`` or
    ``"playlist"`` and ``data`` is the matching dict described below.

    Each video looks like this:
    {
        id: str,
        title: str,
        description: str,
        tags: [str],
        thumbnail: str,  # Base64-encoded thumbnail.
        published: str,
        views: int,
        likes: int,
        dislikes: int,
        comments: [{
            id: str,
            author: {
                name: str,
                id: str,
            },
            content: str,
            likes: int,
            published: str,
            updated: str,
            replies: [comment],  # These ones won't have a replies key.
        }],
    }
    Each playlist looks like this:
    {
        id: str,
        title: str,
        description: str,
        tags: [str],
        thumbnail: str,  # Base64-encoded thumbnail.
        published: str,
        videos: [str],  # Video IDs.
    }
    """
    search = client.search()
    # The window always ends `months` (31-day) months before today; the start
    # offset only moves where we begin. Subtracting the offset from the end
    # too would shrink the window from both sides, and a resume past the
    # halfway point would search nothing at all.
    total_days = months * 31
    step = 7
    # YouTube search returns at most 500 results so we have to use this date trickery.
    for day_offset in range(start_offset_days, total_days, step):
        before = date.today() - timedelta(days=day_offset)
        after = before - timedelta(days=step)
        # Record progress before searching the slice so that a crash inside
        # it leaves the offset to resume from on the last line of the file.
        with open("offsets.txt", "a") as f:
            f.write(str(day_offset) + "\n")
        token = None
        while True:
            resp = do_request(search.list(
                channelId=channel,
                part="snippet",
                type="video,playlist",
                maxResults=50,
                publishedBefore=before.isoformat() + "T00:00:00.00Z",
                publishedAfter=after.isoformat() + "T00:00:00.00Z",
                pageToken=token,
            ))
            for item in resp.get("items", []):
                kind = item.get("id", {}).get("kind")
                if kind == "youtube#video":
                    yield "video", format_video(client, item)
                elif kind == "youtube#playlist":
                    yield "playlist", format_playlist(client, item)
            # Keep paging through this slice until the API stops handing
            # back a continuation token.
            token = resp.get("nextPageToken")
            if not token:
                break
def format_video(client, item):
    """Build the exported record for one video search result.

    Looks the video up by ID to fetch its snippet and statistics, then adds
    the encoded thumbnail and the full list of comments.
    """
    request = client.videos().list(
        id=item["id"]["videoId"],
        part="snippet,statistics",
    )
    response = do_request(request)
    video = response["items"][0]
    snippet = video["snippet"]
    statistics = video["statistics"]

    def count(key):
        # Counters are converted with int(); a missing one counts as zero.
        return int(statistics.get(key, 0))

    video_id = video["id"]
    return {
        "id": video_id,
        "title": snippet.get("title", ""),
        "description": snippet.get("description", ""),
        "tags": snippet.get("tags", []),
        "thumbnail": get_thumbnail(snippet),
        "published": snippet.get("publishedAt", ""),
        "views": count("viewCount"),
        "likes": count("likeCount"),
        "dislikes": count("dislikeCount"),
        "comments": get_comments(client, video_id),
    }
def get_comments(client, video):
    """Collect every comment thread on a video, following pagination."""
    resource = client.commentThreads()
    collected = []
    page_token = None
    more_pages = True
    while more_pages:
        page = do_request(resource.list(
            videoId=video,
            textFormat="plainText",
            part="snippet,replies",
            maxResults=100,
            pageToken=page_token,
        ))
        for thread in page.get("items", []):
            collected.append(format_thread(thread))
        # A missing or empty token means this was the last page.
        page_token = page.get("nextPageToken")
        more_pages = bool(page_token)
    return collected
def format_thread(thread):
    """Format a comment thread as its top-level comment plus a replies list."""
    top_level = thread["snippet"]["topLevelComment"]
    formatted = format_comment(top_level)
    # A thread without replies has no "replies" key; export an empty list.
    reply_items = thread.get("replies", {}).get("comments", [])
    formatted["replies"] = list(map(format_comment, reply_items))
    return formatted
def format_comment(comment):
    """Reduce one comment resource to the fields that get exported.

    Missing snippet fields fall back to "" (or 0 for the like count).
    """
    snippet = comment["snippet"]
    comment_id = comment["id"]
    author = {
        "name": snippet.get("authorDisplayName", ""),
        "id": snippet.get("authorChannelId", {}).get("value", ""),
    }
    # Prefer the original text; fall back to the display text.
    if "textOriginal" in snippet:
        text = snippet["textOriginal"]
    else:
        text = snippet.get("textDisplay", "")
    like_count = int(snippet.get("likeCount", 0))
    return {
        "id": comment_id,
        "author": author,
        "content": text,
        "likes": like_count,
        "published": snippet.get("publishedAt", ""),
        "updated": snippet.get("updatedAt", ""),
    }
def format_playlist(client, item):
    """Build the exported record for one playlist search result.

    Looks the playlist up by ID to fetch its snippet, then adds the encoded
    thumbnail and the IDs of the videos it contains.
    """
    request = client.playlists().list(
        id=item["id"]["playlistId"],
        part="snippet",
    )
    response = do_request(request)
    playlist = response["items"][0]
    snippet = playlist["snippet"]
    playlist_id = playlist["id"]
    return {
        "id": playlist_id,
        "title": snippet.get("title", ""),
        "description": snippet.get("description", ""),
        "tags": snippet.get("tags", []),
        "thumbnail": get_thumbnail(snippet),
        "published": snippet.get("publishedAt", ""),
        "videos": get_playlist_items(client, playlist_id),
    }
def get_playlist_items(client, playlist):
    """Return the IDs of all videos in a playlist, following pagination."""
    resource = client.playlistItems()
    video_ids = []
    page_token = None
    while True:
        page = do_request(resource.list(
            playlistId=playlist,
            part="snippet",
            maxResults=50,
            pageToken=page_token,
        ))
        for entry in page["items"]:
            video_id = entry["snippet"].get("resourceId", {}).get("videoId")
            # Entries without a usable video ID are skipped.
            if video_id:
                video_ids.append(video_id)
        page_token = page.get("nextPageToken")
        if not page_token:
            break
    return video_ids
def get_thumbnail(snip, timeout=30):
    """Download and encode a thumbnail.

    Tries the resolutions listed in the snippet from largest to smallest and
    returns the first one that downloads with HTTP 200, Base64-encoded as a
    str. Returns "" when the snippet has no thumbnails or none of them could
    be downloaded.

    snip: a snippet dict, optionally holding a "thumbnails" mapping of
        resolution name to {"url": ...}.
    timeout: seconds to wait on each download before requests gives up.
        Without a timeout a single stalled connection would hang the whole
        scrape indefinitely.
    """
    # Treat a missing or null "thumbnails" entry the same way: nothing to fetch.
    thumbnails = snip.get("thumbnails") or {}
    for res in ("maxres", "standard", "high", "medium", "default"):
        if res not in thumbnails:
            continue
        resp = get(thumbnails[res]["url"], timeout=timeout)
        if resp.status_code == 200:
            return b64encode(resp.content).decode("utf-8")
    return ""
def do_request(req):
    """Execute a prepared YouTube API request and return its response."""
    # TODO: Error handling.
    response = req.execute()
    return response
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, build the API client and
    # write every yielded video/playlist to <kind-directory>/<id>.json.
    parser = ArgumentParser(description="""
    This script will download data from a YouTube channel
    and save it to a bunch of JSON files.
    See the get_videos_playlists docstring for more details.
    """)
    parser.add_argument("channel", help="Channel ID")
    parser.add_argument("key", help="API key")
    parser.add_argument("-m", "--months", help="Months to search", type=int, default=36)
    parser.add_argument("-o", "--offset", help="Start offset in days", type=int, default=0)
    args = parser.parse_args()
    client = build("youtube", "v3", developerKey=args.key)

    # Create the output directories on first run.
    for directory in ("playlists", "videos"):
        if not exists(directory):
            mkdir(directory)

    # Maps the kind yielded by the generator to its output directory.
    output_dirs = {"video": "videos", "playlist": "playlists"}
    results = get_videos_playlists(
        client, args.channel, args.months,
        start_offset_days=args.offset,
    )
    for kind, record in results:
        target_dir = output_dirs.get(kind)
        if target_dir is None:
            # Unknown kinds are skipped.
            continue
        path = join(target_dir, record["id"] + ".json")
        with open(path, "w") as f:
            dump(record, f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment