To detect and download videos from a webpage's network traffic, we can intercept network requests in Playwright, identify video streams/files, and handle them appropriately. Below are different strategies to achieve this.
Monitor network activity for video-related MIME types or file extensions.
✅ Detects videos loaded via XHR, fetch, or `<video>` tags
✅ Handles dynamic streaming (e.g., HLS, DASH)
✅ Works with direct file downloads
from playwright.sync_api import sync_playwright
import os
def detect_and_download_videos(url, download_dir="videos"):
    """Open *url* in Chromium, intercept all network traffic, and save any
    detected video files into *download_dir*.

    Detection and downloading are delegated to handle_route, which is
    attached to every outgoing request.
    """
    os.makedirs(download_dir, exist_ok=True)
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        page = browser.new_context().new_page()
        # Every request goes through handle_route, which spots video traffic.
        page.route("**/*", lambda route: handle_route(route, download_dir))
        page.goto(url)
        page.wait_for_timeout(5000)  # give late-loading videos a chance
        browser.close()
def handle_route(route, download_dir):
    """Route interceptor: let every request through, and additionally
    download the ones that look like video (see is_video_request)."""
    url = route.request.url
    if not is_video_request(route.request):
        route.continue_()  # non-video traffic passes through untouched
        return
    print(f"Found video: {url}")
    route.continue_()  # let the page keep loading the video too
    # Download a copy ourselves; `requests` with copied cookies/headers is
    # an alternative when the site requires the browser's auth session.
    download_video(url, download_dir)
def is_video_request(request):
    """Return True when *request* looks like it targets video content,
    judged either by a known extension in the URL or by the
    Content-Type request header."""
    video_extensions = (".mp4", ".webm", ".mov", ".avi", ".mkv", ".m3u8")
    video_mimes = ("video/mp4", "video/webm", "application/x-mpegurl", "video/quicktime")
    lowered_url = request.url.lower()
    mime = request.headers.get("content-type", "").lower()
    if any(ext in lowered_url for ext in video_extensions):
        return True
    return any(candidate in mime for candidate in video_mimes)
def download_video(url, download_dir):
    """Stream *url* to a file inside *download_dir*.

    The filename is the last URL path component with any query string
    stripped. Failures are reported, never raised.

    Fixes over the original: the success message printed a literal
    "(unknown)" placeholder instead of the filename; HTTP error pages
    were saved to disk as if they were video (now rejected via
    raise_for_status); the request had no timeout.
    """
    import requests
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()  # don't save 404/403 bodies as "video"
        filename = os.path.join(download_dir, url.split("/")[-1].split("?")[0])
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
        print(f"Downloaded: {filename}")
    except Exception as e:
        print(f"Failed to download {url}: {e}")
if __name__ == "__main__":
    # Demo entry point — replace with a page that actually embeds video.
    detect_and_download_videos("https://example.com/video-page")
If videos are embedded via `<video>` tags, extract their `src` or `blob:` URLs.
def extract_video_srcs(url):
    """Collect the src attribute of every <video> element on *url*.

    Returns the list of http(s)/blob: URLs found (may be empty).
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        page = browser.new_page()
        page.goto(url)
        # Pull src from each <video>, keeping only real http/blob URLs.
        candidates = (v.get_attribute("src") for v in page.query_selector_all("video"))
        video_urls = [src for src in candidates if src and src.startswith(("http", "blob:"))]
        print("Found video sources:", video_urls)
        browser.close()
        return video_urls
For adaptive streaming (e.g., `.m3u8` playlists), use `requests` + an `m3u8` parser.
import m3u8
import subprocess # For FFmpeg
def download_hls_stream(m3u8_url, output_file="output.mp4"):
    """Download an HLS stream and remux it into a single *output_file*.

    Loads the playlist first to confirm it actually contains segments,
    then lets FFmpeg fetch and concatenate all of them in one pass.

    Fixes over the original:
    - the per-segment loop ran ffmpeg once per .ts URL, overwriting
      *output_file* each iteration, so only the LAST segment survived;
    - shell=True with an interpolated URL allowed shell injection — a
      list argv with shell=False avoids that;
    - check=True surfaces FFmpeg failures instead of ignoring them.
    """
    playlist = m3u8.load(m3u8_url)
    if not playlist.segments:
        print(f"No segments found in {m3u8_url}")
        return
    # A single FFmpeg invocation handles fetching + concatenation.
    subprocess.run(
        ["ffmpeg", "-y", "-i", m3u8_url, "-c", "copy", output_file],
        check=True,
    )
    print(f"Stream saved to {output_file}")
If videos are loaded as `blob:` URLs, extract them via JavaScript.
def extract_blob_video(page):
    """Report the blob: URL of the first <video> element on *page*, if any.

    Parameters:
        page: a Playwright Page whose DOM has already loaded.

    Note: a blob: URL is only resolvable inside the page that created it,
    so this only *detects* the video — capturing the underlying bytes
    requires further in-page work (e.g. page.evaluate_handle on the Blob).
    """
    blob_url = page.evaluate("""
        () => {
            const video = document.querySelector("video");
            return video?.src.startsWith("blob:") ? video.src : null;
        }
    """)
    if blob_url:
        print("Found blob URL:", blob_url)
        # Use `page.evaluate_handle` to capture Blob data (advanced)
✅ Rate Limiting: Don't spam requests (use delays).
✅ Authentication: Handle cookies/headers if needed.
✅ Dynamic Sites: Use `page.wait_for_selector` for SPAs.
✅ Legal Compliance: Respect robots.txt and copyright laws.
Beyond the previous four strategies, here are additional advanced techniques to detect and download videos from network traffic, including WebSocket streams, encrypted media (DRM), dynamic payloads, and more.
Some websites (e.g., live streams) transmit video chunks via WebSocket instead of HTTP.
def capture_websocket_video(url):
    """Capture video-looking WebSocket frames from *url* and dump them,
    concatenated, into websocket_video.bin.

    A socket is considered video-related when its URL mentions "video"
    or ends in a media-segment extension (.ts/.m4s).

    Fix over the original: Playwright delivers WebSocket frames as either
    str (text frames) or bytes (binary frames); writing a str to a file
    opened in "wb" mode raised TypeError. Text frames are now encoded.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        video_packets = []

        def on_websocket(ws):
            if "video" in ws.url.lower() or ws.url.endswith((".ts", ".m4s")):
                ws.on("framereceived", lambda data: video_packets.append(data))

        page.on("websocket", on_websocket)
        page.goto(url)
        page.wait_for_timeout(10000)  # let WebSocket traffic accumulate
        if video_packets:
            with open("websocket_video.bin", "wb") as f:
                for packet in video_packets:
                    # Frames may arrive as str or bytes — normalize to bytes.
                    f.write(packet if isinstance(packet, bytes) else packet.encode())
            print("WebSocket video captured!")
        browser.close()
Some videos (Netflix, Hulu) use DRM-protected streams. Playwright can detect license requests.
def detect_drm_license_requests(url):
    """Log any request from *url* that looks like a DRM license call
    (URL mentions "license" or "widevine").

    Detection only — decrypting DRM content without authorization is
    illegal; use this for debugging/analysis.
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        page = browser.new_page()

        def log_if_license(request):
            lowered = request.url.lower()
            if "license" in lowered or "widevine" in lowered:
                print(f"DRM License Request: {request.url}")
                # Can intercept/modify license requests (advanced)

        page.on("request", log_if_license)
        page.goto(url)
        page.wait_for_timeout(10000)
        browser.close()
Note:
- Decrypting DRM videos is illegal without authorization.
- Use this only for debugging/analysis.
Some sites load video metadata via API calls (e.g., TikTok, Instagram Reels).
def detect_api_video_links(url):
    """Watch JSON/API responses while loading *url* and collect any
    "video_url" values they expose (pattern is site-specific).

    Fix over the original: response.json() raises on non-JSON bodies,
    which crashed the response handler — parsing is now guarded, and the
    payload is checked to be a dict before membership testing.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        video_urls = []

        def handle_response(response):
            if response.url.endswith((".json", "/api/v1/video")) and "video" in response.url.lower():
                try:
                    json_data = response.json()
                except Exception:
                    return  # body was not valid JSON after all
                if isinstance(json_data, dict) and "video_url" in json_data:
                    video_urls.append(json_data["video_url"])

        page.on("response", handle_response)
        page.goto(url)
        page.wait_for_timeout(5000)
        print("Detected API video URLs:", video_urls)
        browser.close()
If a video is fully buffered, we can extract it from browser memory.
def dump_video_from_memory(url):
    """Ask the page at *url* to re-fetch its <video> source into a Blob
    and report the resulting blob: object URL.

    The bytes still live inside the browser; a follow-up in-page download
    step is required to get them out.
    """
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        page = browser.new_page()
        page.goto(url)
        page.wait_for_selector("video")  # block until the player exists
        # The fetch happens in page context, so same-origin/cookies apply.
        video_data = page.evaluate("""
            async () => {
                const video = document.querySelector("video");
                const blob = await fetch(video.src).then(r => r.blob());
                return URL.createObjectURL(blob);
            }
        """)
        if video_data:
            print(f"Blob URL extracted: {video_data}")
            # Use `requests` or Playwright to download
        browser.close()
For hard-to-detect streams, use Mitmproxy alongside Playwright.

- Install Mitmproxy: `pip install mitmproxy`
- Run a proxy server and log all video requests.
from playwright.sync_api import sync_playwright
import subprocess
def start_mitmproxy():
    """Launch mitmproxy in the background with the logging addon script.

    Requires mitmproxy to be installed and mitmproxy_script.py to exist
    in the working directory.

    Returns:
        The subprocess.Popen handle, so the caller can terminate or wait
        on the proxy (the original discarded it, leaking the process).
    """
    return subprocess.Popen(["mitmproxy", "-s", "mitmproxy_script.py"])
def capture_with_proxy(url):
    """Browse *url* through the local Mitmproxy instance on port 8080 so
    the proxy's addon script can observe and dump video traffic."""
    proxy_config = {"server": "http://127.0.0.1:8080"}
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False, proxy=proxy_config)
        page = browser.new_page()
        page.goto(url)
        page.wait_for_timeout(10000)  # give the proxy time to see traffic
        browser.close()
if __name__ == "__main__":
    # Start the logging proxy first, then browse through it.
    start_mitmproxy()
    capture_with_proxy("https://example.com")
`mitmproxy_script.py`:
def response(flow):
    """Mitmproxy addon hook: save any HTTP response that looks like video.

    A response matches when "video" appears in the request URL or the
    Content-Type header starts with "video/".

    Fix over the original: every match was written to the same
    "video_dump.bin", so each new video overwrote the previous one.
    The filename is now derived from the URL's last path component
    (query string stripped), falling back to "video_dump.bin".
    """
    content_type = flow.response.headers.get("content-type", "")
    if "video" in flow.request.url or content_type.startswith("video/"):
        print(f"Video found: {flow.request.url}")
        # One file per distinct URL so captures don't clobber each other.
        name = flow.request.url.split("/")[-1].split("?")[0] or "video_dump.bin"
        with open(name, "wb") as f:
            f.write(flow.response.content)
Some sites use WebGL/Canvas to render video. We can hook into GPU calls (experimental).
def detect_gpu_video_rendering(url):
    """Experimentally watch CDP Performance metrics from *url* for
    video-related activity (e.g. decoder counters). Detection only."""
    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=False)
        page = browser.new_page()
        # Open a raw Chrome DevTools Protocol session for this page.
        cdp = page.context.new_cdp_session(page)
        cdp.send("Performance.enable")

        def report_video_metric(event):
            if "video" in event.get("name", "").lower():
                print("GPU/Decoder Activity Detected!")

        cdp.on("Performance.metrics", report_video_metric)
        page.goto(url)
        page.wait_for_timeout(10000)
        browser.close()
| Strategy | Best For | Limitations |
|---|---|---|
| 1. Network Interception | Direct MP4/WebM downloads | Misses WebSocket/DRM |
| 2. `<video>` DOM Parsing | Embedded videos | Fails if src is dynamic |
| 3. HLS/DASH Streaming | M3U8/MPD playlists | Needs FFmpeg for merging |
| 4. Blob URL Extraction | Dynamic buffer videos | Requires JS execution |
| 5. WebSocket Capture | Live streams (e.g., WebRTC) | Complex reassembly |
| 6. DRM License Detection | Encrypted streams (Netflix, Hulu) | No decryption (legal issues) |
| 7. API JSON Parsing | Social media (TikTok, Instagram) | Site-specific parsing |
| 8. Memory Dump | Fully buffered videos | Heavy resource usage |
| 9. Mitmproxy Interception | All traffic (including non-HTTP) | Requires proxy setup |
| 10. GPU/Decoder Hooking | Canvas/WebGL-rendered video | Experimental, complex |
- Start with Network Interception (Strategy 1).
- Check `<video>` elements (Strategy 2).
- Look for API responses (Strategy 7).
- Capture WebSocket streams (Strategy 5).
- Fall back to Mitmproxy (Strategy 9).
This script combines all 10 strategies to detect and download videos from any website by analyzing network traffic, DOM, APIs, WebSockets, and more.
import os
import requests
from playwright.sync_api import sync_playwright
import m3u8
import subprocess
from urllib.parse import urlparse
# Config
DOWNLOAD_DIR = "downloaded_videos"  # every detected video is saved here
os.makedirs(DOWNLOAD_DIR, exist_ok=True)  # ensure it exists at import time
def download_file(url, filename=None):
    """Stream *url* to disk and report success.

    Parameters:
        url: the file to download.
        filename: target path; defaults to DOWNLOAD_DIR plus the last URL
            path component with any query string stripped.

    Returns:
        True on success, False on any failure (which is printed, not raised).

    Fixes over the original: the success message printed a garbled
    placeholder instead of the saved filename; HTTP error statuses were
    silently written to disk (now rejected via raise_for_status); the
    request had no timeout.
    """
    try:
        if not filename:
            filename = os.path.join(DOWNLOAD_DIR, url.split("/")[-1].split("?")[0])
        print(f"Downloading: {url}")
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()  # don't save error pages as "video"
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Saved: {filename}")
        return True
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return False
def universal_video_downloader(target_url):
    """Detect and download videos from *target_url* by combining strategies:
    network interception, <video> DOM parsing, API-response scanning,
    blob-URL detection, HLS merging, WebSocket capture, and DRM detection.

    Fixes over the original:
    - the video-detection `if` had unbalanced parentheses (SyntaxError);
    - the API response handler was registered AFTER page.goto, so every
      response fired during the initial load was missed;
    - response.json() on non-JSON bodies was caught by a bare except —
      now narrowed and commented;
    - the HLS ffmpeg call used shell=True with an interpolated URL
      (shell-injection risk) — now a list argv;
    - several print strings contained garbled mojibake characters.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(ignore_https_errors=True)
        page = context.new_page()

        # --- Strategy 1: network interception ---
        video_urls = set()
        video_exts = (".mp4", ".webm", ".mov", ".m3u8", ".ts", ".m4s")
        video_mimes = ("video/mp4", "video/webm", "application/x-mpegurl")

        def handle_route(route):
            url = route.request.url
            content_type = route.request.headers.get("content-type", "").lower()
            if (any(ext in url.lower() for ext in video_exts)
                    or any(mime in content_type for mime in video_mimes)):
                video_urls.add(url)
                print(f"Found video (Network): {url}")
            route.continue_()

        # --- Strategy 5: WebSocket capture ---
        ws_video_packets = []

        def handle_websocket(ws):
            if "video" in ws.url.lower() or any(x in ws.url for x in (".ts", ".m4s")):
                ws.on("framereceived", lambda data: ws_video_packets.append(data))
                print(f"Found WebSocket video stream: {ws.url}")

        # --- Strategy 7: API JSON responses ---
        api_video_urls = set()

        def handle_response(response):
            url = response.url
            if (url.endswith(".json") or "/api/" in url) and response.status == 200:
                try:
                    json_data = response.json()
                except Exception:
                    return  # body was not valid JSON
                # Generic scan of top-level string values for video URLs.
                if isinstance(json_data, dict):
                    for value in json_data.values():
                        if isinstance(value, str) and any(x in value for x in (".mp4", ".m3u8")):
                            api_video_urls.add(value)
                            print(f"Found API video URL: {value}")

        # Register ALL handlers before navigation so nothing is missed.
        page.route("**/*", handle_route)
        page.on("websocket", handle_websocket)
        page.on("response", handle_response)

        page.goto(target_url, timeout=60000)
        page.wait_for_timeout(10000)  # wait for videos to load

        # --- Strategy 2: <video> elements ---
        for video in page.query_selector_all("video"):
            src = video.get_attribute("src")
            if src and (src.startswith("http") or src.startswith("blob:")):
                video_urls.add(src)
                print(f"Found <video> src: {src}")

        # Allow a little extra time for dynamic content.
        page.wait_for_timeout(5000)

        # --- Strategy 4: blob URL extraction ---
        blob_urls = set()
        blob_url = page.evaluate("""
            () => {
                const video = document.querySelector("video");
                return video?.src.startsWith("blob:") ? video.src : null;
            }
        """)
        if blob_url:
            blob_urls.add(blob_url)
            print(f"Found Blob URL: {blob_url}")

        # --- Strategy 3: HLS merging via FFmpeg ---
        for hls_url in [u for u in video_urls if u.endswith(".m3u8")]:
            print(f"Found HLS stream: {hls_url}")
            output_file = os.path.join(DOWNLOAD_DIR, "hls_output.mp4")
            # List argv (shell=False) avoids shell injection via the URL.
            subprocess.run(["ffmpeg", "-y", "-i", hls_url, "-c", "copy", output_file])
            print(f"Merged HLS stream: {output_file}")

        # --- Strategy 6: DRM detection (informational only) ---
        drm_urls = [u for u in video_urls if "widevine" in u.lower()]
        if drm_urls:
            print(f"DRM-protected streams detected (cannot download): {drm_urls}")

        # --- Strategy 8: blob/memory handling is manual (see Strategy 4) ---
        if blob_urls:
            print("Blob URLs require manual handling (see Strategy 4)")

        # --- Strategies 9 & 10 need external setup ---
        print("For Mitmproxy interception, run separately (see Strategy 9)")
        print("GPU/Decoder detection requires CDP (see Strategy 10)")

        # Download everything detected over HTTP(S); blob: URLs are skipped.
        for url in video_urls.union(api_video_urls):
            if not url.startswith("blob:"):
                download_file(url)

        browser.close()
if __name__ == "__main__":
    # Prompt for a target page and run every detection strategy against it.
    video_url = input("Enter video URL: ")
    universal_video_downloader(video_url)
- Launches Playwright and monitors:
  - Network requests (MP4, HLS, DASH)
  - `<video>` tags (including `blob:` URLs)
  - WebSocket streams
  - API responses (JSON)
- Downloads detected videos (skipping DRM-protected ones).
- Merges HLS streams using FFmpeg.
- Install dependencies:
  `pip install playwright requests m3u8`
  `playwright install`
-
For HLS/DASH, install FFmpeg:
- macOS:
brew install ffmpeg
- Linux:
sudo apt install ffmpeg
- Windows: Download from ffmpeg.org
- macOS:
-
Run:
python video_downloader.py
(Enter URL when prompted.)
⚠️ DRM-protected videos (Netflix, Disney+, etc.) cannot be downloaded.
⚠️ Blob URLs require manual handling (Strategy 4).
⚠️ WebSocket streams may need reassembly.