Skip to content

Instantly share code, notes, and snippets.

@christopher-dG
Created January 9, 2019 09:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save christopher-dG/0b03cfcdb00602e6edf66c3341a18ba2 to your computer and use it in GitHub Desktop.
Save christopher-dG/0b03cfcdb00602e6edf66c3341a18ba2 to your computer and use it in GitHub Desktop.
Downloads YouTube channel metadata (video data, comments, playlists)
#!/usr/bin/env python3
# Dependencies:
# - Python >= 3.6
# - google-api-python-client
# - requests
from argparse import ArgumentParser
from base64 import b64encode
from datetime import date, timedelta
from googleapiclient.discovery import build
from json import dump
from os import getenv, mkdir
from os.path import exists, join
from requests import get
def get_videos_playlists(client, channel, months, start_offset_days=0):
    """
    Generate videos and playlists by a channel.

    The look-back period of ``months * 31`` days is walked backwards from
    today in 7-day windows, beginning ``start_offset_days`` days ago.
    Before each window is searched, its day offset is appended to
    ``offsets.txt`` in the current directory (presumably so an interrupted
    run can be restarted by passing the last value as ``start_offset_days``).

    Args:
        client: YouTube API client object providing ``search()``.
        channel: ID of the channel to search.
        months: Size of the look-back period, in 31-day months.
        start_offset_days: How many days before today the first window ends.

    Yields:
        ``("video", video)`` or ``("playlist", playlist)`` tuples.

    Each video looks like this:
    {
        id: str,
        title: str,
        description: str,
        tags: [str],
        thumbnail: str,  # Base64-encoded thumbnail.
        published: str,
        views: int,
        likes: int,
        dislikes: int,
        comments: [{
            id: str,
            author: {
                name: str,
                id: str,
            },
            content: str,
            likes: int,
            published: str,
            updated: str,
            replies: [comment],  # These ones won't have a replies key.
        }],
    }
    Each playlist looks like this:
    {
        id: str,
        title: str,
        description: str,
        tags: [str],
        thumbnail: str,  # Base64-encoded thumbnail.
        published: str,
        videos: [str],  # Video IDs.
    }
    """
    search = client.search()
    # End of the look-back period, in days before today. This is the stop
    # value of the range below, so the start offset must NOT be subtracted
    # from it: doing so trimmed the far end of the period and produced an
    # empty range once the offset exceeded half of it.
    days = months * 31
    step = 7
    # YouTube search returns at most 500 results so we have to use this date trickery.
    for day_offset in range(start_offset_days, days, step):
        token = None
        before = date.today() - timedelta(days=day_offset)
        after = before - timedelta(days=step)
        # Record progress before searching this window.
        with open("offsets.txt", "a") as f:
            f.write(str(day_offset) + "\n")
        # Follow nextPageToken until the window's results are exhausted.
        while True:
            resp = do_request(search.list(
                channelId=channel,
                part="snippet",
                type="video,playlist",
                maxResults=50,
                publishedBefore=before.isoformat() + "T00:00:00.00Z",
                publishedAfter=after.isoformat() + "T00:00:00.00Z",
                pageToken=token,
            ))
            for item in resp.get("items", []):
                kind = item.get("id", {}).get("kind")
                if kind == "youtube#video":
                    yield "video", format_video(client, item)
                elif kind == "youtube#playlist":
                    yield "playlist", format_playlist(client, item)
            token = resp.get("nextPageToken")
            if not token:
                break
def format_video(client, item):
    """Fetch full details for one video search result and build its record.

    ``item`` is a search result whose ``id.videoId`` names the video. The
    snippet and statistics are requested for that ID, and the thumbnail and
    comments are fetched through the sibling helpers.
    """
    request = client.videos().list(
        id=item["id"]["videoId"],
        part="snippet,statistics",
    )
    details = do_request(request)["items"][0]
    snippet = details["snippet"]
    statistics = details["statistics"]
    video_id = details["id"]

    record = {"id": video_id}
    record["title"] = snippet.get("title", "")
    record["description"] = snippet.get("description", "")
    record["tags"] = snippet.get("tags", [])
    record["thumbnail"] = get_thumbnail(snippet)
    record["published"] = snippet.get("publishedAt", "")
    # Missing counters are reported as 0.
    record["views"] = int(statistics.get("viewCount", 0))
    record["likes"] = int(statistics.get("likeCount", 0))
    record["dislikes"] = int(statistics.get("dislikeCount", 0))
    record["comments"] = get_comments(client, video_id)
    return record
def get_comments(client, video):
    """Collect every comment thread of the video with ID ``video``.

    Pages through the comment threads 100 at a time and returns a list with
    one formatted thread (top-level comment plus replies) per entry.
    """
    resource = client.commentThreads()
    collected = []
    page_token = None
    while True:
        page = do_request(resource.list(
            videoId=video,
            textFormat="plainText",
            part="snippet,replies",
            maxResults=100,
            pageToken=page_token,
        ))
        for thread in page.get("items", []):
            collected.append(format_thread(thread))
        page_token = page.get("nextPageToken")
        if not page_token:
            # No further page: everything has been collected.
            break
    return collected
def format_thread(thread):
    """Turn a comment thread into a formatted comment with a ``replies`` list.

    The top-level comment becomes the returned dict; any replies present in
    the thread are formatted the same way and stored under ``"replies"``.
    """
    top_level = thread["snippet"]["topLevelComment"]
    formatted = format_comment(top_level)
    raw_replies = thread.get("replies", {}).get("comments", [])
    formatted["replies"] = list(map(format_comment, raw_replies))
    return formatted
def format_comment(comment):
    """Reduce a raw comment resource to the fields this script stores.

    Returns a dict with the comment ID, author name and channel ID, text,
    like count and published/updated timestamps. Missing snippet fields
    fall back to an empty string (or 0 for the like count).
    """
    snippet = comment["snippet"]
    comment_id = comment["id"]

    author = {
        "name": snippet.get("authorDisplayName", ""),
        "id": snippet.get("authorChannelId", {}).get("value", ""),
    }

    # Prefer the original text; fall back to the display text.
    if "textOriginal" in snippet:
        content = snippet["textOriginal"]
    else:
        content = snippet.get("textDisplay", "")

    like_count = int(snippet.get("likeCount", 0))

    return {
        "id": comment_id,
        "author": author,
        "content": content,
        "likes": like_count,
        "published": snippet.get("publishedAt", ""),
        "updated": snippet.get("updatedAt", ""),
    }
def format_playlist(client, item):
    """Fetch the snippet for one playlist search result and build its record.

    ``item`` is a search result whose ``id.playlistId`` names the playlist.
    The thumbnail and the list of contained video IDs are fetched through
    the sibling helpers.
    """
    request = client.playlists().list(
        id=item["id"]["playlistId"],
        part="snippet",
    )
    details = do_request(request)["items"][0]
    snippet = details["snippet"]
    playlist_id = details["id"]

    record = {"id": playlist_id}
    record["title"] = snippet.get("title", "")
    record["description"] = snippet.get("description", "")
    record["tags"] = snippet.get("tags", [])
    record["thumbnail"] = get_thumbnail(snippet)
    record["published"] = snippet.get("publishedAt", "")
    record["videos"] = get_playlist_items(client, playlist_id)
    return record
def get_playlist_items(client, playlist):
    """Return the IDs of all videos in the playlist with ID ``playlist``.

    Pages through the playlist items 50 at a time. Entries without a
    video ID in their ``resourceId`` are skipped.
    """
    resource = client.playlistItems()
    video_ids = []
    page_token = None
    while True:
        page = do_request(resource.list(
            playlistId=playlist,
            part="snippet",
            maxResults=50,
            pageToken=page_token,
        ))
        for entry in page["items"]:
            video_id = entry["snippet"].get("resourceId", {}).get("videoId")
            if video_id:
                video_ids.append(video_id)
        page_token = page.get("nextPageToken")
        if not page_token:
            return video_ids
def get_thumbnail(snip, timeout=30):
    """Download and encode a thumbnail.

    Tries the thumbnail resolutions listed in ``snip["thumbnails"]`` from
    largest to smallest and returns the first one that downloads with an
    HTTP 200 status.

    Args:
        snip: A resource snippet dict, optionally holding a ``"thumbnails"``
            mapping of resolution name to ``{"url": ...}``.
        timeout: Seconds to wait on each download before giving up. Without
            a timeout a single stalled connection would block the whole run
            indefinitely.

    Returns:
        The thumbnail bytes as a Base64 string, or ``""`` when the snippet
        has no thumbnails or none of them could be downloaded.

    Raises:
        Whatever the HTTP client raises on connection failure or timeout;
        such errors are not swallowed here.
    """
    if "thumbnails" not in snip:
        return ""
    for res in ["maxres", "standard", "high", "medium", "default"]:
        if res in snip["thumbnails"]:
            resp = get(snip["thumbnails"][res]["url"], timeout=timeout)
            # A non-200 answer falls through to the next smaller resolution.
            if resp.status_code == 200:
                return str(b64encode(resp.content), "utf-8")
    return ""
def do_request(req):
    """Execute a prepared YouTube API request and return its response.

    Single choke point for all API calls made by this script; exceptions
    raised by ``req.execute()`` propagate unchanged.
    """
    # TODO: Error handling.
    response = req.execute()
    return response
if __name__ == "__main__":
    # Command-line entry point: download everything for one channel and
    # write each video/playlist to its own JSON file.
    parser = ArgumentParser(description="""
This script will download data from a YouTube channel
and save it to a bunch of JSON files.
See the get_videos_playlists docstring for more details.
""")
    parser.add_argument("channel", help="Channel ID")
    parser.add_argument("key", help="API key")
    parser.add_argument("-m", "--months", help="Months to search", type=int, default=36)
    parser.add_argument("-o", "--offset", help="Start offset in days", type=int, default=0)
    args = parser.parse_args()

    client = build("youtube", "v3", developerKey=args.key)

    # Output directory for each kind of item the generator can yield.
    folders = {"video": "videos", "playlist": "playlists"}
    for folder in ("playlists", "videos"):
        if not exists(folder):
            mkdir(folder)

    results = get_videos_playlists(
        client,
        args.channel,
        args.months,
        start_offset_days=args.offset,
    )
    for kind, item in results:
        target = folders.get(kind)
        if target is None:
            # Unknown item kind: nothing to write.
            continue
        # One file per item, named after its ID.
        path = join(target, item["id"] + ".json")
        with open(path, "w") as f:
            dump(item, f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment