Created
July 12, 2024 20:47
-
-
Save CypherpunkSamurai/ebe8afed44ef9fd53229d66016c6eaea to your computer and use it in GitHub Desktop.
Downloading M3U8 streams with threads in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# streamwriter.py | |
import os | |
import glob | |
import m3u8 | |
import requests | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from typing import Callable | |
from tqdm import tqdm | |
# Silence urllib3 warnings. The requests in this module are made with TLS
# certificate verification turned off (verify=False / verify_ssl=False), which
# presumably triggers an InsecureRequestWarning per request otherwise.
# NOTE(review): this hides the warnings, it does not make the requests secure.
import urllib3
urllib3.disable_warnings()
# Default number of bytes read per iteration while streaming a segment to disk.
CHUNK_SIZE = 4 * 1024


class StreamWriter:
    """
    Download an m3u8 playlist and write its segments into a single file.

    Every segment is fetched concurrently into
    ``<dir>/tmp/<name>.part.<index>.ts`` (``<dir>`` and ``<name>`` being the
    directory and base name of ``filepath``); the parts are then concatenated
    in playlist order into ``filepath`` and removed.
    """

    # Segments of the playlist; filled in lazily by get_segments().
    # Quoted so the annotation is not evaluated when the class is created.
    segments: "m3u8.SegmentList" = None
    # Number of bytes read per iteration while streaming a segment to disk.
    chunk_size: int = CHUNK_SIZE
    # Number of bytes copied per iteration while merging the part files.
    _merge_buffer_size: int = 1024 * 1024

    def __init__(
        self, m3u8_url: str, filepath: str, headers: dict = None, workers: int = 10
    ):
        """
        Create a stream writer.

        Args:
            m3u8_url (str): URL of the m3u8 playlist.
            filepath (str): path of the merged output file.
            headers (dict): HTTP headers sent with every request; ``None``
                means no extra headers.
            workers (int): maximum number of concurrent download threads.
        """
        self.m3u8_url = m3u8_url
        self.filepath = filepath
        # A fresh dict per instance: a ``{}`` default in the signature would
        # be shared (and mutable) across every StreamWriter.
        self.headers = {} if headers is None else headers
        self.workers: int = workers

    def _tmpdir_path(self) -> str:
        """Return the directory that holds the temporary .ts part files."""
        return os.path.join(os.path.dirname(self.filepath), "tmp")

    def _part_path(self, index: int) -> str:
        """Return the temporary file path of the segment at ``index``."""
        return os.path.join(
            self._tmpdir_path(),
            f"{os.path.basename(self.filepath)}.part.{index}.ts",
        )

    def _mktmpdir(self):
        """
        Create a temporary directory to store .ts files.

        Returns:
            str: path of the (possibly already existing) directory.
        """
        tmpdir = self._tmpdir_path()
        os.makedirs(tmpdir, exist_ok=True)
        return tmpdir

    def get_segments(self) -> "m3u8.SegmentList":
        """
        Get the list of segments, loading the playlist on first use.

        The playlist is fetched with TLS verification disabled and the
        result is cached on the instance.
        """
        if not self.segments:
            stream = m3u8.load(self.m3u8_url, headers=self.headers, verify_ssl=False)
            self.segments = stream.segments
        return self.segments

    @staticmethod
    def _write_segment_to_file(
        segment: "m3u8.Segment",
        filepath: str,
        headers: dict = None,
        chunk_size: int = CHUNK_SIZE,
    ):
        """
        Download one segment and write it to a file.

        Args:
            segment (m3u8.Segment): m3u8 Segment
            filepath (str): .ts file path
            headers (dict): HTTP headers for the request; ``None`` means none.
            chunk_size (int): number of bytes read per iteration.

        Raises:
            requests.HTTPError: if the server answers with an error status.
            SystemExit: if the download is interrupted from the keyboard.
        """
        try:
            # The context manager releases the streamed connection even when
            # writing fails half-way through.
            with requests.get(
                segment.absolute_uri,
                headers={} if headers is None else headers,
                verify=False,
                stream=True,
            ) as response:
                response.raise_for_status()
                with open(filepath, "wb") as file:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        file.write(chunk)
        except KeyboardInterrupt:
            print("Cancelled...")
            raise SystemExit(1)

    def _write_segments(self, result_handler: Callable = None) -> list:
        """
        Download every segment into its own part file using a thread pool.

        Args:
            result_handler (Callable): currently unused; accepted so existing
                callers keep working.

        Returns:
            list: part file paths, in playlist order.

        Raises:
            SystemExit: if the download is interrupted from the keyboard.
        """
        # Load the playlist if the caller has not done so yet.
        segments = self.get_segments()
        count = len(segments)
        part_paths = [self._part_path(index) for index in range(count)]
        try:
            with ThreadPoolExecutor(max_workers=self.workers) as executor:
                downloads = executor.map(
                    self._write_segment_to_file,
                    segments,
                    part_paths,
                    [self.headers] * count,
                    # honour the instance setting instead of the module default
                    [self.chunk_size] * count,
                )
                progress = tqdm(
                    downloads,
                    total=count,
                    unit="segment",
                    desc=f"[i] --> {self.filepath}",
                )
                # Draining the iterator drives the progress bar and re-raises
                # the first exception raised by a worker.
                for _ in progress:
                    pass
        except KeyboardInterrupt:
            print("Cancelled...")
            raise SystemExit(1)
        return part_paths

    def _merge_segments(self) -> None:
        """
        Merge all .ts part files into the output file, then delete them.

        Parts are ordered by their numeric index, so ``part.10`` comes after
        ``part.2``. Files whose index is not a decimal number are left alone.
        """
        stem = f"{os.path.basename(self.filepath)}.part."
        suffix = ".ts"
        # Escape the literal prefix so characters such as "[" in the output
        # name are not interpreted as glob wildcards.
        pattern = glob.escape(os.path.join(self._tmpdir_path(), stem)) + "*" + suffix

        parts = []
        for path in glob.glob(pattern):
            index = os.path.basename(path)[len(stem):-len(suffix)]
            if index.isdecimal():
                parts.append((int(index), path))
        parts.sort()

        with open(self.filepath, "wb") as file:
            for _, path in parts:
                with open(path, "rb") as part:
                    # Copy in bounded chunks rather than loading a whole
                    # segment into memory.
                    while True:
                        chunk = part.read(self._merge_buffer_size)
                        if not chunk:
                            break
                        file.write(chunk)

        for _, path in parts:
            os.remove(path)

    def write(self) -> None:
        """
        Write the stream to ``filepath``.

        Loads the playlist, creates the temporary directory, downloads all
        segments and merges them.
        """
        self.get_segments()
        self._mktmpdir()
        self._write_segments()
        self._merge_segments()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment