Skip to content

Instantly share code, notes, and snippets.

@mithro
Created September 19, 2023 05:36
Show Gist options
  • Save mithro/27a41ecea8e26a4e6a97e331bf7d4d15 to your computer and use it in GitHub Desktop.
Save mithro/27a41ecea8e26a4e6a97e331bf7d4d15 to your computer and use it in GitHub Desktop.
Use GStreamer to decode and then re-encode a file with the same format
#!/usr/bin/env python3
# flake8: noqa
import pathlib
import sys
import traceback
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GstPbutils, GObject # noqa
def on_message(bus: Gst.Bus, message: Gst.Message, loop: GObject.MainLoop):
    """Handle one message delivered from the pipeline bus.

    Gstreamer Message Types and how to parse
    https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MessageType

    End-of-stream and error messages stop ``loop``; warnings and every other
    message type are only printed.

    Args:
        bus: Bus the message arrived on (not used here).
        message: Message to inspect.
        loop: Main loop that is quit on end-of-stream or error.

    Returns:
        Always ``True``.
    """
    kind = message.type

    if kind == Gst.MessageType.EOS:
        print("End of stream")
        loop.quit()
        return True

    if kind == Gst.MessageType.ERROR:
        failure, details = message.parse_error()
        print('Error:', failure, details)
        loop.quit()
        return True

    if kind == Gst.MessageType.WARNING:
        notice, details = message.parse_warning()
        print('Warning:', notice, details)
        return True

    # Anything else is reported but otherwise ignored.
    print("unknown message:", kind, message)
    return True
def on_new_pad(obj, pad):
    """Print the name and current caps of a pad reported for ``obj``.

    Args:
        obj: Object the callback was invoked for; printed as the first field.
        pad: Pad offering ``get_name()`` and ``get_current_caps()``.
    """
    # Arguments are evaluated left to right, so the name is still queried
    # before the caps.
    print(obj, 'on_new_pad:', pad.get_name(), pad.get_current_caps())
def on_discovered(*args, **kw):
    """Print whatever positional and keyword arguments the callback received."""
    received = (args, kw)
    print('on_discovered:', *received)
def _resolve_paths(argv):
    """Validate the command line and return the ``(infile, outfile)`` paths.

    Args:
        argv: Command-line vector; ``argv[1]`` is the input file and the
            optional ``argv[2]`` is the output file.

    Returns:
        Tuple of ``pathlib.Path`` objects ``(infile, outfile)``. Without an
        explicit output, ``<stem>.out<suffix>`` next to the input is used.

    Raises:
        SystemExit: On missing arguments, a missing input file, or an output
            file that already exists.
    """
    if len(argv) < 2:
        print("usage %s <input file> [<output file>]" % argv[0])
        sys.exit(1)

    infile = pathlib.Path(argv[1])
    # Explicit checks instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would silently disable the validation.
    if not infile.exists():
        sys.exit(f"{infile} does not exist!")

    if len(argv) > 2:
        outfile = pathlib.Path(argv[2])
    else:
        outfile = infile.parent / (infile.stem + '.out' + infile.suffix)
    if outfile.exists():
        sys.exit(f"{outfile} exists - not over writing!")

    return infile, outfile


def _discover_profile(infile_uri):
    """Inspect ``infile_uri`` and return an encoding profile derived from it.

    The caps of every discovered video and audio stream are printed along
    the way, followed by the profile and its input caps.

    Args:
        infile_uri: ``file://`` URI of the media file to inspect.

    Returns:
        The ``GstPbutils.EncodingProfile`` built from the discovery result.

    Raises:
        SystemExit: If no profile could be derived.
    """
    discoverer = GstPbutils.Discoverer()
    discoverer.connect('discovered', on_discovered)
    info = discoverer.discover_uri(infile_uri)

    print('# video')
    for vinfo in info.get_video_streams():
        print(vinfo.get_caps().to_string().replace(', ', '\n\t'))

    print('# audio')
    for ainfo in info.get_audio_streams():
        print(ainfo.get_caps().to_string().replace(', ', '\n\t'))

    profile = GstPbutils.EncodingProfile.from_discoverer(info)
    print('profile', profile)
    if profile is None:
        # Fail with a readable message rather than an AttributeError on the
        # next line when no profile could be derived.
        sys.exit(f"could not derive an encoding profile from {infile_uri}")

    profile_caps = profile.get_input_caps()
    print('caps:', repr(profile_caps), repr(str(profile_caps)))
    return profile


def _build_pipeline(infile, outfile, profile):
    """Create the transcoding pipeline and configure its elements.

    Args:
        infile: Path of the file to read.
        outfile: Path of the file to write.
        profile: Encoding profile to set on the encodebin element.

    Returns:
        The pipeline returned by ``Gst.parse_launch``.
    """
    # Intended topology (not wired up yet):
    #
    #                          /-> videoconvert -> filter -> videoconvert -\
    #                         /                                             \
    #  filesrc --> decodebin -+                                             +-> encodebin -> filesink
    #                         \                                             /
    #                          \-------------------------------------------/
    #
    # Intended launch-syntax links, still to be added:
    #   d.audio_0 ! e.audio_0
    #   d.video_%u ! videoconvert name=vc_in ! timeoverlay ! videoconvert name=vc_out ! e.video_%u
    #
    # NOTE: as it stands nothing links decodebin's pads to encodebin; the
    # "pad-added" handlers connected below only print the new pads.

    # The source and sink are named so that their locations can be set as
    # properties after parsing. Splicing the paths into the launch string
    # would break on paths containing spaces or quotes.
    pipeline_str = (
        "filesrc name=src\n"
        "  ! decodebin name=d\n"
        "encodebin name=e\n"
        "  ! filesink name=dst\n"
    )
    print("pipeline", "-"*50)
    print(pipeline_str)
    pipeline = Gst.parse_launch(pipeline_str)
    print('-'*50)

    src_location = str(infile.resolve())
    dst_location = str(outfile.resolve())
    print('src location:', src_location)
    print('dst location:', dst_location)
    pipeline.get_child_by_name('src').set_property('location', src_location)
    pipeline.get_child_by_name('dst').set_property('location', dst_location)

    encodebin = pipeline.get_child_by_name('e')
    encodebin.set_property('profile', profile)
    encodebin.connect("pad-added", on_new_pad)

    # NOTE(review): "request-profile-pad" appears to expect the name of a
    # stream profile rather than a caps string - confirm these arguments
    # against the encodebin documentation.
    e_v0 = encodebin.emit("request-profile-pad", "video/x-raw")
    print('e_v0:', e_v0)
    e_a0 = encodebin.emit("request-profile-pad", "audio")
    print('e_a0:', e_a0)

    decodebin = pipeline.get_child_by_name('d')
    decodebin.connect("pad-added", on_new_pad)
    return pipeline


def _run_until_done(pipeline):
    """Play ``pipeline`` until ``on_message`` stops the main loop.

    Args:
        pipeline: Pipeline to run; it is always set back to the NULL state
            before this function returns or propagates an exception.
    """
    bus = pipeline.get_bus()
    # allow bus to emit messages to main thread
    bus.add_signal_watch()

    # Init GObject loop to handle Gstreamer Bus Events
    loop = GObject.MainLoop()
    # Add handler to specific signal
    # https://lazka.github.io/pgi-docs/GObject-2.0/classes/Object.html#GObject.Object.connect
    bus.connect("message", on_message, loop)

    # Start pipeline
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except Exception:
        traceback.print_exc()
        loop.quit()
    finally:
        # Stop Pipeline. ``finally`` also covers KeyboardInterrupt, which is
        # not an ``Exception`` and previously skipped the shutdown.
        pipeline.set_state(Gst.State.NULL)


def main():
    """Decode the input file and re-encode it using a profile discovered from it.

    Usage: ``<script> <input file> [<output file>]``. The command line is
    validated first, so argument errors exit before GStreamer is initialised.

    Raises:
        SystemExit: On invalid arguments, a missing input file, an existing
            output file, or when no encoding profile can be derived.
    """
    infile, outfile = _resolve_paths(sys.argv)

    # ``as_uri`` percent-encodes characters such as spaces, which plain
    # 'file://' + path concatenation leaves invalid.
    infile_uri = infile.resolve().as_uri()

    Gst.init(None)

    # Discover the encoding profile about the infile
    profile = _discover_profile(infile_uri)

    pipeline = _build_pipeline(infile, outfile, profile)
    _run_until_done(pipeline)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment