# Gist: livewing/c7c90d0c16a3e7a30fa5c27cbd7ac5e9
# (Save livewing/c7c90d0c16a3e7a30fa5c27cbd7ac5e9 to your computer and use it in GitHub Desktop.)
#
# mitmproxy script for iNico
#
# NOTE: This file contains bidirectional Unicode text that may be interpreted or
# compiled differently than what appears below. To review, open the file in an
# editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters.
import html
import json
import threading
from re import match, search, sub
from time import mktime, sleep, strptime
from urllib import parse, request

from pyquery import PyQuery as pq
# Request path of a watch page opened with "?watch_harmful=1";
# group 1 is the video id.
PAT_WATCH_PATH = r"^/watch/(.*?)\?watch_harmful=1$"
# Request path of the getflv API; group 1 is everything after "v=".
PAT_GETFLV_PATH = r"^/api/getflv\?v=(.*)"
# Captures the (HTML-escaped) value of the data-api-data attribute of the
# "js-initial-watch-data" element in a watch page.
PAT_WATCH_DATA = r'<div id="js-initial-watch-data" data-api-data="(.*?)"'
# Matches only the text between "&url=" and "&ms=" (both kept by the
# look-arounds), so sub() replaces just the url value of a getflv response.
PAT_GETFLV_SUB = r"(?<=&url=)(.*?)(?=&ms=)"

# Per-video caches, keyed by video id. They are filled by __watch_page and
# read by the getflv handlers.
smile_urls = {}
titles = {}
watch_data = {}
def response(flow):
    """Dispatch an intercepted response to the handler for its URL.

    Three kinds of traffic are handled; everything else is left untouched:

    * ``https://www.nicovideo.jp/watch/<id>?watch_harmful=1`` -- the page's
      embedded data is cached by ``__watch_page``.
    * ``http://flapi.nicovideo.jp/api/getflv?v=<id>`` -- the response is
      rewritten by ``__getflv_dmc``, or by ``__getflv_smile`` when the former
      reports that it did nothing.
    * ``http://www.nicovideo.jp/api/videoviewhistory/list`` -- the response
      is rewritten by ``__history``.

    Args:
        flow: The intercepted flow; ``flow.request.scheme``,
            ``flow.request.host_header`` and ``flow.request.path`` are read.
    """
    scheme = flow.request.scheme
    host = flow.request.host_header
    path = flow.request.path

    if scheme == "https" and host == "www.nicovideo.jp":
        m = match(PAT_WATCH_PATH, path)
        if m:
            __watch_page(flow, m.group(1))
            return

    if scheme == "http" and host == "flapi.nicovideo.jp":
        m = match(PAT_GETFLV_PATH, path)
        if m:
            # Try dmc first; fall back to smile only when dmc returns a
            # falsy result.
            if not __getflv_dmc(flow, m.group(1)):
                __getflv_smile(flow, m.group(1))
            return

    if scheme == "http" and host == "www.nicovideo.jp" and path == "/api/videoviewhistory/list":
        __history(flow)
        return
def __watch_page(flow, video_id):
    """Cache the playback data embedded in a watch page response.

    The JSON stored (HTML-escaped) in the page's ``data-api-data`` attribute
    is decoded, and the video's title and smile URL are remembered under
    ``video_id``. When the page carries ``dmcInfo``, its session token is
    additionally parsed into ``session_api["token_parsed"]`` and the whole
    document is kept in ``watch_data``.

    Args:
        flow: The intercepted flow; only ``flow.response.text`` is read.
        video_id: Video id taken from the request path.
    """
    found = search(PAT_WATCH_DATA, flow.response.text)
    if found is None:
        # The page has no embedded watch data, so there is nothing to cache.
        return

    j = json.loads(html.unescape(found.group(1)))
    video = j["video"]
    url = video["smileInfo"]["url"]
    title = video["originalTitle"]
    titles[video_id] = title
    smile_urls[video_id] = url

    dmc_info = video["dmcInfo"]
    if dmc_info is not None:
        session_api = dmc_info["session_api"]
        # The token is itself a JSON document; parse it once here.
        session_api["token_parsed"] = json.loads(session_api["token"])
        # Only videos with dmc data are recorded, so that __getflv_dmc
        # declines the others and the smile fallback is used for them.
        watch_data[video_id] = j
def __getflv_smile(flow, video_id):
    """Point a getflv response at the cached smile URL of ``video_id``.

    Args:
        flow: The intercepted flow; ``flow.response.text`` is rewritten.
        video_id: Video id taken from the request path.

    Returns:
        ``True`` when the response was rewritten, ``False`` when no smile URL
        has been cached for ``video_id``.
    """
    if video_id not in smile_urls:
        return False

    # The url value sits inside a query string, so it is percent-encoded.
    encoded = parse.quote(smile_urls[video_id])
    flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
    print(f" -> {titles[video_id]} served from smile")
    return True
def _dmc_session_request(session_api):
    """Build the JSON body that asks the dmc session API for a new session.

    Args:
        session_api: The ``session_api`` mapping of a watch page's
            ``dmcInfo``, including the ``token_parsed`` entry added by
            ``__watch_page``.

    Returns:
        A dict ready to be serialised with ``json.dumps``.
    """
    token = session_api["token_parsed"]
    return {
        "session": {
            "client_info": {
                "player_id": session_api["player_id"]
            },
            "content_auth": {
                "auth_type": token["protocols"][0]["auth_type"],
                "content_key_timeout": token["content_key_timeout"],
                "service_id": token["service_id"],
                "service_user_id": token["service_user_id"]
            },
            "content_id": token["content_ids"][0],
            "content_src_id_sets": [
                {
                    "content_src_ids": [
                        {
                            "src_id_to_mux": {
                                "audio_src_ids": token["audios"],
                                "video_src_ids": token["videos"]
                            }
                        }
                    ]
                }
            ],
            "content_type": "movie",
            "content_uri": "",
            "keep_method": {
                "heartbeat": {
                    "lifetime": token["heartbeat_lifetime"]
                }
            },
            "priority": token["priority"],
            "protocol": {
                "name": "http",
                "parameters": {
                    "http_parameters": {
                        "parameters": {
                            "http_output_download_parameters": {
                                "transfer_preset": token["transfer_presets"][0],
                                "use_ssl": "yes",
                                "use_well_known_port": "no"
                            }
                        }
                    }
                }
            },
            "recipe_id": token["recipe_id"],
            "session_operation_auth": {
                "session_operation_auth_by_signature": {
                    "signature": session_api["signature"],
                    "token": session_api["token"]
                }
            },
            "timing_constraint": "unlimited"
        }
    }


def __getflv_dmc(flow, video_id):
    """Point a getflv response at a freshly created dmc session.

    A session is requested from the first URL listed in the cached
    ``session_api`` data, the session's ``content_uri`` is written into the
    getflv response, and a background thread then re-posts the session data
    once a minute (at most 120 times).

    Args:
        flow: The intercepted flow; ``flow.response.text`` is rewritten.
        video_id: Video id taken from the request path.

    Returns:
        ``True`` when the response was rewritten. ``False`` when no dmc data
        is cached for ``video_id`` or the session could not be created, so
        that the caller can fall back to another source.
    """
    def heartbeat(title, session_url, session_data):
        """Re-post the session data every 60 seconds until the limit is hit."""
        print(f' -> Heartbeat loop for {title} has been started')
        limit = 2 * 60
        # Both the URL and the payload are the same for every beat.
        url = f'{session_url}/{session_data["session"]["id"]}?_format=json&_method=PUT'
        body = json.dumps(session_data).encode("utf-8")
        for i in range(limit):
            sleep(60)
            req = request.Request(url, data=body, method="POST")
            req.add_header("Content-Type", "application/json")
            try:
                with request.urlopen(req):
                    print(f' -> Heartbeat {i + 1} of {limit} for {title}')
            except OSError as e:
                # urllib's URLError/HTTPError are OSError subclasses. Stop
                # instead of letting the thread die with a traceback.
                print(f' -> Heartbeat loop for {title} stopped: {e}')
                return

    if video_id not in watch_data:
        return False
    j = watch_data[video_id]
    dmc_info = j["video"]["dmcInfo"]
    if dmc_info is None:
        return False

    session_api = dmc_info["session_api"]
    session_url = session_api["urls"][0]["url"]
    reqd = _dmc_session_request(session_api)

    req = request.Request(f'{session_url}?_format=json', data=json.dumps(reqd).encode("utf-8"), method="POST")
    req.add_header("Content-Type", "application/json")
    try:
        with request.urlopen(req) as res:
            jr = json.loads(res.read().decode(res.headers.get_content_charset() or "utf-8"))
        session_data = jr["data"]
        content_uri = session_data["session"]["content_uri"]
    except (OSError, ValueError, KeyError) as e:
        # Network failure, undecodable body or unexpected JSON shape:
        # report "not handled" rather than raising out of the hook.
        print(f" -> dmc session for {titles[video_id]} could not be created: {e}")
        return False

    encoded = parse.quote(content_uri)
    flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
    print(f" -> {titles[video_id]} served from dmc")
    # Daemon thread: the heartbeat loop can run for two hours and must not
    # keep the process alive once the proxy is shutting down.
    threading.Thread(
        target=heartbeat,
        args=[j["video"]["originalTitle"], session_url, session_data],
        daemon=True,
    ).start()
    return True
def __history(flow):
    """Fill the view-history API response with entries scraped from the web page.

    ``https://www.nicovideo.jp/my/history`` is fetched with the Cookie header
    of the intercepted request. Every ``div`` whose id starts with ``outer_``
    is converted into an entry dict, and the result is stored under the
    ``history`` key of the JSON document in ``flow.response.text``.

    Args:
        flow: The intercepted flow; ``flow.request.headers["Cookie"]`` is
            read and ``flow.response.text`` is rewritten.
    """
    def __element_to_history(_, element):
        """Convert one history-page element into a history entry dict."""
        q = pq(element)
        link = q("h5>a")
        video_id = link.attr("href").replace("watch/", "")
        original_thumbnail = q("img.video").attr("data-original")
        deleted = "deleted" in original_thumbnail
        thumbnail = original_thumbnail if not deleted else "http://nicovideo.cdn.nimg.jp/web/img/common/video_deleted_ja-jp.jpg"
        title = link.text()
        length = q(".videoTime").text() if not deleted else "0:00"
        # Strip the nested <span> so that only the date text is left in the
        # paragraph before it is parsed.
        posttime = q("p.posttime").html().replace(q("p.posttime>span").outer_html(), "")
        # NOTE(review): mktime() interprets the parsed time in the local
        # timezone of the machine running the proxy -- confirm this matches
        # the timezone the page uses.
        watch_date = int(mktime(strptime(posttime, "%Y年%m月%d日 %H:%M 視聴")))
        watch_count = int(search(r"\d+", q(".posttime>span").text()).group())
        return {
            "item_id": video_id,
            "video_id": video_id,
            "deleted": 1 if deleted else 0,
            "thumbnail_url": thumbnail,
            "title": title,
            "length": length,
            "watch_date": watch_date,
            "watch_count": watch_count,
            "device": 0,
            "playback_position": 0
        }

    req = request.Request("https://www.nicovideo.jp/my/history")
    req.add_header("Cookie", flow.request.headers["Cookie"])
    with request.urlopen(req) as res:
        # get_content_charset() returns None when the server declares no
        # charset; fall back to UTF-8 instead of failing in decode().
        h = res.read().decode(res.headers.get_content_charset() or "utf-8")

    q = pq(h, parser="html")
    entries = q('div[id^="outer_"]').map(__element_to_history)
    j = json.loads(flow.response.text)
    j["history"] = entries
    flow.response.text = json.dumps(j)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.