Skip to content

Instantly share code, notes, and snippets.

@ALenfant
Created September 2, 2021 20:35
Show Gist options
  • Save ALenfant/94632330d2736da859a220e95ef0ed4b to your computer and use it in GitHub Desktop.
Save ALenfant/94632330d2736da859a220e95ef0ed4b to your computer and use it in GitHub Desktop.
Fetch m3u8 playlist, download individual ts segment files, concatenate and convert to mp4
import m3u8
from multiprocessing.pool import ThreadPool
import os
import requests
import subprocess
url = "http://ENTER_URL_HERE.m3u8"
def file_name_from_url(url):
# assumes that the last segment after the / represents the file name
# if url is abc/xyz/file.txt, the file name will be file.txt
file_name_start_pos = url.rfind("/") + 1
return url[file_name_start_pos:]
def download_url(url):
print("Downloading: ", url)
file_name = file_name_from_url(url)
r = requests.get(url, stream=True)
if r.status_code == requests.codes.ok:
with open(file_name, 'wb') as f:
for data in r:
f.write(data)
return file_name
if __name__ == '__main__':
playlist = m3u8.load(url)
# Select best resolution
if playlist.playlists:
print("Available resolutions: ", end="")
print(", ".join([str(p.stream_info.resolution) for p in playlist.playlists]))
best_resolution_playlist = None
for sub_playlist in playlist.playlists:
if not best_resolution_playlist:
best_resolution_playlist = sub_playlist
else:
if sub_playlist.stream_info.resolution > best_resolution_playlist.stream_info.resolution:
best_resolution_playlist = sub_playlist
print(f"Selected {best_resolution_playlist.stream_info.resolution}")
playlist = m3u8.load(best_resolution_playlist.absolute_uri)
if not playlist.segments:
raise Exception("No segments to download!")
print(f"{len(playlist.segments)} segments to fetch...")
urls = [s.absolute_uri for s in playlist.segments]
# Run 5 multiple threads. Each call will take the next element in urls list
results = list(ThreadPool(5).imap_unordered(download_url, urls))
for result in results:
pass # Wait
print("Concatenating...")
file_name = file_name_from_url(url) + ".ts"
with open(file_name, 'wb') as f:
for segment in playlist.segments: # Do it in order
sub_file_name = file_name_from_url(segment.absolute_uri)
with open(sub_file_name, "rb") as sub_f:
f.write(sub_f.read())
os.remove(sub_file_name)
print(f"Done creating {file_name}")
print("Converting...")
final_file_name = file_name + ".mp4"
command = f'ffmpeg -i "{file_name}" -acodec copy -vcodec copy "{final_file_name}"'
subprocess.call(command, shell=True)
print(f"Done! See {final_file_name}")
os.remove(file_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment