Skip to content

Instantly share code, notes, and snippets.

@livewing
Created December 7, 2018 12:22
Show Gist options
  • Save livewing/c7c90d0c16a3e7a30fa5c27cbd7ac5e9 to your computer and use it in GitHub Desktop.
Save livewing/c7c90d0c16a3e7a30fa5c27cbd7ac5e9 to your computer and use it in GitHub Desktop.
mitmproxy script for iNico
import html
import json
from re import match, search, sub
import threading
from time import mktime, sleep, strptime
from urllib import parse, request
from pyquery import PyQuery as pq
PAT_WATCH_PATH = r"^/watch/(.*?)\?watch_harmful=1$"
PAT_GETFLV_PATH = r"^/api/getflv\?v=(.*)"
PAT_WATCH_DATA = r'<div id="js-initial-watch-data" data-api-data="(.*?)"'
PAT_GETFLV_SUB = r"(?<=&url=)(.*?)(?=&ms=)"
smile_urls = {}
titles = {}
watch_data = {}
def response(flow):
scheme = flow.request.scheme
host = flow.request.host_header
path = flow.request.path
if scheme == "https" and host == "www.nicovideo.jp":
m = match(PAT_WATCH_PATH, path)
if m:
__watch_page(flow, m.group(1))
return
if scheme == "http" and host == "flapi.nicovideo.jp":
m = match(PAT_GETFLV_PATH, path)
if m:
__getflv_dmc(flow, m.group(1)) or __getflv_smile(flow, m.group(1))
return
if scheme == "http" and host == "www.nicovideo.jp" and path == "/api/videoviewhistory/list":
__history(flow)
return
def __watch_page(flow, video_id):
m = html.unescape(search(PAT_WATCH_DATA, flow.response.text).group(1))
j = json.loads(m)
url = j["video"]["smileInfo"]["url"]
title = j["video"]["originalTitle"]
titles[video_id] = title
smile_urls[video_id] = url
if j["video"]["dmcInfo"] is not None:
j["video"]["dmcInfo"]["session_api"]["token_parsed"] = json.loads(j["video"]["dmcInfo"]["session_api"]["token"])
watch_data[video_id] = j
def __getflv_smile(flow, video_id):
if not video_id in smile_urls:
return False
encoded = parse.quote(smile_urls[video_id])
flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
print(f" -> {titles[video_id]} served from smile")
return True
def __getflv_dmc(flow, video_id):
def heartbeat(j, jr):
print(f' -> Heartbeat loop for {j["video"]["originalTitle"]} has been started')
limit = 2 * 60
for i in range(limit):
sleep(60)
req = request.Request(f'{j["video"]["dmcInfo"]["session_api"]["urls"][0]["url"]}/{jr["data"]["session"]["id"]}?_format=json&_method=PUT', data=json.dumps(jr["data"]).encode("utf-8"), method="POST")
req.add_header("Content-Type", "application/json")
with request.urlopen(req):
print(f' -> Heartbeat {i + 1} of {limit} for {j["video"]["originalTitle"]}')
if not video_id in watch_data:
return False
j = watch_data[video_id]
reqd = {
"session": {
"client_info": {
"player_id": j["video"]["dmcInfo"]["session_api"]["player_id"]
},
"content_auth": {
"auth_type": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["protocols"][0]["auth_type"],
"content_key_timeout": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["content_key_timeout"],
"service_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["service_id"],
"service_user_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["service_user_id"]
},
"content_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["content_ids"][0],
"content_src_id_sets": [
{
"content_src_ids": [
{
"src_id_to_mux": {
"audio_src_ids": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["audios"],
"video_src_ids": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["videos"]
}
}
]
}
],
"content_type": "movie",
"content_uri": "",
"keep_method": {
"heartbeat": {
"lifetime": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["heartbeat_lifetime"]
}
},
"priority": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["priority"],
"protocol": {
"name": "http",
"parameters": {
"http_parameters": {
"parameters": {
"http_output_download_parameters": {
"transfer_preset": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["transfer_presets"][0],
"use_ssl": "yes",
"use_well_known_port": "no"
}
}
}
}
},
"recipe_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["recipe_id"],
"session_operation_auth": {
"session_operation_auth_by_signature": {
"signature": j["video"]["dmcInfo"]["session_api"]["signature"],
"token": j["video"]["dmcInfo"]["session_api"]["token"]
}
},
"timing_constraint": "unlimited"
}
}
req = request.Request(f'{j["video"]["dmcInfo"]["session_api"]["urls"][0]["url"]}?_format=json', data=json.dumps(reqd).encode("utf-8"), method="POST")
req.add_header("Content-Type", "application/json")
with request.urlopen(req) as res:
jr = json.loads(res.read().decode(res.headers.get_content_charset() or "utf-8"))
encoded = parse.quote(jr["data"]["session"]["content_uri"])
flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
print(f" -> {titles[video_id]} served from dmc")
threading.Thread(target=heartbeat, args=[j, jr]).start()
return True
def __history(flow):
def __element_to_history(_, element):
q = pq(element)
video_id = q("h5>a").attr("href").replace("watch/", "")
deleted = "deleted" in q("img.video").attr("data-original")
thumbnail = q("img.video").attr("data-original") if not deleted else "http://nicovideo.cdn.nimg.jp/web/img/common/video_deleted_ja-jp.jpg"
title = q("h5>a").text()
length = q(".videoTime").text() if not deleted else "0:00"
watch_date = int(mktime(strptime(q("p.posttime").html().replace(q("p.posttime>span").outer_html(), ""), "%Y年%m月%d日 %H:%M 視聴")))
watch_count = int(search(r"\d+", q(".posttime>span").text()).group())
return {
"item_id": video_id,
"video_id": video_id,
"deleted": 1 if deleted else 0,
"thumbnail_url": thumbnail,
"title": title,
"length": length,
"watch_date": watch_date,
"watch_count": watch_count,
"device": 0,
"playback_position": 0
}
req = request.Request("https://www.nicovideo.jp/my/history")
req.add_header("Cookie", flow.request.headers["Cookie"])
with request.urlopen(req) as res:
h = res.read().decode(res.headers.get_content_charset())
q = pq(h, parser="html")
entries = q('div[id^="outer_"]').map(__element_to_history)
j = json.loads(flow.response.text)
j["history"] = entries
flow.response.text = json.dumps(j)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment