Skip to content

Instantly share code, notes, and snippets.

@gssequence

gssequence/mitm-inico.py Secret

Created Dec 7, 2018
Embed
What would you like to do?
mitmproxy script for iNico
import html
import json
from re import match, search, sub
import threading
from time import mktime, sleep, strptime
from urllib import parse, request
from pyquery import PyQuery as pq
PAT_WATCH_PATH = r"^/watch/(.*?)\?watch_harmful=1$"
PAT_GETFLV_PATH = r"^/api/getflv\?v=(.*)"
PAT_WATCH_DATA = r'<div id="js-initial-watch-data" data-api-data="(.*?)"'
PAT_GETFLV_SUB = r"(?<=&url=)(.*?)(?=&ms=)"
smile_urls = {}
titles = {}
watch_data = {}
def response(flow):
scheme = flow.request.scheme
host = flow.request.host_header
path = flow.request.path
if scheme == "https" and host == "www.nicovideo.jp":
m = match(PAT_WATCH_PATH, path)
if m:
__watch_page(flow, m.group(1))
return
if scheme == "http" and host == "flapi.nicovideo.jp":
m = match(PAT_GETFLV_PATH, path)
if m:
__getflv_dmc(flow, m.group(1)) or __getflv_smile(flow, m.group(1))
return
if scheme == "http" and host == "www.nicovideo.jp" and path == "/api/videoviewhistory/list":
__history(flow)
return
def __watch_page(flow, video_id):
m = html.unescape(search(PAT_WATCH_DATA, flow.response.text).group(1))
j = json.loads(m)
url = j["video"]["smileInfo"]["url"]
title = j["video"]["originalTitle"]
titles[video_id] = title
smile_urls[video_id] = url
if j["video"]["dmcInfo"] is not None:
j["video"]["dmcInfo"]["session_api"]["token_parsed"] = json.loads(j["video"]["dmcInfo"]["session_api"]["token"])
watch_data[video_id] = j
def __getflv_smile(flow, video_id):
if not video_id in smile_urls:
return False
encoded = parse.quote(smile_urls[video_id])
flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
print(f" -> {titles[video_id]} served from smile")
return True
def __getflv_dmc(flow, video_id):
def heartbeat(j, jr):
print(f' -> Heartbeat loop for {j["video"]["originalTitle"]} has been started')
limit = 2 * 60
for i in range(limit):
sleep(60)
req = request.Request(f'{j["video"]["dmcInfo"]["session_api"]["urls"][0]["url"]}/{jr["data"]["session"]["id"]}?_format=json&_method=PUT', data=json.dumps(jr["data"]).encode("utf-8"), method="POST")
req.add_header("Content-Type", "application/json")
with request.urlopen(req):
print(f' -> Heartbeat {i + 1} of {limit} for {j["video"]["originalTitle"]}')
if not video_id in watch_data:
return False
j = watch_data[video_id]
reqd = {
"session": {
"client_info": {
"player_id": j["video"]["dmcInfo"]["session_api"]["player_id"]
},
"content_auth": {
"auth_type": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["protocols"][0]["auth_type"],
"content_key_timeout": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["content_key_timeout"],
"service_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["service_id"],
"service_user_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["service_user_id"]
},
"content_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["content_ids"][0],
"content_src_id_sets": [
{
"content_src_ids": [
{
"src_id_to_mux": {
"audio_src_ids": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["audios"],
"video_src_ids": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["videos"]
}
}
]
}
],
"content_type": "movie",
"content_uri": "",
"keep_method": {
"heartbeat": {
"lifetime": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["heartbeat_lifetime"]
}
},
"priority": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["priority"],
"protocol": {
"name": "http",
"parameters": {
"http_parameters": {
"parameters": {
"http_output_download_parameters": {
"transfer_preset": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["transfer_presets"][0],
"use_ssl": "yes",
"use_well_known_port": "no"
}
}
}
}
},
"recipe_id": j["video"]["dmcInfo"]["session_api"]["token_parsed"]["recipe_id"],
"session_operation_auth": {
"session_operation_auth_by_signature": {
"signature": j["video"]["dmcInfo"]["session_api"]["signature"],
"token": j["video"]["dmcInfo"]["session_api"]["token"]
}
},
"timing_constraint": "unlimited"
}
}
req = request.Request(f'{j["video"]["dmcInfo"]["session_api"]["urls"][0]["url"]}?_format=json', data=json.dumps(reqd).encode("utf-8"), method="POST")
req.add_header("Content-Type", "application/json")
with request.urlopen(req) as res:
jr = json.loads(res.read().decode(res.headers.get_content_charset() or "utf-8"))
encoded = parse.quote(jr["data"]["session"]["content_uri"])
flow.response.text = sub(PAT_GETFLV_SUB, encoded, flow.response.text)
print(f" -> {titles[video_id]} served from dmc")
threading.Thread(target=heartbeat, args=[j, jr]).start()
return True
def __history(flow):
def __element_to_history(_, element):
q = pq(element)
video_id = q("h5>a").attr("href").replace("watch/", "")
deleted = "deleted" in q("img.video").attr("data-original")
thumbnail = q("img.video").attr("data-original") if not deleted else "http://nicovideo.cdn.nimg.jp/web/img/common/video_deleted_ja-jp.jpg"
title = q("h5>a").text()
length = q(".videoTime").text() if not deleted else "0:00"
watch_date = int(mktime(strptime(q("p.posttime").html().replace(q("p.posttime>span").outer_html(), ""), "%Y年%m月%d日 %H:%M 視聴")))
watch_count = int(search(r"\d+", q(".posttime>span").text()).group())
return {
"item_id": video_id,
"video_id": video_id,
"deleted": 1 if deleted else 0,
"thumbnail_url": thumbnail,
"title": title,
"length": length,
"watch_date": watch_date,
"watch_count": watch_count,
"device": 0,
"playback_position": 0
}
req = request.Request("https://www.nicovideo.jp/my/history")
req.add_header("Cookie", flow.request.headers["Cookie"])
with request.urlopen(req) as res:
h = res.read().decode(res.headers.get_content_charset())
q = pq(h, parser="html")
entries = q('div[id^="outer_"]').map(__element_to_history)
j = json.loads(flow.response.text)
j["history"] = entries
flow.response.text = json.dumps(j)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.