Skip to content

Instantly share code, notes, and snippets.

@rlaphoenix
Last active February 26, 2024 16:27
Show Gist options
  • Save rlaphoenix/e18170c950a917890823eb53777a15a0 to your computer and use it in GitHub Desktop.
Save rlaphoenix/e18170c950a917890823eb53777a15a0 to your computer and use it in GitHub Desktop.
Various Python Flask API utilities for working with HLS/DASH/DRM streams.
from functools import wraps
import os
import re
import subprocess
from pathlib import Path
from typing import Literal
from urllib.parse import quote_plus, unquote_plus, urlencode, urljoin
from uuid import uuid4
import requests
import waitress
from flask import Flask, Response, request
app = Flask("iptv")
PUBLIC_URL = "https://iptv.domain.com/prefix" # no trailing slash, exclude the auth, can be localhost
AUTH = {
"users": [
# short tokens recommended, url-safe base64 recommended
# https://gchq.github.io/CyberChef/#recipe=Pseudo-Random_Number_Generator(8,'Raw')To_Base64('A-Za-z0-9-_')
# e.g.,
# "GUXpCqpTnt8", # me (example)
# "1AqTA4zLIlk", # friend
# ...
],
#"nordvpn": "B4NZAK6oA1FaAaJXNX3YC1Ap:AaAlBHMrBHZVC4BZCKNVA1At", # example service credentials for /proxy
"directv": {
"cookies": { # we only need these specific cookies
"JSESSIONID": "...",
"DYN_USER_ID": "...",
"DYN_USER_CONFIRM": "..."
},
"zip": "90210" # best to use same zip code as account
},
"rivertv": {
"username": "...",
"password": "..."
},
"itv": {
"hmac": "..."
}
}
SERVICE_CACHE = {
"directv": {
"sessions": {},
"playlists": {}
},
"rivertv": {
"sessions": {},
"playlists": {}
}
}
DRM_CONTENT_KEYS = {
"rte": {
"channel1": [
#("key_id_hex", "key_hex"),
#... multiple keys can be specified
],#... and so on
},
#...
}
DRM_TEMP_FOLDER = Path(r"C:\Users\John\AppData\Local\Temp\drm-decrypt")
DRM_INIT_CACHE = {} # mem cache until flask restart
EXPIRES_MAP = {
"application/octet-stream": "600",
"text/vtt": "600",
"video/mp2t": "600",
"application/x-mpegurl": "0",
"application/pgp-keys": "0"
}
CACHE_CONTROL_MAP = {
"application/octet-stream": "public, max-age=3600",
"text/vtt": "public, max-age=3600",
"video/mp2t": "public, max-age=3600",
"application/x-mpegurl": "no-cache",
"application/pgp-keys": "no-cache"
}
def authenticated(func):
@wraps(func)
def wrapper(**kwargs):
auth_token = kwargs.get("auth")
if not auth_token or auth_token not in AUTH["users"]:
return "Auth is invalid", 403
if "auth" not in func.__code__.co_varnames:
del kwargs["auth"]
return func(**kwargs)
return wrapper
@app.route("/<string:auth>/<string:playlist>.m3u8", methods=["GET"])
@authenticated
def channels(auth: str, playlist: str) -> tuple[str, int]:
"""
Return an M3U8 playlist of channels.
Available Playlists:
- master (all channels, excluding backup)
- hls (only HLS channels)
- dash (only DASH channels)
- backup (any backup stream, may be a mix of hls/dash)
"""
m3u8 = Path(__file__, f"../{playlist}.m3u8").read_text(encoding="utf8").\
format(auth=auth)
r = Response(m3u8)
r.headers["Content-Type"] = "application/mpegURL"
return r, 200
@app.route("/<string:auth>/youtube/<string:youtube_id>.m3u8", methods=["GET"])
@authenticated
def youtube(youtube_id: str) -> tuple[str, int]:
"""
Convert a YouTube Video Stream ID to a HLS M3U stream.
E.g., /youtube/9Auq9mYxFEE.m3u8 (Sky News)
Note that the returned stream will be restricted to the Server's Country.
The initial manifest is restricted to the Server's IP, but the playlists and
files within are restricted only to the Caller's Country.
"""
try:
playlist = subprocess.check_output([
"youtube-dl",
"-f", "96",
"-g",
f"https://www.youtube.com/watch?v={youtube_id}"
], stderr=subprocess.STDOUT).decode().strip()
except subprocess.CalledProcessError as e:
return e.output, 400
m3u8 = requests.get(playlist).text
return m3u8, 200
@app.route("/<string:auth>/filmon/<string:channel_id>/<string:quality>.m3u8", methods=["GET"])
@authenticated
def filmon(channel_id: str, quality: Literal["low", "high"]) -> tuple[str, int]:
"""
Convert a FilmOn Channel ID to a HLS M3U stream.
E.g., /filmon/65/high.m3u8 (E4 480p)
The FilmOn links are intended to play as 30s previews but the URL usually lasts up to
2 minutes. However, we can bypass this by simply grabbing a new stream URL every time
the player calls your proxy URL.
"""
if not channel_id.isdigit():
return f"Channel ID must be a number, not '{channel_id}'", 404
quality = quality.lower()
if quality not in ("high", "low"):
return f"Quality '{quality}' is invalid, must be 'high' or 'low'", 404
channel_res = requests.get(
url=f"http://www.filmon.com/api-v2/channel/{channel_id}",
params={"protocol": "hls"},
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:101.0) Gecko/20100101 Firefox/101.0"
}
)
if channel_res.status_code != 200:
return channel_res.json()["message"], channel_res.status_code
channel_data = channel_res.json()["data"]
stream_url = next((x["url"] for x in channel_data["streams"] if x["quality"] == quality), None)
if not stream_url:
return f"No stream with the quality '{quality}' available for Channel {channel_id}", 404
stream = requests.get(stream_url)
if stream.status_code != 200:
return f"An error occurred loading FilmOn {quality} Stream [{stream.status_code}]", stream.status_code
m3u = "\n".join([
# relative to absolute
urljoin(stream_url, line) if not line.startswith(("http", "#")) else line
for line in stream.text.splitlines(keepends=False)
])
return m3u, 200
@app.route("/<string:auth>/rte/<string:channel>.mpd", methods=["GET"])
@authenticated
def rte(auth: str, channel: str) -> tuple[str, int]:
"""
Convert an RTE Channel ID to an MPEG-DASH MPD stream.
E.g., /rte/channel1.mpd (RTE One HD)
This defeats the Widevine DRM applied on the streams by simply intercepting
the stream calls with MiTM URL injection and decrypting them before returning
the data to the client. It's hacky, somewhat works, but it works.
"""
base_url = f"https://live.rte.ie/live/a/{channel}/{channel}.isml"
mpd_url = f"{base_url}/.mpd"
res = requests.get(
url=mpd_url, # .m3u8 exists but is FPS only
params={
# these dont seem to actually matter lol
"dvr_window_length": "30",
"available": "1653483300",
"expiry": "1653513414",
"ip": "x.x.x.x",
"filter": "systemBitrate<=7000000",
"token1": "95f27451ce0d666a3428829e0c5818494cdfa1efd2f448a3a5e0b9ce40524ec3"
}
)
if res.status_code != 200:
return f"MPD Request Failed! {res.text}", 500
# Disable Base Url and inject decrypt service
mpd = res.text\
.replace("<BaseURL>dash/</BaseURL>", f"")\
.replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/rte/{channel}/init/')\
.replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/rte/{channel}/data/')
# Remove all Content Protections
mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
# Remove all Representations except best
for representation in re.finditer(r'<Representation\s*id="(.*)"[\s\S]*?<\/Representation>', mpd):
rep_id = representation.group(1)
if rep_id not in ("audio_128k=128000", "video=6000000"):
mpd = mpd.replace(representation.group(0), "")
r = Response(mpd)
r.headers["Content-Type"] = "application/dash+xml"
return r, 200
@app.route("/<string:auth>/channel4/<string:channel>.mpd", methods=["GET"])
@authenticated
def channel4(auth: str, channel: str) -> tuple[str, int]:
"""
Convert a Channel 4 Channel ID to an MPEG-DASH MPD stream.
E.g., /channel4/c4.mpd (Channel 4 576p)
This defeats the Widevine DRM applied on the streams by simply intercepting
the stream calls with MiTM URL injection and decrypting them before returning
the data to the client. It's hacky, somewhat works, but it works.
"""
res = requests.get(
url=f"https://csm-e-c4ukdash-eb.tls1.yospace.com/csm/extlive/channelfour01,{channel}-v2-iso-dash-h12.mpd", # .m3u8 exists but is FPS only
params={
"yo.ac": "false",
"yo.br": "false",
"siteSectionId": f"watchlive.channel4.com/{channel}",
"GUID": "f6d8b7b0-3ef5-4764-88dd-af5d7a136611"
}
)
if res.status_code != 200:
return f"MPD Request Failed! {res.text}", 500
# Disable Base Url and inject decrypt service
mpd = res.text\
.replace(f"<BaseURL>https://cdn.live.dash.c4assets.com/v2/iso-dash-mp/{channel}/</BaseURL>", f"")\
.replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/channel4/{channel}/init/')\
.replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/channel4/{channel}/data/')
# Remove Location URLs
mpd = re.sub(r'<Location[\s\S]*?<\/Location>', "", mpd)
# Remove Analytic URLs (dont need if yo.ac=false)
# mpd = re.sub(r'analytics=".*?"', "", mpd)
# Remove all Content Protections
mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
# Remove all Representations except best
wanted_reps = ("item-07item", {
"c4": "item-12item",
"e4": "item-09item",
"m4": "item-12item",
"f4": "item-09item",
"4s": "item-09item"
}[channel])
for representation in re.finditer(r'<Representation.*?id="(.*?)"/>', mpd):
if not representation.group(1).endswith(wanted_reps):
mpd = mpd.replace(representation.group(0), "")
r = Response(mpd)
r.headers["Content-Type"] = "application/dash+xml"
return r, 200
@app.route("/<string:auth>/itv/<string:channel>.mpd", methods=["GET"])
@authenticated
def itv(auth: str, channel: str) -> tuple[str, int]:
"""
Convert an ITV Channel ID to an MPEG-DASH MPD stream.
E.g., /itv/itvbe.mpd (ITVBe 504p)
Channel IDs:
- 'itv1' (ITV 720p)
- 'itv2' (ITV2 504p)
- 'itv3' (ITV3 504p)
- 'itv4' (ITV4 504p)
- 'itvbe' (ITVBe 504p)
- 'citv' (CITV 504p)
This defeats the Widevine DRM applied on the streams by simply intercepting
the stream calls with MiTM URL injection and decrypting them before returning
the data to the client. It's hacky, somewhat works, but it works.
"""
if not AUTH.get("nordvpn"):
return "Sorry! NordVPN Proxy is disabled, cannot use RiverTV.", 400
valid_channels = ("itv1", "itv2", "itv3", "itv4", "itvbe", "citv")
if channel not in valid_channels:
return f"ITV Channel ID '{channel}' is invalid, expecting one of: {valid_channels}", 400
nord_server = "uk1866"
session = requests.Session()
session.headers.update({
"Accept": "application/vnd.itv.online.playlist.sim.v3+json",
"User-Agent": "ITV_Player_(Android)",
"hmac": AUTH["itv"]["hmac"]
})
session.proxies.update({
"all": f"https://{AUTH['nordvpn']}@{nord_server}.nordvpn.com:89" # 80 == HTTP, 89 == HTTP+SSL
})
res = session.post(
url=f"https://simulcast.itv.com/playlist/itvonline/{channel.upper().replace('BE', 'Be').replace('1', '')}",
json={
"client": {
"id": "android", # browser
"supportsAdPods": False,
"version": "10.3.0" # browser: 4.1
},
"device": {
"advertisingIdentifier": "d0d23c7e-d294-4b29-ad72-7f67c263a827",
"display": {
"size": "small"
},
"firmware": "30",
"id": "redfin",
"manufacturer": "Google",
"model": "Pixel 5",
"os": {
"name": "android",
"type": "android",
"version": "11"
}
},
"preview": False,
"user": {
"entitlements": [],
"status": "anon",
"subscribed": "false",
"token": ""
},
"variantAvailability": {
"featureset": {
# hls,mpeg-dash,aes,widevine,fairplay,inband-webvtt,fairplay-download
# hls seems to have been removed completely within 2021 :(
"max": [
"mpeg-dash",
"widevine",
"inband-webvtt"
],
"min": [
"mpeg-dash",
"widevine"
]
},
"platformTag": "mobile" # mobile, dotcom, ctv,
}
}
)
if res.status_code != 200:
return f"MPD Request Failed! {res.text}", res.status_code
# return res.text, 200
mpd = session.get(res.json()["Playlist"]["Video"]["VideoLocations"][0]["Url"]).text
# Disable Base Url and inject decrypt service
mpd = mpd\
.replace(f"<BaseURL>dash/</BaseURL>", f"")\
.replace('initialization="', f'initialization="{PUBLIC_URL}/{auth}/decrypt/itv/{channel}/init/')\
.replace('media="', f'media="{PUBLIC_URL}/{auth}/decrypt/itv/{channel}/data/')
# Remove Analytics
mpd = re.sub(r'analytics=".*?"', "", mpd)
mpd = re.sub(r'<EventStream[\s\S]*?<\/EventStream>', "", mpd)
# Remove all Content Protections
mpd = re.sub(r'<ContentProtection[\s\S]*?<\/ContentProtection>', "", mpd)
# Remove all Representations except best
for representation in re.finditer(r'<Representation\s*id="(.*)"[\s\S]*?<\/Representation>', mpd):
if representation.group(1) not in ("audio_1=96000", "video=1704000", "video=3403968"):
# last representation is 720p only available on ITV 1
mpd = mpd.replace(representation.group(0), "")
r = Response(mpd)
r.headers["Content-Type"] = "application/dash+xml"
return r, 200
@app.route("/<string:auth>/directv/<string:channel>.m3u8", methods=["GET"])
@authenticated
def directv(channel: str) -> tuple[str, int]:
"""
Convert a DIRECTV Channel ID to an HLS M3U8 stream.
E.g., /directv/4211.m3u8 (Freeform HD)
This only works on DRM-free channels. Almost every channel is DRM protected except:
- 4211 (#311, FRFMHD, Freeform HD)
- 6055 (#289, DSJRHD, Disney Junior HD)
- 8397 (#292, DXDHD, Disney XD HD)
- 8398 (#390, DISEHD, Disney Channel HD)
"""
session = requests.Session()
session.cookies.update(AUTH["directv"]["cookies"])
session.headers.update({
"Referer": "https://www.directv.com/guide?lpos=Header:1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:101.0) Gecko/20100101 Firefox/101.0",
"DNT": "1",
"Accept-Language": "en-US;q=0.9,en;q=0.8"
})
# auth expires fairly quick, cache per-channel and per-address
authorization = SERVICE_CACHE["directv"]["sessions"].get(channel + request.remote_addr)
if not authorization:
res = session.get(
url="https://www.directv.com/json/authorization",
headers={
"Accept": "application/json, text/plain, */*"
}
)
if res.status_code != 200:
return f"Failed to Authorize with DIRECTV! {res.text}", 500
authorization = SERVICE_CACHE["directv"]["sessions"][channel + request.remote_addr] = res.json()["authorization"]
m3u8 = SERVICE_CACHE["directv"]["playlists"].get(channel)
if not m3u8:
stream = session.get(
url="https://pgauth-ca.dtvce.com/pgauth/content/authorize",
params={
"et": authorization["etoken"],
"sig": authorization["signature"],
"sid": authorization["siteId"],
"suid": authorization["siteUserId"],
"actiontype": "1",
"clientid": "web",
"channel.id": channel,
"geo.zipcode": AUTH["directv"]["zip"],
"output": "json"
}
)
stream, status = stream.json(), stream.status_code
if status != 200 or stream.get("error_message"):
return f"{stream['error_message']} [{stream['error_code']}]", status
playlist = stream.get("playBackUrl")
m3u8 = session.get(playlist).text if playlist else "VideoGuard"
SERVICE_CACHE["directv"]["playlists"][channel] = m3u8
if m3u8 == "VideoGuard":
return f"Stream is protected by VideoGuard", 403
r = Response(m3u8)
r.headers["Content-Type"] = "application/mpegURL"
return r, 200
@app.route("/<string:auth>/rivertv/<string:channel>.m3u8", methods=["GET"])
@authenticated
def rivertv(auth: str, channel: str) -> tuple[str, int]:
"""
Convert a RiverTV Channel ID to an HLS M3U8 stream.
E.g., /rivertv/302728.m3u8 (TeleToon 720p)
This bypasses the Geoblock by using the /proxy endpoint on the URIs within the
playlist. A stable proxy will be required.
"""
if not AUTH.get("nordvpn"):
return "Sorry! NordVPN Proxy is disabled, cannot use RiverTV.", 400
vbegin = request.args.get("vbegin") or ""
vend = request.args.get("vend") or ""
nord_server = "ca1081"
session = requests.Session()
session.headers.update({
"Content-Type": "application/json",
"User-Agent": "RiverTV/1.8.0 (ca.rivertv.tv; build:1111; iOS 14.8.1) Alamofire/1.8.0",
"X-Custom-AppName": "R-ROKU"
})
session.proxies.update({
"all": f"https://{AUTH['nordvpn']}@{nord_server}.nordvpn.com:89" # 80 == HTTP, 89 == HTTP+SSL
})
session_id = SERVICE_CACHE["rivertv"]["sessions"].get(f"{nord_server}-{channel}-{request.remote_addr}")
if not session_id:
login = session.post(
url="https://a3.cdn.vmedia.ca/Handlers/ClientService.ashx/json/Login",
json={
"cc": {
"appSettings": {
"appName": "R-ROKU",
"settings": [{
"key": "box.sys_version",
"value": "12"
}, {
"key": "box.app_version",
"value": "1.8.0"
}, {
"key": "box.build_number",
"value": "1111"
}, {
"key": "local.ip",
"value": "192.168.1.172" # TODO: Randomly generate 192.168.x.x subnet
}],
"siteID": 0
},
"clientCredentials": {
"StbMacAddress": os.urandom(8).hex(), # TODO: Generate based on remote+local IP Address
"UserLogin": AUTH["rivertv"]["username"],
"UserPassword": AUTH["rivertv"]["password"],
"settings": []
}
},
"deviceInformation": {
"DeviceId": str(uuid4()),
"model": "AFTMM_Amazon"
},
"appName": "R-ROKU",
"sLang": "eng",
"siteID": 170
}
).json()
error_code = next((x["value"] for x in login["appSettings"]["settings"] if x["key"] == "errorCode"), None)
error_msg = next((x["value"] for x in login["appSettings"]["settings"] if x["key"] == "errorMessage"), None)
if error_code != "OK":
return f"Cannot get RiverTV stream: {error_msg} [{error_code}]", 500
session_id = SERVICE_CACHE["rivertv"]["sessions"][f"{nord_server}-{channel}-{request.remote_addr}"] = login["clientCredentials"]["sessionID"]
m3u8 = SERVICE_CACHE["rivertv"]["playlists"].get(f"{nord_server}-{channel}-{request.remote_addr}-{vbegin}{vend}")
if not m3u8:
stream = session.post(
url="https://a3.cdn.vmedia.ca/api/media/restartStream",
json={
"mediaRequest": {
"IsRestart": 0, # 1 if live, 0 otherwise
"clientSidePlaylist": False,
"item": {
"contentType": 4, # 4 if live, 1 otherwise
"id": channel
},
"startOffset": 0,
"streamSettings": {
"balancingArea": {
"id": -1
},
"qualityPreset": 0,
"shiftTimeZoneName": "NA_EST"
}
},
"sessionID": session_id
}
).json()
playlist = stream["URL"].split("?")[0]
if vbegin and vend:
playlist += f"?{urlencode({'vbegin': vbegin, 'vend': vend})}"
m3u8 = SERVICE_CACHE["rivertv"]["playlists"][f"{nord_server}-{channel}-{request.remote_addr}-{vbegin}{vend}"] = requests.get(
url=f"{PUBLIC_URL}/{auth}/proxy/{nord_server}",
params={"url": playlist}
).text
r = Response(m3u8)
r.headers["Content-Type"] = "application/mpegURL"
return r, 200
@app.route("/<string:auth>/9now/<string:channel>.m3u8", methods=["GET"])
@authenticated
def ninenow(auth: str, channel: str) -> tuple[str, int]:
"""
Convert a 9Now Channel ID to an HLS M3U8 stream.
E.g., /9now/go.m3u8 (9Go 720p)
This bypasses the Geoblock by using the /proxy endpoint on the URIs within the
playlist. A stable proxy will be required.
"""
nord_server = "au602"
session = requests.Session()
session.headers.update({
"Origin": "https://www.9now.com.au",
"Referer": "https://www.9now.com.au/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36",
"Authority": "api.9now.com.au",
"Accept-Language": "en-AU",
"sec-ch-ua": '" Not A;Brand";v="99", "Chromium";v="102", "Google Chrome";v="102"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site"
})
res = session.get(
url="https://api.9now.com.au/live-experience",
params={
"device": "web",
"region": "nsw",
"slug": channel,
"streamParams": "web,chrome,windows"
}
)
if res.status_code != 200:
return res.json(), res.status_code
stream_url = res.json()["data"]["getLXP"]["stream"]["video"]["url"]
r = Response(requests.get(
url=f"{PUBLIC_URL}/{auth}/proxy/{nord_server}",
params={"url": stream_url}
).text)
r.headers["Content-Type"] = "application/mpegURL"
return r, 200
@app.route("/<string:auth>/proxy/<string:server>", methods=["GET"])
@authenticated
def proxy(auth: str, server: str) -> tuple[str, int]:
"""
Download the specified URL using a Proxy.
If the URL is to an m3u(8) document, it will rewrite the URLs within it to also be
proxied with the same server.
This may fail if the specified URL or any URL within the M3U8 is too long.
Possible Server values:
- No Proxy put 'none' (Uses the Server's IP/Network).
- NordVPN Proxy put E.g., 'uk2020'. See https://nordvpn.com/servers/tools/
- Docoja.com/blue UK Web Proxy put 'docoja'.
"""
url = unquote_plus(request.args.get("url") or "")
if not url:
return "The 'url' param must be supplied.", 400
session = requests.Session()
session.headers.update({"User-Agent": request.headers.get("User-Agent")})
if server == "docoja":
session.headers.update({
"Referer": "https://www.docoja.com/blue/"
})
res = session.get(
url="https://www.docoja.com/blue/browse.php",
params={
"u": url,
"b": 4,
"f": "norefer"
}
)
else:
if server != "none":
if not AUTH.get("nordvpn"):
return "Sorry! NordVPN Proxy is disabled, please use another server.", 400
proxy_hostname = f"{server}.nordvpn.com"
if subprocess.call(["ping", ["-c", "-n"][os.name == "nt"], "1", proxy_hostname]) != 0:
return f"Invalid NordVPN Server: '{proxy_hostname}'", 400
session.proxies.update({"all": f"https://{AUTH['nordvpn']}@{proxy_hostname}:89"}) # 80 == HTTP, 89 == HTTP+SSL
res = session.get(url)
if res.content.startswith(b"#EXTM3U"):
m3u8 = res.text.splitlines(keepends=False)
# base inject url
inject_base = f"{PUBLIC_URL}/{auth}/proxy/{server}"
# relative to absolute
m3u8 = [
urljoin(url, line) if line and not line.startswith(("http", "#")) else line
for line in m3u8
]
# use proxied ts url
m3u8 = [
f"{inject_base}?url={quote_plus(line)}" if line and not line.startswith("#") else line
for line in m3u8
]
# rejoin
m3u8 = "\n".join(m3u8)
# use proxied playlist urls
for uri in re.finditer(r',URI="(.+?)"', m3u8):
m3u8 = m3u8.replace(uri.group(0), f',URI="{inject_base}?url={quote_plus(urljoin(url, uri.group(1)))}"')
return Response(
m3u8,
res.status_code,
content_type="application/mpegURL"
)
else:
# assume data stream
return Response(
res.content,
res.status_code,
content_type=res.headers.get("content-type") or "application/octet-stream",
)
@app.route("/<string:auth>/decrypt/<string:service>/<string:channel>/<string:seg_type>/<string:path>", methods=["GET"])
@authenticated
def decrypt(auth: str, service: str, channel: str, seg_type: str, path: str) -> tuple[str, int]:
"""Decrypt MPEG-DASH stream data, pretty hacky method like /proxy but it works, somewhat."""
url = {
"rte": f"https://live.rte.ie/live/a/{channel}/{channel}.isml/dash/{path}",
"channel4": f"https://cdn.live.dash.c4assets.com/v2/iso-dash-mp/{channel}/{path}",
"itv": f"https://{channel}simamobile.cdn1.content.itv.com/playout/mb01/{channel}/cenc.isml/dash/{path}", # android
# browser: "itv": f"https://itv1simadotcom.cdn1.content.itv.com/playout/pc01/{channel}/cenc.isml/dash/{path}"
}[service]
proxy = {
"itv": "docoja"
}.get(service)
if proxy:
url = f"{PUBLIC_URL}/{auth}/proxy/{proxy}?url={quote_plus(url)}"
presentation_id = re.search({
"rte": rf"{channel}-([a-z]*[^-.]*)-?(?:\d+)?",
"channel4": r"\d+(item-\d{2}item)",
"itv": r"cenc-([a-z]*[^-.]*)-?(?:\d+)?"
}[service], path).group(1)
service_key = f"{service}-{channel}"
key_prefix = f"{service_key}-{presentation_id}"
if seg_type == "init":
out = DRM_INIT_CACHE[key_prefix] = requests.get(url).content
else:
out = DRM_SEGMENT_CACHE[service_key].get(path)
if not out:
init = DRM_INIT_CACHE.get(key_prefix)
if not init:
# TODO: Problematic
msg = f"No init data for {path}"
print(msg)
return msg, 500
# decrypt with shaka-packager, sadly cant efficiently pipe it unless your on linux
# even on linux, its just using a special fifo file, not really piping
segment_file_path = (DRM_TEMP_FOLDER / f"{key_prefix}-{path}.mp4")
segment_file_path.write_bytes(init + requests.get(url).content)
subprocess.check_call(
[
"packager-win-x64",
f"input={segment_file_path},stream=0,output={segment_file_path}",
"--enable_raw_key_decryption",
"--keys",
",".join([
f"label={i}:key_id={key[0]}:key={key[1]}"
for i, key in enumerate(DRM_CONTENT_KEYS[service][channel])
])
],
# may potentially give a speed boost
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
out = DRM_SEGMENT_CACHE[service_key][path] = segment_file_path.read_bytes()
segment_file_path.unlink()
# remove the init data, commented out as ive noticed smoother streams by keeping
# moov_start = out.rfind(b"moov") - 4
# moov_length = int.from_bytes(out[moov_start:moov_start + 4], "big")
# out = out[moov_start + moov_length:]
if len(DRM_SEGMENT_CACHE[service_key]) > 50:
DRM_SEGMENT_CACHE[service_key].popitem()
r = Response(out)
r.headers["Content-Type"] = "video/mp4"
return r, 200
@app.after_request
def apply_caching(response):
response.headers.update({
"access-control-allow-origin": "*",
"x-accel-expires": EXPIRES_MAP.get(response.headers.get("content-type").lower(), "0"),
"cache-control": CACHE_CONTROL_MAP.get(response.headers.get("content-type").lower(), "no-cache"),
})
return response
DRM_TEMP_FOLDER.mkdir(parents=True, exist_ok=True)
waitress.serve(app, host="0.0.0.0", port="5954")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment