Skip to content

Instantly share code, notes, and snippets.

@nucular
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nucular/a21a7b0c05486c21d5a5 to your computer and use it in GitHub Desktop.
Save nucular/a21a7b0c05486c21d5a5 to your computer and use it in GitHub Desktop.
Python YouTube tools
The MIT License (MIT)
Copyright (c) 2014 nucular
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# python3 {{f}}
import urllib.request
import urllib.parse
import json
import time
def _youtube_refused(parsed):
    """Return True if the YQL diagnostics report an HTTP 403 for any URL."""
    diagnostics = parsed["query"].get("diagnostics") or {}
    urls = diagnostics.get("url") or []
    # A single diagnostics entry may be a bare object instead of a list;
    # iterating a dict would yield its keys and break the lookup below.
    if isinstance(urls, dict):
        urls = [urls]
    return any(
        isinstance(entry, dict) and entry.get("http-status-code") == "403"
        for entry in urls
    )


def search(query, order_by="relevance", max_results=None, start_index=None,
           key=None, verbose=False, opener=None):
    """Use the YQL service to search for YouTube videos.

    query       -- search terms; double quotes are backslash-escaped
    order_by    -- accepted for compatibility, not used in the YQL statement
    max_results -- if truthy, added as "max_results" filter and as "limit"
    start_index -- if truthy, added as "start_index" filter
    key         -- if truthy, added as "key" filter
    verbose     -- print progress messages to stdout
    opener      -- optional object with an ``open(url)`` method (for example
                   the result of urllib.request.build_opener); defaults to
                   urllib.request.urlopen

    Returns the list of video entries from the response, or an empty list
    when the service reports no results.

    Raises RuntimeError if YQL answers with a status other than 200, or if
    every one of the five attempts ends with a 403 in the diagnostics.
    """
    def quote(value):
        return value.replace("\"", "\\\"")

    # build the YQL query
    yql = "SELECT * FROM youtube.search WHERE query=\"{}\"".format(quote(query))
    if max_results:
        yql += " AND max_results={:d}".format(max_results)
    if start_index:
        yql += " AND start_index={:d}".format(start_index)
    if key:
        # Fixed: this clause used to interpolate the search query here.
        yql += " AND key=\"{}\"".format(quote(key))
    if max_results:
        yql += " limit {0:d}".format(max_results)
    if verbose:
        print("[videosearch] Query: " + yql)

    # build the URL query
    q = urllib.parse.urlencode([
        ("q", yql),
        ("format", "json"),
        ("env", "store://datatables.org/alltableswithkeys"),
        ("diagnostics", "true"),
        ("debug", "true"),
    ])
    url = "https://query.yahooapis.com/v1/public/yql?" + q
    open_url = opener.open if opener is not None else urllib.request.urlopen

    max_tries = 5
    wait = 0.1
    for attempt in range(1, max_tries + 1):
        if verbose and attempt > 1:
            print("[videosearch] Try {} of 5".format(attempt))

        # The context manager closes the connection once the body is read.
        with open_url(url) as res:
            if res.status != 200:
                raise RuntimeError("YQL responded with code {}".format(res.status))
            text = res.read().decode()
        if verbose:
            print("[videosearch] Got response")

        parsed = json.loads(text)
        results = parsed["query"]["results"]
        if results is not None:
            videos = results["video"]
            # NOTE(review): assumes a lone result may arrive as a bare object
            # rather than a one-element list -- confirm against the service.
            return [videos] if isinstance(videos, dict) else videos

        if not _youtube_refused(parsed):
            # YouTube actually gave us no results
            return []

        # YouTube rejected the request (403); back off before the next try.
        if attempt < max_tries:
            if verbose:
                print("[videosearch] Fail on Youtube side, waiting {} seconds".format(wait))
            time.sleep(wait)
            wait = wait * 2

    raise RuntimeError("Youtube has a bad day")
if __name__ == "__main__":
    # Demo: run a sample search and print "<id>: <title>" for every hit.
    for video in search("charlie bit my finger", verbose=True):
        line = "{}: {}".format(video["id"], video["title"])
        # Drop non-ASCII characters so printing works on any console.
        # Decoding the bytes (instead of slicing their repr) keeps quotes
        # and backslashes in titles intact.
        print(line.encode("ascii", "ignore").decode("ascii"))
#!/usr/bin/python3
# coding=utf-8
# python3 {{f}}
import re
import http.cookiejar, urllib.request
from urllib.parse import unquote
class VideoInfo(object):
    """Holds the data returned by YouTube's get_video_info for one video.

    After fetch() or populate() every key of the info dictionary is also
    available as an attribute of the instance; the dictionary itself is
    kept in ``_infos``.
    """

    def __init__(self, id):
        """Initiate the instance and set a video ID"""
        self.id = id
        self._infos = {}

    def __iter__(self, *args, **kwargs):
        """Iterate over the keys of the info dictionary.

        Raises Exception if the instance is not populated yet.
        """
        if self.ispopulated():
            return self._infos.__iter__(*args, **kwargs)
        else:
            raise Exception("VideoInfo is not populated yet")

    # NOTE(review): this defines a *method* called __dict__, so on instances
    # ``obj.__dict__`` is this bound method rather than the usual attribute
    # mapping. Kept as is because callers may rely on ``obj.__dict__()``.
    def __dict__(self):
        """Return the info dictionary.

        Raises Exception if the instance is not populated yet.
        """
        if self.ispopulated():
            return self._infos
        else:
            raise Exception("VideoInfo is not populated yet")

    def fetch(self, cookiejar=None, useragent=None):
        """
        Fetch the video data for the ID associated with this instance from
        YouTube, parse it and populate the instance with the result.

        cookiejar -- http.cookiejar.CookieJar to use; a new one is created
                     when None is passed
        useragent -- if truthy, sent as the User-agent header

        Returns the cookie jar that was used. Raises RuntimeError if the
        response status is not 200.
        """
        # An empty CookieJar is falsy (it defines __len__), so test for None
        # explicitly; otherwise a caller-supplied empty jar would be replaced.
        if cookiejar is None:
            cookiejar = http.cookiejar.CookieJar()

        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar))
        # Headers must be configured before the request is sent.
        if useragent:
            opener.addheaders = [("User-agent", useragent)]

        url = "https://youtube.com/get_video_info?video_id={}".format(self.id)
        with opener.open(url) as res:
            if res.status != 200:
                raise RuntimeError("YouTube responded with %s" % res.status)
            b = res.read()

        parsed = self.parse(b.decode("utf-8"))
        self.populate(parsed)

        return cookiejar

    def ispopulated(self):
        """Check if the instance actually contains video info data."""
        return self._infos != {}

    def populate(self, d):
        """Add a dictionary of video info data to the instance.

        Any previously populated data is removed first. Every key of d is
        also set as an attribute on the instance.
        """
        if self.ispopulated():
            self.depopulate()
        self._infos = d
        for k, v in d.items():
            setattr(self, k, v)

    def depopulate(self):
        """Remove the current video info data from the instance."""
        if self.ispopulated():
            # Attributes were created from the keys in populate(), so the
            # keys (not the values) are what has to be deleted again.
            for k in self._infos:
                delattr(self, k)
            self._infos = {}

    @staticmethod
    def _split_pairs(t, sep="&"):
        """Split t into a list of (key, unquoted value) pairs.

        Kinda like urllib.parse.parse_qsl, but can use multiple separators:
        t is split on runs of ":" or of any character in sep.
        """
        pattern = "[:" + re.escape(sep) + "]+"
        pairs = []
        for item in filter(None, re.split(pattern, t)):
            k, v = item.split("=", 1)
            pairs.append((k, unquote(v)))
        return pairs

    @staticmethod
    def _parse_mime(v):
        """Turn 'mime;+codecs="a,+b"' into (mime, [codecs]); else a 1-tuple."""
        split = v.split(";", 1)
        if len(split) == 2:
            # [9:-1] cuts nine leading characters and the last one; this
            # presumably removes a '+codecs="' prefix and the closing quote.
            return (split[0], [c.strip("+") for c in split[1][9:-1].split(",")])
        return tuple(split)

    @staticmethod
    def _convert_adaptive(k, v):
        """Convert one value of an adaptive_fmts entry to a Python type."""
        if k in ("bitrate", "clen", "fps", "itag", "lmt"):
            return int(v)
        if k == "index" or k == "init":
            return [int(part) for part in v.split("-")]
        if k == "size":
            width, height = v.split("x")[:2]
            return {"w": int(width), "h": int(height)}
        if k == "type":
            return VideoInfo._parse_mime(v)
        return v

    @staticmethod
    def _convert_fmt(k, v):
        """Convert one value of a url_encoded_fmt_stream_map entry."""
        if k == "itag":
            return int(v)
        if k == "type":
            return VideoInfo._parse_mime(v)
        return v

    @staticmethod
    def _group_formats(pairs, first_key, convert):
        """Group a flat pair list into one dict per format.

        A new entry starts at every occurrence of first_key. Entries that
        never received a "url" key are dropped.
        """
        groups = []
        current = {}
        for k, v in pairs:
            if k == first_key:
                if "url" in current:
                    groups.append(current)
                current = {}
            current[k] = convert(k, v)
        # The final entry has no following first_key to flush it.
        if "url" in current:
            groups.append(current)
        return groups

    @staticmethod
    def _to_bool(t):
        """Interpret a flag string; "", "0" and "false" (any case) are False."""
        return t.strip().lower() not in ("", "0", "false")

    def parse(self, t):
        """Parse video infos in the query format used by YouTube to a dict.

        "adaptive_fmts" becomes a list of dicts, and
        "url_encoded_fmt_stream_map" is replaced by "fmts", also a list of
        dicts. Known keys are converted to int/float/bool/list values; all
        other values stay strings.

        Raises KeyError if t has no "adaptive_fmts" or
        "url_encoded_fmt_stream_map" key.
        """
        parsed = dict(VideoInfo._split_pairs(t))

        parsed["adaptive_fmts"] = VideoInfo._group_formats(
            VideoInfo._split_pairs(parsed["adaptive_fmts"], "&,"),
            "url",
            VideoInfo._convert_adaptive,
        )

        # I really hate this key name, rename it to "fmts"
        fmtmap = parsed.pop("url_encoded_fmt_stream_map")
        parsed["fmts"] = VideoInfo._group_formats(
            VideoInfo._split_pairs(fmtmap, "&,"),
            "itag",
            VideoInfo._convert_fmt,
        )

        to_bool = VideoInfo._to_bool

        def int_list(value):
            return [int(part) for part in value.split(",")]

        def plus_list(value):
            return [part.replace("+", " ") for part in value.split(",")]

        # convert most keys to a more pythonic format (I've estimated these)
        types = {
            "ad_device": int,
            "ad_flags": int,
            "ad_logging_flag": int,
            "ad_slots": int_list,
            "afv": to_bool,
            "allow_embed": to_bool,
            "allow_html5_ads": to_bool,
            "allow_ratings": to_bool,
            "as_launched_in_country": to_bool,
            "avg_rating": float,
            "cc3_module": int,
            "cc_asr": int,
            "cc_font": plus_list,
            "cid": int,
            "cl": int,
            "cut_ad_for_ypc": to_bool,
            "dash": to_bool,
            "enablecsi": to_bool,
            "fexp": int_list,
            "fmt_list": lambda value: value.split(","),
            "focEnabled": to_bool,
            "has_cc": to_bool,
            "idpj": int,
            "instream_long": to_bool,
            "iv3_module": int,
            "iv_allow_in_place_switch": to_bool,
            "iv_load_policy": int,
            "keywords": plus_list,
            "ldpj": int,
            "length_seconds": int,
            "loeid": int_list,
            "logwatch": to_bool,
            "midroll_freqcap": float,
            "midroll_prefetch_size": int,
            "loudness": float,
            "muted": to_bool,
            "no_get_video_log": to_bool,
            "rmktEnabled": to_bool,
            "sentiment": int,
            "sffb": to_bool,
            "show_content_thumbnail": to_bool,
            "subscribed": int,
            "tag_for_child_directed": to_bool,
            "timestamp": int,
            "title": lambda value: value.replace("+", " "),
            "tmi": int,
            "use_cipher_signature": to_bool,
            "video_verticals": lambda value: int_list(value[1:-1]),
            "view_count": int,
            "watermark": lambda value: [part for part in value.split(",") if part],
            "ytfocEnabled": to_bool,
        }

        for k, convert in types.items():
            if k in parsed:
                parsed[k] = convert(parsed[k])

        return parsed
if __name__ == "__main__":
    # Demo: download the info for a sample video and dump the parsed dict.
    import pprint

    sample = VideoInfo("_OBlgSz8sSM")
    sample.fetch()
    pprint.pprint(sample._infos)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment