Skip to content

Instantly share code, notes, and snippets.

@fedej
Created May 4, 2020 00:13
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save fedej/7f848d20205efbff4db4a0fc78eae7ba to your computer and use it in GitHub Desktop.
Save fedej/7f848d20205efbff4db4a0fc78eae7ba to your computer and use it in GitHub Desktop.
asyncio ffmpeg-python
import json
from ffmpeg._run import Error, compile
from ffmpeg._utils import convert_kwargs_to_cmd_line_args
from asyncio import create_subprocess_exec, subprocess, wait_for
async def probe(filename, cmd='ffprobe', timeout=None, **kwargs):
    """Run ffprobe on ``filename`` and return its JSON report as Python data.

    Args:
        filename: Passed to the executable as the final command-line argument.
        cmd: Executable to launch.
        timeout: Seconds to wait for the process to finish. ``None`` (or any
            other falsy value, e.g. ``0``) disables the limit.
        **kwargs: Extra options, turned into command-line arguments by
            ``convert_kwargs_to_cmd_line_args``.

    Returns:
        The object produced by ``json.loads`` on the process's stdout.

    Raises:
        Error: If the process exits with a non-zero return code.
        asyncio.TimeoutError: If ``timeout`` elapses first; the child process
            is killed and reaped before the exception propagates.
    """
    args = [cmd, '-show_format', '-show_streams', '-of', 'json']
    args += convert_kwargs_to_cmd_line_args(kwargs)
    args += [filename]

    p = await create_subprocess_exec(
        *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    try:
        pending = p.communicate()
        if timeout:
            pending = wait_for(pending, timeout)
        out, err = await pending
    finally:
        # communicate() only returns once the process has exited, so a missing
        # return code here means the await was interrupted (timeout or
        # cancellation). Kill and reap the child so it is not left running.
        if p.returncode is None:
            try:
                p.kill()
            except ProcessLookupError:
                # The process exited on its own in the meantime.
                pass
            await p.wait()

    if p.returncode != 0:
        raise Error('ffprobe', out, err)
    return json.loads(out.decode('utf-8'))
async def run(stream_spec,
              cmd='ffmpeg',
              pipe_stdin=False,
              pipe_stdout=False,
              pipe_stderr=False,
              input=None,
              quiet=False,
              overwrite_output=False,):
    """Launch ffmpeg for ``stream_spec`` and wait for it to finish.

    Args:
        stream_spec: Stream specification handed to ``compile`` to build the
            command line.
        cmd: Executable name, forwarded to ``compile``.
        pipe_stdin: Connect a pipe to the child's stdin.
        pipe_stdout: Capture the child's stdout.
        pipe_stderr: Capture the child's stderr.
        input: Data to send to the child's stdin. Supplying it implies a
            stdin pipe even when ``pipe_stdin`` is false.
        quiet: Capture both stdout and stderr instead of inheriting them.
        overwrite_output: Forwarded to ``compile``.

    Returns:
        The ``(out, err)`` pair from ``communicate()``; an element is ``None``
        when the corresponding stream was not captured.

    Raises:
        Error: If the process exits with a non-zero return code.
    """
    args = compile(stream_spec, cmd, overwrite_output=overwrite_output)

    # Without a stdin pipe the data in `input` could never reach the child,
    # so providing input turns the pipe on.
    want_stdin = pipe_stdin or input is not None
    stdin_stream = subprocess.PIPE if want_stdin else None
    stdout_stream = subprocess.PIPE if pipe_stdout or quiet else None
    stderr_stream = subprocess.PIPE if pipe_stderr or quiet else None

    p = await create_subprocess_exec(
        *args, stdin=stdin_stream, stdout=stdout_stream, stderr=stderr_stream
    )
    try:
        out, err = await p.communicate(input)
    finally:
        # A missing return code means the await was interrupted (for example
        # the calling task was cancelled); do not leave the child running.
        if p.returncode is None:
            try:
                p.kill()
            except ProcessLookupError:
                # The process exited on its own in the meantime.
                pass
            await p.wait()

    if p.returncode != 0:
        raise Error('ffmpeg', out, err)
    return out, err
# Names exported by a star-import of this module.
__all__ = ["probe", "run"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment