Skip to content

Instantly share code, notes, and snippets.

@ayoubzulfiqar
Created May 6, 2025 17:22
Show Gist options
  • Save ayoubzulfiqar/869468e61027e2485a95ca090b6ace3f to your computer and use it in GitHub Desktop.
Save ayoubzulfiqar/869468e61027e2485a95ca090b6ace3f to your computer and use it in GitHub Desktop.
Video Detection & Download Strategies using Playwright (Python)

Video Detection & Download Strategies using Playwright (Python)

To detect and download videos from a webpage's network traffic, we can intercept network requests in Playwright, identify video streams/files, and handle them appropriately. Below are different strategies to achieve this.


1. Intercepting Network Requests (Recommended)

Monitor network activity for video-related MIME types or file extensions.

Key Features:

βœ… Detects videos loaded via XHR, fetch, or <video> tags
βœ… Handles dynamic streaming (e.g., HLS, DASH)
βœ… Works with direct file downloads

Implementation:

from playwright.sync_api import sync_playwright
import os

def detect_and_download_videos(url, download_dir="videos"):
    os.makedirs(download_dir, exist_ok=True)
    
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        context = browser.new_context()
        page = context.new_page()

        # Enable request interception
        page.route("**/*", lambda route: handle_route(route, download_dir))
        
        page.goto(url)
        page.wait_for_timeout(5000)  # Wait for videos to load
        browser.close()

def handle_route(route, download_dir):
    request = route.request
    url = request.url

    # Check if the request is for a video file
    if is_video_request(request):
        print(f"Found video: {url}")
        route.continue_()  # Let the request complete
        # Option 1: Download via Playwright
        download_video(url, download_dir)
        # Option 2: Manually fetch via `requests` (if auth/cookies needed)
    else:
        route.continue_()  # Skip non-video requests

def is_video_request(request):
    url = request.url.lower()
    content_type = request.headers.get("content-type", "").lower()
    
    # Common video extensions & MIME types
    video_extensions = [".mp4", ".webm", ".mov", ".avi", ".mkv", ".m3u8"]
    video_mimes = ["video/mp4", "video/webm", "application/x-mpegurl", "video/quicktime"]
    
    return (any(ext in url for ext in video_extensions) or
            any(mime in content_type for mime in video_mimes))

def download_video(url, download_dir):
    import requests
    try:
        response = requests.get(url, stream=True)
        filename = os.path.join(download_dir, url.split("/")[-1].split("?")[0])
        
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        print(f"Downloaded: {filename}")
    except Exception as e:
        print(f"Failed to download {url}: {e}")

if __name__ == "__main__":
    detect_and_download_videos("https://example.com/video-page")

2. Monitoring <video> Elements (DOM-Based Detection)

If videos are embedded via <video> tags, extract src or blob: URLs.

Implementation:

def extract_video_srcs(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        page.goto(url)
        
        # Find all <video> elements
        video_elements = page.query_selector_all("video")
        video_urls = []
        
        for video in video_elements:
            src = video.get_attribute("src")
            if src and src.startswith(("http", "blob:")):
                video_urls.append(src)
        
        print("Found video sources:", video_urls)
        browser.close()
        return video_urls

3. Handling Streaming (HLS/DASH)

For adaptive streaming (e.g., .m3u8 playlists), use requests + m3u8 parser.

Implementation:

import m3u8
import subprocess  # For FFmpeg

def download_hls_stream(m3u8_url, output_file="output.mp4"):
    m3u8_obj = m3u8.load(m3u8_url)
    ts_urls = [segment.absolute_uri for segment in m3u8_obj.segments]
    
    # Download all .ts segments
    for i, ts_url in enumerate(ts_urls):
        subprocess.run(f"ffmpeg -i {ts_url} -c copy {output_file}", shell=True)
    
    print(f"Stream saved to {output_file}")

4. Handling Blob URLs (Dynamic Video Buffers)

If videos are loaded as blob:, extract via JavaScript.

Implementation:

def extract_blob_video(page):
    blob_url = page.evaluate("""
        () => {
            const video = document.querySelector("video");
            return video?.src.startsWith("blob:") ? video.src : null;
        }
    """)
    
    if blob_url:
        print("Found blob URL:", blob_url)
        # Use `page.evaluate_handle` to capture Blob data (advanced)

Best Practices & Considerations

βœ” Rate Limiting: Don’t spam requests (use delays).
βœ” Authentication: Handle cookies/headers if needed.
βœ” Dynamic Sites: Use page.wait_for_selector for SPAs.
βœ” Legal Compliance: Respect robots.txt and copyright laws.


Advanced Video Detection & Download Strategies in Playwright (Python)

Beyond the previous four strategies, here are additional advanced techniques to detect and download videos from network traffic, including WebSocket streams, encrypted media (DRM), dynamic payloads, and more.


5. WebSocket Video Stream Detection

Some websites (e.g., live streams) transmit video chunks via WebSocket instead of HTTP.

Implementation:

def capture_websocket_video(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()

        video_packets = []
        
        def on_websocket(ws):
            if "video" in ws.url.lower() or ws.url.endswith((".ts", ".m4s")):
                ws.on("framereceived", lambda data: video_packets.append(data))
        
        page.on("websocket", on_websocket)
        page.goto(url)
        page.wait_for_timeout(10000)  # Wait for WebSocket activity
        
        if video_packets:
            with open("websocket_video.bin", "wb") as f:
                for packet in video_packets:
                    f.write(packet)
            print("WebSocket video captured!")
        
        browser.close()

6. Detecting Encrypted Media (DRM - Widevine, PlayReady, FairPlay)

Some videos (Netflix, Hulu) use DRM-protected streams. Playwright can detect license requests.

Implementation:

def detect_drm_license_requests(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()

        def handle_request(request):
            if "license" in request.url.lower() or "widevine" in request.url.lower():
                print(f"DRM License Request: {request.url}")
                # Can intercept/modify license requests (advanced)
        
        page.on("request", handle_request)
        page.goto(url)
        page.wait_for_timeout(10000)
        browser.close()

Note:

  • Decrypting DRM videos is illegal without authorization.
  • Use this only for debugging/analysis.

7. Dynamic Payload Detection (JSON/API Video Feeds)

Some sites load video metadata via API calls (e.g., TikTok, Instagram Reels).

Implementation:

def detect_api_video_links(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()

        video_urls = []
        
        def handle_response(response):
            if response.url.endswith((".json", "/api/v1/video")) and "video" in response.url.lower():
                json_data = response.json()
                # Parse JSON to find video URLs (site-specific)
                if "video_url" in json_data:
                    video_urls.append(json_data["video_url"])
        
        page.on("response", handle_response)
        page.goto(url)
        page.wait_for_timeout(5000)
        
        print("Detected API video URLs:", video_urls)
        browser.close()

8. Browser Memory Dump (For Blob/Streaming Recovery)

If a video is fully buffered, we can extract it from browser memory.

Implementation:

def dump_video_from_memory(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        page.goto(url)
        
        # Wait for video to load
        page.wait_for_selector("video")
        
        # Extract video as a Blob via JavaScript
        video_data = page.evaluate("""
            async () => {
                const video = document.querySelector("video");
                const blob = await fetch(video.src).then(r => r.blob());
                return URL.createObjectURL(blob);
            }
        """)
        
        # Download the Blob URL
        if video_data:
            print(f"Blob URL extracted: {video_data}")
            # Use `requests` or Playwright to download
        
        browser.close()

9. Proxy-Based Traffic Interception (Mitmproxy + Playwright)

For hard-to-detect streams, use Mitmproxy alongside Playwright.

Setup:

  1. Install Mitmproxy:

    pip install mitmproxy
  2. Run a proxy server and log all video requests.

Implementation:

from playwright.sync_api import sync_playwright
import subprocess

def start_mitmproxy():
    subprocess.Popen(["mitmproxy", "-s", "mitmproxy_script.py"])

def capture_with_proxy(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=False,
            proxy={"server": "http://127.0.0.1:8080"}
        )
        page = browser.new_page()
        page.goto(url)
        page.wait_for_timeout(10000)
        browser.close()

if __name__ == "__main__":
    start_mitmproxy()
    capture_with_proxy("https://example.com")

mitmproxy_script.py:

def response(flow):
    if "video" in flow.request.url or flow.response.headers.get("content-type", "").startswith("video/"):
        print(f"Video found: {flow.request.url}")
        with open("video_dump.bin", "wb") as f:
            f.write(flow.response.content)

10. GPU/Decoder-Level Detection (Advanced)

Some sites use WebGL/Canvas to render video. We can hook into GPU calls (experimental).

Implementation (Requires Chrome DevTools Protocol):

def detect_gpu_video_rendering(url):
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=False)
        page = browser.new_page()
        
        # Enable Chrome DevTools Protocol (CDP)
        cdp = page.context.new_cdp_session(page)
        cdp.send("Performance.enable")
        
        def on_performance_metrics(event):
            if "video" in event.get("name", "").lower():
                print("GPU/Decoder Activity Detected!")
        
        cdp.on("Performance.metrics", on_performance_metrics)
        page.goto(url)
        page.wait_for_timeout(10000)
        browser.close()

Comparison of All 10 Strategies

Strategy Best For Limitations
1. Network Interception Direct MP4/WebM downloads Misses WebSocket/DRM
2. <video> DOM Parsing Embedded videos Fails if src is dynamic
3. HLS/DASH Streaming M3U8/MPD playlists Needs FFmpeg for merging
4. Blob URL Extraction Dynamic buffer videos Requires JS execution
5. WebSocket Capture Live streams (e.g., WebRTC) Complex reassembly
6. DRM License Detection Encrypted streams (Netflix, Hulu) No decryption (legal issues)
7. API JSON Parsing Social media (TikTok, Instagram) Site-specific parsing
8. Memory Dump Fully buffered videos Heavy resource usage
9. Mitmproxy Interception All traffic (including non-HTTP) Requires proxy setup
10. GPU/Decoder Hooking Canvas/WebGL-rendered video Experimental, complex

Final Workflow for Maximum Coverage

  1. Start with Network Interception (Strategy 1).
  2. Check <video> elements (Strategy 2).
  3. Look for API responses (Strategy 7).
  4. Capture WebSocket streams (Strategy 5).
  5. Fall back to Mitmproxy (Strategy 9).

Universal Video Downloader using Playwright (Python)

This script combines all 10 strategies to detect and download videos from any website by analyzing network traffic, DOM, APIs, WebSockets, and more.


Full Implementation

import os
import requests
from playwright.sync_api import sync_playwright
import m3u8
import subprocess
from urllib.parse import urlparse

# Config
DOWNLOAD_DIR = "downloaded_videos"
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

def download_file(url, filename=None):
    try:
        if not filename:
            filename = os.path.join(DOWNLOAD_DIR, url.split("/")[-1].split("?")[0])
        
        print(f"πŸ“₯ Downloading: {url}")
        response = requests.get(url, stream=True)
        with open(filename, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"βœ… Saved: {filename}")
        return True
    except Exception as e:
        print(f"❌ Failed to download {url}: {e}")
        return False

def universal_video_downloader(target_url):
    with sync_playwright() as p:
        # Launch browser with proxy if needed (for Mitmproxy)
        browser = p.chromium.launch(headless=False)
        context = browser.new_context(ignore_https_errors=True)
        page = context.new_page()

        # Strategy 1: Network Interception
        video_urls = set()

        def handle_route(route):
            request = route.request
            url = request.url
            content_type = request.headers.get("content-type", "").lower()

            # Detect video requests
            video_exts = [".mp4", ".webm", ".mov", ".m3u8", ".ts", ".m4s"]
            video_mimes = ["video/mp4", "video/webm", "application/x-mpegurl"]

            if (any(ext in url.lower() for ext in video_exts) or \
               any(mime in content_type for mime in video_mimes):
                video_urls.add(url)
                print(f"πŸ” Found video (Network): {url}")
            
            route.continue_()

        # Strategy 5: WebSocket Capture
        ws_video_packets = []

        def handle_websocket(ws):
            if "video" in ws.url.lower() or any(x in ws.url for x in [".ts", ".m4s"]):
                ws.on("framereceived", lambda data: ws_video_packets.append(data))
                print(f"πŸ” Found WebSocket video stream: {ws.url}")

        page.route("**/*", handle_route)
        page.on("websocket", handle_websocket)

        # Load page
        page.goto(target_url, timeout=60000)
        page.wait_for_timeout(10000)  # Wait for videos to load

        # Strategy 2: Check <video> elements
        video_elements = page.query_selector_all("video")
        for video in video_elements:
            src = video.get_attribute("src")
            if src and (src.startswith("http") or src.startswith("blob:")):
                video_urls.add(src)
                print(f"πŸ” Found <video> src: {src}")

        # Strategy 7: Check API responses
        api_video_urls = set()

        def handle_response(response):
            url = response.url
            if (url.endswith(".json") or "/api/" in url) and response.status == 200:
                try:
                    json_data = response.json()
                    # Generic check for video URLs in JSON
                    if isinstance(json_data, dict):
                        for key, value in json_data.items():
                            if isinstance(value, str) and any(x in value for x in [".mp4", ".m3u8"]):
                                api_video_urls.add(value)
                                print(f"πŸ” Found API video URL: {value}")
                except:
                    pass

        page.on("response", handle_response)

        # Wait for more dynamic content
        page.wait_for_timeout(5000)

        # Strategy 4: Blob URL extraction
        blob_urls = set()

        blob_url = page.evaluate("""
            () => {
                const video = document.querySelector("video");
                return video?.src.startsWith("blob:") ? video.src : null;
            }
        """)
        if blob_url:
            blob_urls.add(blob_url)
            print(f"πŸ” Found Blob URL: {blob_url}")

        # Strategy 3: HLS/DASH detection
        hls_urls = [url for url in video_urls if url.endswith(".m3u8")]
        for hls_url in hls_urls:
            print(f"πŸ” Found HLS stream: {hls_url}")
            output_file = os.path.join(DOWNLOAD_DIR, "hls_output.mp4")
            subprocess.run(f"ffmpeg -i {hls_url} -c copy {output_file}", shell=True)
            print(f"βœ… Merged HLS stream: {output_file}")

        # Strategy 6: DRM detection (info only)
        drm_urls = [url for url in video_urls if "widevine" in url.lower()]
        if drm_urls:
            print(f"⚠️ DRM-protected streams detected (cannot download): {drm_urls}")

        # Strategy 8: Memory dump (experimental)
        if blob_urls:
            print("⚠️ Blob URLs require manual handling (see Strategy 4)")

        # Strategy 9: Mitmproxy (external setup needed)
        print("ℹ️ For Mitmproxy interception, run separately (see Strategy 9)")

        # Strategy 10: GPU/Decoder (experimental)
        print("ℹ️ GPU/Decoder detection requires CDP (see Strategy 10)")

        # Download all detected videos
        for url in video_urls.union(api_video_urls):
            if not url.startswith("blob:"):
                download_file(url)

        browser.close()

if __name__ == "__main__":
    video_url = input("Enter video URL: ")
    universal_video_downloader(video_url)

How It Works

  1. Launches Playwright and monitors:
    • Network requests (MP4, HLS, DASH)
    • <video> tags (including blob: URLs)
    • WebSocket streams
    • API responses (JSON)
  2. Downloads detected videos (skipping DRM-protected ones).
  3. Merges HLS streams using FFmpeg.

Usage

  1. Install dependencies:

    pip install playwright requests m3u8
    playwright install
  2. For HLS/DASH, install FFmpeg:

    • macOS: brew install ffmpeg
    • Linux: sudo apt install ffmpeg
    • Windows: Download from ffmpeg.org
  3. Run:

    python video_downloader.py

    (Enter URL when prompted.)


Limitations

❌ DRM-protected videos (Netflix, Disney+, etc.) cannot be downloaded.
❌ Blob URLs require manual handling (Strategy 4).
❌ WebSocket streams may need reassembly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment