Skip to content

Instantly share code, notes, and snippets.

@justfoolingaround
Created September 24, 2021 14:36
Show Gist options
  • Save justfoolingaround/6b9857626d862563ff11e0d90f8428f2 to your computer and use it in GitHub Desktop.
Save justfoolingaround/6b9857626d862563ff11e0d90f8428f2 to your computer and use it in GitHub Desktop.
A ffmpeg wrapper for tqdm
"""
A download wrapper around ffmpeg that reports its progress through tqdm.
Wrapper was created in favor of [animdl](https://github.com/justfoolingaround/animdl.git)
and since the developer doesn't care about you copying and pasting this code somewhere,
you may do it. No need for credit. You might feel guilty though.
Hope **your** project becomes easier.
"""
import logging
import os
import re
import shutil
import subprocess
from collections import defaultdict
from tqdm import tqdm
# Name of the ffmpeg binary that every helper in this module invokes.
executable = 'ffmpeg'

# Extensions (without the dot) that this module associates with ffmpeg.
FFMPEG_EXTENSIONS = ['mpd', 'm3u8', 'm3u']


def has_ffmpeg():
    """
    Tells whether `executable` can be located through `shutil.which`.

    Returns
    ---
    `bool`
    """
    located = shutil.which(executable)
    return bool(located)
def parse_ffmpeg_duration(dt: str) -> float:
    """
    Converts ffmpeg duration to seconds.

    `dt` is a colon separated timestamp with one to three fields
    (`SS`, `MM:SS` or `HH:MM:SS`); every field is parsed with `float`, so
    the seconds field may carry a fractional part.

    Raises
    ---
    `ValueError` if a field is not numeric or if there are more than three
    fields.

    Returns
    ---
    `float`
    """
    fields = dt.split(':')
    if len(fields) > 3:
        raise ValueError('Expected at most HH:MM:SS, got {!r}'.format(dt))

    total = 0.0
    # Each field is worth sixty times the one to its right, so folding from
    # the left handles a missing hour (or hour and minute) field uniformly.
    for field in fields:
        total = total * 60 + float(field)
    return total
def iter_audio(stderr):
    """
    Goes over the audio part of the ffmpeg output and gets the mapping index and
    the frequency.

    `stderr` is the raw `bytes` output of ffmpeg. Every line of the form
    `Stream #<program>:<stream>: Audio: ... <freq> Hz` contributes one
    `("<program>:a:<stream>", <freq>)` pair; pairs are yielded from the
    highest frequency to the lowest.

    Returns
    ---
    `Generator[tuple(str, int)]`
    """
    # NOTE(review): <stream> is the index ffmpeg prints after `Stream #<program>:`,
    # while ffmpeg's `a:N` specifier counts audio streams only. Confirm the two
    # coincide for the inputs this is used with.
    found = []
    # Raw bytes literal: `\d` in a plain literal is an invalid escape sequence.
    for match in re.finditer(rb'Stream #(\d+):(\d+): Audio:.+ (\d+) Hz', stderr):
        program, stream_id, freq = (group.decode() for group in match.groups())
        found.append(("{}:a:{}".format(program, stream_id), int(freq)))

    yield from sorted(found, key=lambda pair: pair[1], reverse=True)
def analyze_stream(logger: logging.Logger, url: str, headers: dict):
    """
    Converts the output of `ffmpeg -i $URL` to a partial stream info default dict.

    `headers`, when truthy, is sent to ffmpeg through `-headers` as
    `key:value` pairs joined with CRLF.

    The returned mapping has:

    - `'duration'`: seconds as `float`, only set when ffmpeg printed a
      `Duration: ` timestamp.
    - `'streams'`: `{program: {stream_index: {'quality': int, 'audio': list}}}`
      where `quality` is the number following the last `x` on the video
      stream line and `audio` is the list produced by `iter_audio` (the same
      list object is attached to every video stream).

    In logging level DEBUG, it shows the ffmpeg command line.

    Returns
    ---
    `collections.defaultdict`
    """
    info = defaultdict(lambda: defaultdict(lambda: defaultdict(defaultdict)))

    args = [executable, '-hide_banner']
    if headers:
        args.extend(('-headers', '\r\n'.join('{}:{}'.format(k, v) for k, v in headers.items())))
    args.extend(('-i', url))

    logger.debug('Calling PIPE child process for ffmpeg: {}'.format(args))
    # `run` drains the merged stdout/stderr pipe and waits for the child, so
    # the process is reaped and its pipe closed before we start parsing.
    stderr = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).stdout

    duration = re.search(rb'Duration: ((?:\d+:)+\d+)', stderr)
    if duration:
        info['duration'] = parse_ffmpeg_duration(duration.group(1).decode())

    audio = [*iter_audio(stderr)]

    for match in re.finditer(rb'Stream #(\d+):(\d+): Video: .+x(\d+)', stderr):
        program, stream_index, resolution = (int(group.decode()) for group in match.groups())
        info['streams'][program][stream_index]['quality'] = resolution
        info['streams'][program][stream_index]['audio'] = audio

    return info
def iter_quality(quality_dict):
    """
    Iterates over the quality dict returned by `analyze_stream`.

    For every entry under `quality_dict['streams'][program][stream]` it yields
    a `(video_specifier, quality, audio_specifier)` tuple where:

    - `video_specifier` is `"<program>:v:<stream>"`,
    - `quality` is the entry's `'quality'` value, or `0` when missing/falsy,
    - `audio_specifier` is the first element of the first pair in the
      entry's `'audio'` list, or `None` when that list is missing or empty.

    Returns
    ---
    `Generator[tuple(str, int, str | None)]`
    """
    for program, streams in quality_dict.get('streams', {}).items():
        for stream, stream_info in streams.items():
            audio_tracks = stream_info.get('audio')
            # Streams without any detected audio must not blow up the
            # iteration; report "no audio" with None instead.
            best_audio = audio_tracks[0][0] if audio_tracks else None
            yield "{}:v:{}".format(program, stream), stream_info.get('quality') or 0, best_audio
def get_last(iterable):
    """
    Walks `iterable` to exhaustion and hands back the final item it produced.
    When nothing is produced at all, the result is `None`.
    """
    final = None
    for final in iterable:
        pass
    return final
def ffmpeg_to_tqdm(logger: logging.Logger, process: subprocess.Popen, duration: int, outfile_name: str) -> subprocess.Popen:
    """
    tqdm wrapper for a ffmpeg process.

    Takes a logger `logger`, the ffmpeg child process `process` (its
    `stdout` must be a readable binary pipe), duration of stream `duration`
    (used as the progress bar total) and the output file's name
    `outfile_name`.

    Every chunk read from `process.stdout` is searched for ` time=HH:MM:SS`
    markers; the last marker in the chunk is converted into seconds and the
    progress bar is advanced by the difference to the previous position.

    In logging level DEBUG, it shows the ffmpeg output.

    Returns
    ---
    `subprocess.Popen` but completed (it has been waited on, so
    `returncode` is set).
    """
    progress_bar = tqdm(desc="HLS, FFMPEG / GET {}.mkv".format(outfile_name), total=duration, unit='segment')
    previous_span = 0

    try:
        for stream in process.stdout:
            # ffmpeg may echo metadata in arbitrary encodings; never let a
            # debug message abort the download.
            logger.debug('[ffmpeg] {}'.format(stream.decode(errors='replace').strip()))
            current = get_last(re.finditer(rb'\stime=((?:\d+:)+\d+)', stream))
            if current:
                elapsed = parse_ffmpeg_duration(current.group(1).decode())
                progress_bar.update(elapsed - previous_span)
                previous_span = elapsed
    finally:
        # Close the bar even when reading/parsing fails.
        progress_bar.close()

    # Reaching EOF on stdout does not populate `returncode`; wait() does.
    process.wait()
    return process
def ffmpeg_download(url: str, headers: dict, outfile_name: str, content_dir, preferred_quality=1080, log_level=20, **opts) -> int:
    """
    Downloads content using ffmpeg and optionally uses tqdm to wrap the progress
    bar.

    Initially, it fetches content information for the stream using
    `analyze_stream`. It then picks the highest quality video stream that does
    not exceed `preferred_quality` together with the audio specifier reported
    for it by `iter_quality`, and copies both into
    `content_dir / "<outfile_name>.mkv"` (an existing file at that path is
    removed first; `content_dir` must support the `/` operator).

    If `log_level` is greater than 20 (INFO), it simply runs the command and
    waits, discarding ffmpeg's output. Otherwise the process is handed to
    `ffmpeg_to_tqdm` to render a progress bar.

    `**opts` is accepted but not used.

    Returns
    ---
    `int` The ffmpeg child process' return code, or `None` when no stream at or
    below `preferred_quality` was found (nothing is downloaded in that case).
    """
    logger = logging.getLogger('ffmpeg-hls-download[{}.mkv]'.format(outfile_name))
    logger.debug("Using ffmpeg to download content.")

    stream_info = analyze_stream(logger, url, headers)

    output_path = content_dir / ("{}.mkv".format(outfile_name))
    try:
        os.remove(output_path)
    except OSError:
        # Best effort: usually the file simply does not exist yet.
        pass

    args = [executable, '-hide_banner']
    if headers:
        args.extend(('-headers', '\r\n'.join('{}:{}'.format(k, v) for k, v in headers.items())))
    args.extend(('-i', url))

    candidates = sorted(
        (entry for entry in iter_quality(stream_info) if entry[1] <= preferred_quality),
        key=lambda entry: entry[1],
        reverse=True,
    )
    if not candidates:
        logger.error('Could not find any stream at or below the desired quality {}.'.format(preferred_quality))
        return None

    video, quality, audio = candidates[0]
    if quality < preferred_quality:
        logger.warning('Could not find the stream of desired quality {}, currently downloading {}.'.format(preferred_quality, quality or 'an unknown quality'))

    mapping = ['-map', video]
    if audio:
        mapping.extend(('-map', audio))

    # ffmpeg applies options to the next file named on the command line, so
    # the -map/-c options have to come before the output path.
    command = args + mapping + ['-c', 'copy', output_path.as_posix()]

    if log_level > 20:
        # Nobody reads the output in this mode; sending it to DEVNULL avoids
        # blocking the child on a full pipe while we wait for it.
        return subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT).returncode

    child = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # wait() guarantees an actual exit status instead of an unset returncode.
    return ffmpeg_to_tqdm(logger, child, duration=stream_info.get('duration'), outfile_name=outfile_name).wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment