Last active
February 26, 2024 16:27
-
-
Save rlaphoenix/e18170c950a917890823eb53777a15a0 to your computer and use it in GitHub Desktop.
Various Python Flask API utilities for working with HLS/DASH/DRM streams.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
import os | |
import re | |
import subprocess | |
from pathlib import Path | |
from typing import Literal | |
from urllib.parse import quote_plus, unquote_plus, urlencode, urljoin | |
from uuid import uuid4 | |
import requests | |
import waitress | |
from flask import Flask, Response, request | |
# Flask application object; every route in this file is registered on it.
app = Flask("iptv")

# Base URL under which this server is reachable. It is written into rewritten
# playlists/manifests so that players call back into /proxy and /decrypt.
PUBLIC_URL = "https://iptv.domain.com/prefix"  # no trailing slash, exclude the auth, can be localhost

# Access tokens and per-service credentials.
# - "users" holds the tokens accepted as the first URL path component.
# - "nordvpn" (optional) is read with AUTH.get("nordvpn") and placed before the
#   "@" of the proxy URL; routes that need it answer 400 when it is absent.
AUTH = {
    "users": [
        # short tokens recommended, url-safe base64 recommended
        # https://gchq.github.io/CyberChef/#recipe=Pseudo-Random_Number_Generator(8,'Raw')To_Base64('A-Za-z0-9-_')
        # e.g.,
        # "GUXpCqpTnt8", # me (example)
        # "1AqTA4zLIlk", # friend
        # ...
    ],
    #"nordvpn": "B4NZAK6oA1FaAaJXNX3YC1Ap:AaAlBHMrBHZVC4BZCKNVA1At", # example service credentials for /proxy
    "directv": {
        "cookies": {  # we only need these specific cookies
            "JSESSIONID": "...",
            "DYN_USER_ID": "...",
            "DYN_USER_CONFIRM": "..."
        },
        "zip": "90210"  # best to use same zip code as account
    },
    "rivertv": {
        "username": "...",
        "password": "..."
    },
    "itv": {
        "hmac": "..."
    }
}

# In-memory caches filled by the directv and rivertv routes:
# "sessions" keeps upstream authorizations/session ids, "playlists" keeps
# fetched playlist text. Nothing in this file ever removes an entry.
SERVICE_CACHE = {
    "directv": {
        "sessions": {},
        "playlists": {}
    },
    "rivertv": {
        "sessions": {},
        "playlists": {}
    }
}

# Content keys handed to the packager by the decrypt route,
# laid out as service -> channel -> list of (key_id_hex, key_hex) pairs.
DRM_CONTENT_KEYS = {
    "rte": {
        "channel1": [
            #("key_id_hex", "key_hex"),
            #... multiple keys can be specified
        ],#... and so on
    },
    #...
}

# Scratch directory where encrypted segments are written for decryption.
DRM_TEMP_FOLDER = Path(r"C:\Users\John\AppData\Local\Temp\drm-decrypt")
# Init segments keyed by "<service>-<channel>-<presentation id>".
DRM_INIT_CACHE = {}  # mem cache until flask restart

# "x-accel-expires" value per lowercase response content type
# (anything not listed gets "0").
EXPIRES_MAP = {
    "application/octet-stream": "600",
    "text/vtt": "600",
    "video/mp2t": "600",
    "application/x-mpegurl": "0",
    "application/pgp-keys": "0"
}

# "cache-control" value per lowercase response content type
# (anything not listed gets "no-cache").
CACHE_CONTROL_MAP = {
    "application/octet-stream": "public, max-age=3600",
    "text/vtt": "public, max-age=3600",
    "video/mp2t": "public, max-age=3600",
    "application/x-mpegurl": "no-cache",
    "application/pgp-keys": "no-cache"
}
def authenticated(func):
    """
    Guard a Flask view behind the ``auth`` token taken from the URL.

    The wrapped view is called with the URL keyword arguments only when the
    ``auth`` value is listed in ``AUTH["users"]``; otherwise the response is
    ``("Auth is invalid", 403)``. The ``auth`` argument is forwarded only to
    views that declare a parameter with that name.
    """
    code = func.__code__
    # co_varnames lists the parameters first and ordinary local variables after
    # them. Only the parameters matter here: a view that merely has a *local*
    # called "auth" must not be handed an unexpected keyword argument.
    parameter_names = code.co_varnames[:code.co_argcount + code.co_kwonlyargcount]
    wants_auth = "auth" in parameter_names

    @wraps(func)
    def wrapper(**kwargs):
        auth_token = kwargs.get("auth")
        if not auth_token or auth_token not in AUTH["users"]:
            return "Auth is invalid", 403
        if not wants_auth:
            kwargs.pop("auth", None)
        return func(**kwargs)

    return wrapper
@app.route("/<string:auth>/<string:playlist>.m3u8", methods=["GET"])
@authenticated
def channels(auth: str, playlist: str) -> tuple[str, int]:
    """
    Return an M3U8 playlist of channels.

    The playlist is read from ``<playlist>.m3u8`` next to this script and every
    ``{auth}`` placeholder in it is filled with the caller's token.

    Available Playlists:
    - master (all channels, excluding backup)
    - hls (only HLS channels)
    - dash (only DASH channels)
    - backup (any backup stream, may be a mix of hls/dash)

    Responds 404 when the playlist file does not exist.
    """
    file_name = f"{playlist}.m3u8"
    # The name comes from the URL: refuse anything that is more than a bare
    # file name (e.g. backslash or drive components on Windows).
    if Path(file_name).name != file_name:
        return f"Playlist '{playlist}' does not exist", 404
    # Path(__file__, "../x") would walk *through* the script file, which only
    # works on Windows; go through the parent directory instead.
    playlist_path = Path(__file__).parent / file_name
    if not playlist_path.is_file():
        return f"Playlist '{playlist}' does not exist", 404
    m3u8 = playlist_path.read_text(encoding="utf8").format(auth=auth)
    r = Response(m3u8)
    r.headers["Content-Type"] = "application/mpegURL"
    return r, 200
@app.route("/<string:auth>/youtube/<string:youtube_id>.m3u8", methods=["GET"])
@authenticated
def youtube(youtube_id: str) -> tuple[str, int]:
    """
    Convert a YouTube Video Stream ID to a HLS M3U stream.

    E.g., /youtube/9Auq9mYxFEE.m3u8 (Sky News)

    Note that the returned stream will be restricted to the Server's Country.
    The initial manifest is restricted to the Server's IP, but the playlists and
    files within are restricted only to the Caller's Country.

    Responds 400 with the youtube-dl output when youtube-dl fails, and with the
    upstream status code when the resolved manifest cannot be downloaded.
    """
    try:
        playlist = subprocess.check_output([
            "youtube-dl",
            "-f", "96",
            "-g",
            f"https://www.youtube.com/watch?v={youtube_id}"
        ], stderr=subprocess.STDOUT).decode().strip()
    except subprocess.CalledProcessError as e:
        # e.output is bytes; hand the caller readable text
        return e.output.decode(errors="replace"), 400
    manifest = requests.get(playlist)
    if manifest.status_code != 200:
        # do not pass an upstream error page off as a playlist
        return f"An error occurred loading YouTube Stream [{manifest.status_code}]", manifest.status_code
    r = Response(manifest.text)
    # same content type as the other HLS routes in this file
    r.headers["Content-Type"] = "application/mpegURL"
    return r, 200
@app.route("/<string:auth>/filmon/<string:channel_id>/<string:quality>.m3u8", methods=["GET"])
@authenticated
def filmon(channel_id: str, quality: Literal["low", "high"]) -> tuple[str, int]:
    """
    Convert a FilmOn Channel ID to a HLS M3U stream.

    E.g., /filmon/65/high.m3u8 (E4 480p)

    The FilmOn links are intended to play as 30s previews but the URL usually lasts up to
    2 minutes. However, we can bypass this by simply grabbing a new stream URL every time
    the player calls your proxy URL.

    Responds 404 for a non-numeric channel, an unknown quality or a channel
    without a stream of that quality; upstream failures keep their status code.
    """
    if not channel_id.isdigit():
        return f"Channel ID must be a number, not '{channel_id}'", 404
    quality = quality.lower()
    if quality not in ("high", "low"):
        return f"Quality '{quality}' is invalid, must be 'high' or 'low'", 404
    channel_res = requests.get(
        url=f"http://www.filmon.com/api-v2/channel/{channel_id}",
        params={"protocol": "hls"},
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:101.0) Gecko/20100101 Firefox/101.0"
        }
    )
    if channel_res.status_code != 200:
        return channel_res.json()["message"], channel_res.status_code
    channel_data = channel_res.json()["data"]
    stream_url = next((x["url"] for x in channel_data["streams"] if x["quality"] == quality), None)
    if not stream_url:
        return f"No stream with the quality '{quality}' available for Channel {channel_id}", 404
    stream = requests.get(stream_url)
    if stream.status_code != 200:
        return f"An error occurred loading FilmOn {quality} Stream [{stream.status_code}]", stream.status_code
    # relative to absolute; blank lines must stay blank because
    # urljoin(base, "") would turn them into the stream URL itself
    m3u = "\n".join(
        urljoin(stream_url, line) if line and not line.startswith(("http", "#")) else line
        for line in stream.text.splitlines()
    )
    return m3u, 200
@app.route("/<string:auth>/rte/<string:channel>.mpd", methods=["GET"])
@authenticated
def rte(auth: str, channel: str) -> tuple[str, int]:
    """
    Convert an RTE Channel ID to an MPEG-DASH MPD stream.

    E.g., /rte/channel1.mpd (RTE One HD)

    This defeats the Widevine DRM applied on the streams by simply intercepting
    the stream calls with MiTM URL injection and decrypting them before returning
    the data to the client. It's hacky, somewhat works, but it works.

    Responds 500 when the upstream MPD cannot be fetched.
    """
    base_url = f"https://live.rte.ie/live/a/{channel}/{channel}.isml"
    mpd_url = f"{base_url}/.mpd"
    res = requests.get(
        url=mpd_url,  # .m3u8 exists but is FPS only
        params={
            # these dont seem to actually matter lol
            "dvr_window_length": "30",
            "available": "1653483300",
            "expiry": "1653513414",
            "ip": "x.x.x.x",
            "filter": "systemBitrate<=7000000",
            "token1": "95f27451ce0d666a3428829e0c5818494cdfa1efd2f448a3a5e0b9ce40524ec3"
        }
    )
    if res.status_code != 200:
        return f"MPD Request Failed! {res.text}", 500
    # Disable Base Url and inject decrypt service
    mpd = res.text\
        .replace("<BaseURL>dash/</BaseURL>", "")\
        .replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/rte/{channel}/init/')\
        .replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/rte/{channel}/data/')
    # Remove all Content Protections
    mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
    # Remove all Representations except best.
    # The id is captured non-greedily so it stops at its own closing quote even
    # when further attributes follow on the same line.
    wanted_reps = ("audio_128k=128000", "video=6000000")
    mpd = re.sub(
        r'<Representation\s*id="(.*?)"[\s\S]*?<\/Representation>',
        lambda rep: rep.group(0) if rep.group(1) in wanted_reps else "",
        mpd
    )
    r = Response(mpd)
    r.headers["Content-Type"] = "application/dash+xml"
    return r, 200
@app.route("/<string:auth>/channel4/<string:channel>.mpd", methods=["GET"])
@authenticated
def channel4(auth: str, channel: str) -> tuple[str, int]:
    """
    Convert a Channel 4 Channel ID to an MPEG-DASH MPD stream.

    E.g., /channel4/c4.mpd (Channel 4 576p)

    This defeats the Widevine DRM applied on the streams by simply intercepting
    the stream calls with MiTM URL injection and decrypting them before returning
    the data to the client. It's hacky, somewhat works, but it works.

    Responds 400 for an unknown channel and 500 when the upstream MPD cannot
    be fetched.
    """
    # second wanted representation id suffix, per channel
    channel_reps = {
        "c4": "item-12item",
        "e4": "item-09item",
        "m4": "item-12item",
        "f4": "item-09item",
        "4s": "item-09item"
    }
    if channel not in channel_reps:
        # previously an unknown channel surfaced as a KeyError (HTTP 500)
        return f"Channel 4 Channel ID '{channel}' is invalid, expecting one of: {tuple(channel_reps)}", 400
    res = requests.get(
        url=f"https://csm-e-c4ukdash-eb.tls1.yospace.com/csm/extlive/channelfour01,{channel}-v2-iso-dash-h12.mpd",  # .m3u8 exists but is FPS only
        params={
            "yo.ac": "false",
            "yo.br": "false",
            "siteSectionId": f"watchlive.channel4.com/{channel}",
            "GUID": "f6d8b7b0-3ef5-4764-88dd-af5d7a136611"
        }
    )
    if res.status_code != 200:
        return f"MPD Request Failed! {res.text}", 500
    # Disable Base Url and inject decrypt service
    mpd = res.text\
        .replace(f"<BaseURL>https://cdn.live.dash.c4assets.com/v2/iso-dash-mp/{channel}/</BaseURL>", "")\
        .replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/channel4/{channel}/init/')\
        .replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/channel4/{channel}/data/')
    # Remove Location URLs
    mpd = re.sub(r'<Location[\s\S]*?<\/Location>', "", mpd)
    # Remove Analytic URLs (dont need if yo.ac=false)
    # mpd = re.sub(r'analytics=".*?"', "", mpd)
    # Remove all Content Protections
    mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
    # Remove all Representations except best
    wanted_reps = ("item-07item", channel_reps[channel])
    for representation in re.finditer(r'<Representation.*?id="(.*?)"/>', mpd):
        if not representation.group(1).endswith(wanted_reps):
            mpd = mpd.replace(representation.group(0), "")
    r = Response(mpd)
    r.headers["Content-Type"] = "application/dash+xml"
    return r, 200
@app.route("/<string:auth>/itv/<string:channel>.mpd", methods=["GET"])
@authenticated
def itv(auth: str, channel: str) -> tuple[str, int]:
    """
    Convert an ITV Channel ID to an MPEG-DASH MPD stream.

    E.g., /itv/itvbe.mpd (ITVBe 504p)

    Channel IDs:
    - 'itv1' (ITV 720p)
    - 'itv2' (ITV2 504p)
    - 'itv3' (ITV3 504p)
    - 'itv4' (ITV4 504p)
    - 'itvbe' (ITVBe 504p)
    - 'citv' (CITV 504p)

    This defeats the Widevine DRM applied on the streams by simply intercepting
    the stream calls with MiTM URL injection and decrypting them before returning
    the data to the client. It's hacky, somewhat works, but it works.

    Responds 400 when no NordVPN credentials are configured or the channel is
    unknown; a failed playlist request keeps the upstream status code.
    """
    if not AUTH.get("nordvpn"):
        # this message used to name RiverTV (copy-paste from that route)
        return "Sorry! NordVPN Proxy is disabled, cannot use ITV.", 400
    valid_channels = ("itv1", "itv2", "itv3", "itv4", "itvbe", "citv")
    if channel not in valid_channels:
        return f"ITV Channel ID '{channel}' is invalid, expecting one of: {valid_channels}", 400
    nord_server = "uk1866"
    session = requests.Session()
    session.headers.update({
        "Accept": "application/vnd.itv.online.playlist.sim.v3+json",
        "User-Agent": "ITV_Player_(Android)",
        "hmac": AUTH["itv"]["hmac"]
    })
    session.proxies.update({
        "all": f"https://{AUTH['nordvpn']}@{nord_server}.nordvpn.com:89"  # 80 == HTTP, 89 == HTTP+SSL
    })
    res = session.post(
        # the path segment is built from the channel id: itv1 -> ITV, itvbe -> ITVBe, citv -> CITV
        url=f"https://simulcast.itv.com/playlist/itvonline/{channel.upper().replace('BE', 'Be').replace('1', '')}",
        json={
            "client": {
                "id": "android",  # browser
                "supportsAdPods": False,
                "version": "10.3.0"  # browser: 4.1
            },
            "device": {
                "advertisingIdentifier": "d0d23c7e-d294-4b29-ad72-7f67c263a827",
                "display": {
                    "size": "small"
                },
                "firmware": "30",
                "id": "redfin",
                "manufacturer": "Google",
                "model": "Pixel 5",
                "os": {
                    "name": "android",
                    "type": "android",
                    "version": "11"
                }
            },
            "preview": False,
            "user": {
                "entitlements": [],
                "status": "anon",
                "subscribed": "false",
                "token": ""
            },
            "variantAvailability": {
                "featureset": {
                    # hls,mpeg-dash,aes,widevine,fairplay,inband-webvtt,fairplay-download
                    # hls seems to have been removed completely within 2021 :(
                    "max": [
                        "mpeg-dash",
                        "widevine",
                        "inband-webvtt"
                    ],
                    "min": [
                        "mpeg-dash",
                        "widevine"
                    ]
                },
                "platformTag": "mobile"  # mobile, dotcom, ctv,
            }
        }
    )
    if res.status_code != 200:
        return f"MPD Request Failed! {res.text}", res.status_code
    # return res.text, 200
    mpd = session.get(res.json()["Playlist"]["Video"]["VideoLocations"][0]["Url"]).text
    # Disable Base Url and inject decrypt service
    mpd = mpd\
        .replace("<BaseURL>dash/</BaseURL>", "")\
        .replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/itv/{channel}/init/')\
        .replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/itv/{channel}/data/')
    # Remove Analytics
    mpd = re.sub(r'analytics=".*?"', "", mpd)
    mpd = re.sub(r'<EventStream[\s\S]*?<\/EventStream>', "", mpd)
    # Remove all Content Protections
    mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
    # Remove all Representations except best.
    # The id is captured non-greedily so it stops at its own closing quote even
    # when further attributes follow on the same line.
    # last representation is 720p only available on ITV 1
    wanted_reps = ("audio_1=96000", "video=1704000", "video=3403968")
    mpd = re.sub(
        r'<Representation\s*id="(.*?)"[\s\S]*?<\/Representation>',
        lambda rep: rep.group(0) if rep.group(1) in wanted_reps else "",
        mpd
    )
    r = Response(mpd)
    r.headers["Content-Type"] = "application/dash+xml"
    return r, 200
@app.route("/<string:auth>/directv/<string:channel>.m3u8", methods=["GET"])
@authenticated
def directv(channel: str) -> tuple[str, int]:
    """
    Convert a DIRECTV Channel ID to an HLS M3U8 stream.

    E.g., /directv/4211.m3u8 (Freeform HD)

    This only works on DRM-free channels. Almost every channel is DRM protected except:
    - 4211 (#311, FRFMHD, Freeform HD)
    - 6055 (#289, DSJRHD, Disney Junior HD)
    - 8397 (#292, DXDHD, Disney XD HD)
    - 8398 (#390, DISEHD, Disney Channel HD)

    Responds 500 when authorization fails, 403 when no playback URL is offered
    (reported as VideoGuard), and with an error status when the stream
    authorization reports a problem.
    """
    session = requests.Session()
    session.cookies.update(AUTH["directv"]["cookies"])
    session.headers.update({
        "Referer": "https://www.directv.com/guide?lpos=Header:1",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:101.0) Gecko/20100101 Firefox/101.0",
        "DNT": "1",
        "Accept-Language": "en-US;q=0.9,en;q=0.8"
    })
    sessions = SERVICE_CACHE["directv"]["sessions"]
    playlists = SERVICE_CACHE["directv"]["playlists"]
    # auth expires fairly quick, cache per-channel and per-address
    session_key = f"{channel}{request.remote_addr}"
    authorization = sessions.get(session_key)
    if not authorization:
        res = session.get(
            url="https://www.directv.com/json/authorization",
            headers={
                "Accept": "application/json, text/plain, */*"
            }
        )
        if res.status_code != 200:
            return f"Failed to Authorize with DIRECTV! {res.text}", 500
        authorization = sessions[session_key] = res.json()["authorization"]
    m3u8 = playlists.get(channel)
    if not m3u8:
        stream = session.get(
            url="https://pgauth-ca.dtvce.com/pgauth/content/authorize",
            params={
                "et": authorization["etoken"],
                "sig": authorization["signature"],
                "sid": authorization["siteId"],
                "suid": authorization["siteUserId"],
                "actiontype": "1",
                "clientid": "web",
                "channel.id": channel,
                "geo.zipcode": AUTH["directv"]["zip"],
                "output": "json"
            }
        )
        stream, status = stream.json(), stream.status_code
        if status != 200 or stream.get("error_message"):
            # .get(): a non-200 reply does not necessarily carry both fields
            error = f"{stream.get('error_message') or 'Unknown error'} [{stream.get('error_code')}]"
            # an error delivered with HTTP 200 must not be reported as a success
            return error, status if status != 200 else 500
        playlist = stream.get("playBackUrl")
        # no playback URL is treated as a protected stream; the marker is cached too
        m3u8 = session.get(playlist).text if playlist else "VideoGuard"
        playlists[channel] = m3u8
    if m3u8 == "VideoGuard":
        return "Stream is protected by VideoGuard", 403
    r = Response(m3u8)
    r.headers["Content-Type"] = "application/mpegURL"
    return r, 200
@app.route("/<string:auth>/rivertv/<string:channel>.m3u8", methods=["GET"])
@authenticated
def rivertv(auth: str, channel: str) -> tuple[str, int]:
    """
    Convert a RiverTV Channel ID to an HLS M3U8 stream.

    E.g., /rivertv/302728.m3u8 (TeleToon 720p)

    The Geoblock is bypassed by fetching the playlist through this server's own
    /proxy endpoint, which also rewrites the URIs inside it. A stable proxy
    will be required.

    The optional ``vbegin``/``vend`` query parameters are forwarded to the
    upstream playlist URL when both are given. The login session and the
    fetched playlist are cached in SERVICE_CACHE.
    """
    if not AUTH.get("nordvpn"):
        return "Sorry! NordVPN Proxy is disabled, cannot use RiverTV.", 400

    vbegin = request.args.get("vbegin") or ""
    vend = request.args.get("vend") or ""
    nord_server = "ca1081"

    # cache keys: one login per server/channel/caller, one playlist per time window
    session_key = f"{nord_server}-{channel}-{request.remote_addr}"
    playlist_key = f"{session_key}-{vbegin}{vend}"
    cached_sessions = SERVICE_CACHE["rivertv"]["sessions"]
    cached_playlists = SERVICE_CACHE["rivertv"]["playlists"]

    session = requests.Session()
    session.headers.update({
        "Content-Type": "application/json",
        "User-Agent": "RiverTV/1.8.0 (ca.rivertv.tv; build:1111; iOS 14.8.1) Alamofire/1.8.0",
        "X-Custom-AppName": "R-ROKU"
    })
    session.proxies.update({
        "all": f"https://{AUTH['nordvpn']}@{nord_server}.nordvpn.com:89"  # 80 == HTTP, 89 == HTTP+SSL
    })

    session_id = cached_sessions.get(session_key)
    if not session_id:
        app_settings = {
            "appName": "R-ROKU",
            "settings": [
                {"key": "box.sys_version", "value": "12"},
                {"key": "box.app_version", "value": "1.8.0"},
                {"key": "box.build_number", "value": "1111"},
                # TODO: Randomly generate 192.168.x.x subnet
                {"key": "local.ip", "value": "192.168.1.172"},
            ],
            "siteID": 0
        }
        client_credentials = {
            # TODO: Generate based on remote+local IP Address
            "StbMacAddress": os.urandom(8).hex(),
            "UserLogin": AUTH["rivertv"]["username"],
            "UserPassword": AUTH["rivertv"]["password"],
            "settings": []
        }
        login_body = {
            "cc": {
                "appSettings": app_settings,
                "clientCredentials": client_credentials
            },
            "deviceInformation": {
                "DeviceId": str(uuid4()),
                "model": "AFTMM_Amazon"
            },
            "appName": "R-ROKU",
            "sLang": "eng",
            "siteID": 170
        }
        login = session.post(
            url="https://a3.cdn.vmedia.ca/Handlers/ClientService.ashx/json/Login",
            json=login_body
        ).json()

        def reply_setting(name):
            # value of the first reply setting with that key, None when absent
            for entry in login["appSettings"]["settings"]:
                if entry["key"] == name:
                    return entry["value"]
            return None

        error_code = reply_setting("errorCode")
        error_msg = reply_setting("errorMessage")
        if error_code != "OK":
            return f"Cannot get RiverTV stream: {error_msg} [{error_code}]", 500
        session_id = login["clientCredentials"]["sessionID"]
        cached_sessions[session_key] = session_id

    m3u8 = cached_playlists.get(playlist_key)
    if not m3u8:
        media_request = {
            "IsRestart": 0,  # 1 if live, 0 otherwise
            "clientSidePlaylist": False,
            "item": {
                "contentType": 4,  # 4 if live, 1 otherwise
                "id": channel
            },
            "startOffset": 0,
            "streamSettings": {
                "balancingArea": {
                    "id": -1
                },
                "qualityPreset": 0,
                "shiftTimeZoneName": "NA_EST"
            }
        }
        stream = session.post(
            url="https://a3.cdn.vmedia.ca/api/media/restartStream",
            json={"mediaRequest": media_request, "sessionID": session_id}
        ).json()
        # drop the upstream query string, then re-attach the requested window
        playlist = stream["URL"].split("?")[0]
        if vbegin and vend:
            playlist += f"?{urlencode({'vbegin': vbegin, 'vend': vend})}"
        proxied = requests.get(
            url=f"{PUBLIC_URL}/{auth}/proxy/{nord_server}",
            params={"url": playlist}
        )
        m3u8 = proxied.text
        cached_playlists[playlist_key] = m3u8

    r = Response(m3u8)
    r.headers["Content-Type"] = "application/mpegURL"
    return r, 200
@app.route("/<string:auth>/9now/<string:channel>.m3u8", methods=["GET"])
@authenticated
def ninenow(auth: str, channel: str) -> tuple[str, int]:
    """
    Convert a 9Now Channel ID to an HLS M3U8 stream.

    E.g., /9now/go.m3u8 (9Go 720p)

    The Geoblock is bypassed by fetching the stream through this server's own
    /proxy endpoint, which also rewrites the URIs within the playlist.
    A stable proxy will be required.

    A failed live-experience lookup is passed on with its JSON body and status.
    """
    nord_server = "au602"

    browser_headers = {
        "Origin": "https://www.9now.com.au",
        "Referer": "https://www.9now.com.au/",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
        "Authority": "api.9now.com.au",
        "Accept-Language": "en-AU",
        "sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="102", "Google Chrome";v="102"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-site"
    }
    session = requests.Session()
    session.headers.update(browser_headers)

    lookup = session.get(
        url="https://api.9now.com.au/live-experience",
        params={
            "device": "web",
            "region": "nsw",
            "slug": channel,
            "streamParams": "web,chrome,windows"
        }
    )
    if lookup.status_code != 200:
        return lookup.json(), lookup.status_code

    stream_url = lookup.json()["data"]["getLXP"]["stream"]["video"]["url"]
    # fetch through our own /proxy route so the playlist URIs get rewritten too
    proxied = requests.get(
        url=f"{PUBLIC_URL}/{auth}/proxy/{nord_server}",
        params={"url": stream_url}
    )
    r = Response(proxied.text)
    r.headers["Content-Type"] = "application/mpegURL"
    return r, 200
@app.route("/<string:auth>/proxy/<string:server>", methods=["GET"])
@authenticated
def proxy(auth: str, server: str) -> tuple[str, int]:
    """
    Download the specified URL using a Proxy.

    If the URL is to an m3u(8) document, it will rewrite the URLs within it to also be
    proxied with the same server.
    This may fail if the specified URL or any URL within the M3U8 is too long.

    Possible Server values:
    - No Proxy put 'none' (Uses the Server's IP/Network).
    - NordVPN Proxy put E.g., 'uk2020'. See https://nordvpn.com/servers/tools/
    - Docoja.com/blue UK Web Proxy put 'docoja'.

    Responds 400 when the 'url' query parameter is missing, when NordVPN is
    requested without configured credentials, or when the server name is not
    a reachable NordVPN host. Otherwise the upstream status code is passed on.
    """
    # request.args is already URL-decoded. Decoding a second time would corrupt
    # URLs that legitimately contain '+' or '%' (e.g. signed segment URLs).
    url = request.args.get("url") or ""
    if not url:
        return "The 'url' param must be supplied.", 400
    session = requests.Session()
    session.headers.update({"User-Agent": request.headers.get("User-Agent")})
    if server == "docoja":
        session.headers.update({
            "Referer": "https://www.docoja.com/blue/"
        })
        res = session.get(
            url="https://www.docoja.com/blue/browse.php",
            params={
                "u": url,
                "b": 4,
                "f": "norefer"
            }
        )
    else:
        if server != "none":
            if not AUTH.get("nordvpn"):
                return "Sorry! NordVPN Proxy is disabled, please use another server.", 400
            proxy_hostname = f"{server}.nordvpn.com"
            # The name comes from the URL and ends up both on the ping command
            # line and in the proxy URL next to the credentials, so accept
            # nothing but a plain host label.
            if not re.fullmatch(r"[A-Za-z0-9][A-Za-z0-9-]*", server):
                return f"Invalid NordVPN Server: '{proxy_hostname}'", 400
            ping_count_flag = "-n" if os.name == "nt" else "-c"
            if subprocess.call(["ping", ping_count_flag, "1", proxy_hostname]) != 0:
                return f"Invalid NordVPN Server: '{proxy_hostname}'", 400
            session.proxies.update({"all": f"https://{AUTH['nordvpn']}@{proxy_hostname}:89"})  # 80 == HTTP, 89 == HTTP+SSL
        res = session.get(url)

    if not res.content.startswith(b"#EXTM3U"):
        # assume data stream
        return Response(
            res.content,
            res.status_code,
            content_type=res.headers.get("content-type") or "application/octet-stream",
        )

    # base inject url
    inject_base = f"{PUBLIC_URL}/{auth}/proxy/{server}"
    rewritten = []
    for line in res.text.splitlines():
        if line and not line.startswith("#"):
            # relative to absolute
            if not line.startswith("http"):
                line = urljoin(url, line)
            # use proxied ts url
            line = f"{inject_base}?url={quote_plus(line)}"
        rewritten.append(line)
    m3u8 = "\n".join(rewritten)
    # use proxied playlist urls
    m3u8 = re.sub(
        r',URI="(.+?)"',
        lambda uri: f',URI="{inject_base}?url={quote_plus(urljoin(url, uri.group(1)))}"',
        m3u8
    )
    return Response(
        m3u8,
        res.status_code,
        content_type="application/mpegURL"
    )
# Decrypted segments per "<service>-<channel>", oldest first; kept in memory
# until restart and trimmed to 50 entries per key by decrypt().
DRM_SEGMENT_CACHE = {}


@app.route("/<string:auth>/decrypt/<string:service>/<string:channel>/<string:seg_type>/<string:path>", methods=["GET"])
@authenticated
def decrypt(auth: str, service: str, channel: str, seg_type: str, path: str) -> tuple[str, int]:
    """
    Decrypt MPEG-DASH stream data, pretty hacky method like /proxy but it works, somewhat.

    ``seg_type`` "init" downloads the init segment, remembers it in
    DRM_INIT_CACHE and returns it as-is. Any other value downloads the media
    segment, prepends the remembered init segment, decrypts the result with
    shaka-packager using DRM_CONTENT_KEYS and returns (and caches) the output.

    Responds 404 for an unknown service or a path whose presentation cannot be
    determined, and 500 when no init segment has been seen yet or no content
    keys are configured for the channel.
    """
    urls = {
        "rte": f"https://live.rte.ie/live/a/{channel}/{channel}.isml/dash/{path}",
        "channel4": f"https://cdn.live.dash.c4assets.com/v2/iso-dash-mp/{channel}/{path}",
        "itv": f"https://{channel}simamobile.cdn1.content.itv.com/playout/mb01/{channel}/cenc.isml/dash/{path}",  # android
        # browser: "itv": f"https://itv1simadotcom.cdn1.content.itv.com/playout/pc01/{channel}/cenc.isml/dash/{path}"
    }
    if service not in urls:
        return f"Unknown service '{service}'", 404
    url = urls[service]

    # services whose segments have to be fetched through the /proxy route
    proxy_server = {
        "itv": "docoja"
    }.get(service)
    if proxy_server:
        url = f"{PUBLIC_URL}/{auth}/proxy/{proxy_server}?url={quote_plus(url)}"

    # the presentation id ties media segments to their init segment
    presentation = re.search({
        "rte": rf"{re.escape(channel)}-([a-z]*[^-.]*)-?(?:\d+)?",
        "channel4": r"\d+(item-\d{2}item)",
        "itv": r"cenc-([a-z]*[^-.]*)-?(?:\d+)?"
    }[service], path)
    if not presentation:
        return f"Cannot determine the presentation of '{path}'", 404
    presentation_id = presentation.group(1)

    service_key = f"{service}-{channel}"
    key_prefix = f"{service_key}-{presentation_id}"

    if seg_type == "init":
        out = DRM_INIT_CACHE[key_prefix] = requests.get(url).content
    else:
        segments = DRM_SEGMENT_CACHE.setdefault(service_key, {})
        out = segments.get(path)
        if not out:
            init = DRM_INIT_CACHE.get(key_prefix)
            if not init:
                # TODO: Problematic
                msg = f"No init data for {path}"
                print(msg)
                return msg, 500
            content_keys = DRM_CONTENT_KEYS.get(service, {}).get(channel)
            if not content_keys:
                return f"No content keys configured for {service_key}", 500
            # decrypt with shaka-packager, sadly cant efficiently pipe it unless your on linux
            # even on linux, its just using a special fifo file, not really piping
            # (unique name: concurrent requests for one segment must not share a file)
            segment_file_path = DRM_TEMP_FOLDER / f"{key_prefix}-{path}-{uuid4().hex}.mp4"
            try:
                segment_file_path.write_bytes(init + requests.get(url).content)
                subprocess.check_call(
                    [
                        "packager-win-x64",
                        f"input={segment_file_path},stream=0,output={segment_file_path}",
                        "--enable_raw_key_decryption",
                        "--keys",
                        ",".join([
                            f"label={i}:key_id={key[0]}:key={key[1]}"
                            for i, key in enumerate(content_keys)
                        ])
                    ],
                    # may potentially give a speed boost
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )
                out = segments[path] = segment_file_path.read_bytes()
            finally:
                # never leave the temp file behind, even when the packager fails
                segment_file_path.unlink(missing_ok=True)
            # remove the init data, commented out as ive noticed smoother streams by keeping
            # moov_start = out.rfind(b"moov") - 4
            # moov_length = int.from_bytes(out[moov_start:moov_start + 4], "big")
            # out = out[moov_start + moov_length:]
            # dicts keep insertion order: drop the oldest entries, not the one
            # that was just added (which is what popitem() would remove)
            while len(segments) > 50:
                del segments[next(iter(segments))]

    r = Response(out)
    r.headers["Content-Type"] = "video/mp4"
    return r, 200
@app.after_request
def apply_caching(response):
    """
    Add CORS and cache headers to every response.

    The ``x-accel-expires`` and ``cache-control`` values are looked up in
    EXPIRES_MAP / CACHE_CONTROL_MAP by the response's media type; unknown or
    missing types get "0" and "no-cache".
    """
    # The header may be absent, and may carry parameters ("; charset=utf-8")
    # that would otherwise defeat the lookup.
    content_type = (response.headers.get("content-type") or "").split(";", 1)[0].strip().lower()
    response.headers.update({
        "access-control-allow-origin": "*",
        "x-accel-expires": EXPIRES_MAP.get(content_type, "0"),
        "cache-control": CACHE_CONTROL_MAP.get(content_type, "no-cache"),
    })
    return response
# Make sure the scratch directory used by the decrypt route exists.
DRM_TEMP_FOLDER.mkdir(parents=True, exist_ok=True)
# Start the waitress WSGI server for the app on all interfaces, port 5954.
# NOTE(review): this runs whenever the module is executed or imported; there is no __main__ guard.
waitress.serve(app, host="0.0.0.0", port="5954")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment