Skip to content

Instantly share code, notes, and snippets.

@CypherpunkSamurai
Created July 12, 2024 20:47
Show Gist options
  • Save CypherpunkSamurai/ebe8afed44ef9fd53229d66016c6eaea to your computer and use it in GitHub Desktop.
Save CypherpunkSamurai/ebe8afed44ef9fd53229d66016c6eaea to your computer and use it in GitHub Desktop.
Downloading M3U8 streams with multiple threads in Python
# streamwriter.py
import glob
import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable

import m3u8
import requests
from tqdm import tqdm
# fix ssl warnings
import urllib3
urllib3.disable_warnings()
# Default number of bytes pulled from each HTTP response per read (4 KiB).
CHUNK_SIZE = 4096
class StreamWriter:
    """
    Download an m3u8 stream and write its segments to a single file.

    Segments are downloaded concurrently into a ``tmp`` directory next to
    ``filepath`` as ``<basename>.part.<index>.ts`` files. They are then
    concatenated in segment-index order into ``filepath``, and the part
    files are deleted.
    """

    # segments of the loaded playlist; None until get_segments() loads them
    segments: m3u8.SegmentList = None
    # number of bytes read from each segment response per iteration
    chunk_size: int = CHUNK_SIZE

    def __init__(
        self, m3u8_url: str, filepath: str, headers: dict = None, workers: int = 10
    ):
        """
        Create a stream writer.

        Args:
            m3u8_url (str): URL of the m3u8 playlist.
            filepath (str): path of the merged output file.
            headers (dict, optional): HTTP headers sent with the playlist
                request and every segment request. Defaults to an empty dict.
            workers (int): number of download threads.
        """
        self.m3u8_url = m3u8_url
        self.filepath = filepath
        # A fresh dict per instance: a shared ``{}`` default would be the
        # same object for every StreamWriter created without headers.
        self.headers = {} if headers is None else headers
        self.workers: int = workers

    def _mktmpdir(self) -> str:
        """
        Create (if needed) the ``tmp`` directory next to the output file.

        Returns:
            str: path of the temporary directory holding the .ts part files.
        """
        tmpdir = os.path.join(os.path.dirname(self.filepath), "tmp")
        os.makedirs(tmpdir, exist_ok=True)
        return tmpdir

    def get_segments(self) -> m3u8.SegmentList:
        """
        Return the playlist's segments, loading the playlist on first call.

        The playlist is requested with ``self.headers`` and with SSL
        verification disabled; the result is cached in ``self.segments``.
        """
        # ``is None`` rather than truthiness so an empty (falsy) segment
        # list is treated as already loaded instead of re-fetched every call.
        if self.segments is None:
            stream = m3u8.load(self.m3u8_url, headers=self.headers, verify_ssl=False)
            self.segments = stream.segments
        return self.segments

    @staticmethod
    def _write_segment_to_file(
        segment: m3u8.Segment,
        filepath: str,
        headers: dict = None,
        chunk_size: int = CHUNK_SIZE,
    ) -> str:
        """
        Download one segment and stream its body into ``filepath``.

        Args:
            segment (m3u8.Segment): segment whose ``absolute_uri`` is fetched.
            filepath (str): destination .ts file (overwritten if present).
            headers (dict, optional): HTTP headers for the request.
            chunk_size (int): bytes read from the response per iteration.

        Returns:
            str: ``filepath``, so callers can collect the written parts.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            SystemExit: with code 1 if interrupted by Ctrl+C.
        """
        try:
            # Using the response as a context manager releases the pooled
            # connection that ``stream=True`` keeps open until the body is
            # consumed or the response is closed. verify=False matches the
            # playlist request (the module silences urllib3's warnings).
            with requests.get(
                segment.absolute_uri, headers=headers, verify=False, stream=True
            ) as response:
                response.raise_for_status()
                with open(filepath, "wb") as file:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        file.write(chunk)
        except KeyboardInterrupt:
            print("Cancelled...")
            raise SystemExit(1)
        return filepath

    def _write_segments(self, result_handler: Callable = None) -> list:
        """
        Download every segment concurrently into the temporary directory.

        Args:
            result_handler (Callable, optional): called with the value returned
                by each finished download (its part file path), in completion
                order.

        Returns:
            list: part file paths, ordered by segment index.

        Raises:
            requests.HTTPError: re-raised from the first failed download seen.
            SystemExit: with code 1 if interrupted by Ctrl+C.
        """
        segments = self.get_segments()
        tmpdir = self._mktmpdir()
        basename = os.path.basename(self.filepath)
        part_paths = [
            os.path.join(tmpdir, f"{basename}.part.{index}.ts")
            for index in range(len(segments))
        ]
        try:
            with ThreadPoolExecutor(max_workers=self.workers) as executor:
                futures = [
                    executor.submit(
                        self._write_segment_to_file,
                        segment,
                        path,
                        self.headers,
                        self.chunk_size,
                    )
                    for segment, path in zip(segments, part_paths)
                ]
                try:
                    # as_completed lets the progress bar advance as soon as
                    # any download finishes, not strictly in playlist order.
                    for future in tqdm(
                        as_completed(futures),
                        total=len(futures),
                        unit="segment",
                        desc=f"[i] --> {self.filepath}",
                    ):
                        result = future.result()
                        if result_handler is not None:
                            result_handler(result)
                except BaseException:
                    # Leaving the ``with`` block waits for all queued work;
                    # cancel what has not started so an error or Ctrl+C only
                    # waits for the downloads already in progress.
                    for future in futures:
                        future.cancel()
                    raise
        except KeyboardInterrupt:
            print("Cancelled...")
            raise SystemExit(1)
        return part_paths

    def _merge_segments(self) -> None:
        """
        Concatenate the downloaded .ts parts into ``self.filepath``.

        Parts are ordered by their numeric segment index and deleted after
        the merged file has been written.
        """
        tmpdir = os.path.join(os.path.dirname(self.filepath), "tmp")
        prefix = f"{os.path.basename(self.filepath)}.part."
        suffix = ".ts"

        def _index_text(path: str) -> str:
            # "<basename>.part.<index>.ts" -> "<index>"
            return os.path.basename(path)[len(prefix):-len(suffix)]

        # Escape the literal path pieces so characters like "[" or "*" in
        # the file or directory name are not treated as glob wildcards.
        pattern = os.path.join(glob.escape(tmpdir), glob.escape(prefix) + "*" + suffix)
        # Sort numerically: a plain string sort would put "part.10" before
        # "part.2" and scramble any stream with more than ten segments.
        parts = sorted(
            (path for path in glob.glob(pattern) if _index_text(path).isdecimal()),
            key=lambda path: int(_index_text(path)),
        )
        with open(self.filepath, "wb") as out:
            for path in parts:
                with open(path, "rb") as part:
                    # copy in buffered chunks instead of reading the whole part
                    shutil.copyfileobj(part, out)
        for path in parts:
            os.remove(path)

    def write(self) -> None:
        """
        Download the whole stream and write it to ``self.filepath``.

        Loads the playlist, downloads all segments into the temporary
        directory, then merges them into the output file.
        """
        self.get_segments()
        self._mktmpdir()
        self._write_segments()
        self._merge_segments()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment