Skip to content

Instantly share code, notes, and snippets.

@NeuronQ
Created August 19, 2019 12:43
Show Gist options
  • Save NeuronQ/de24db937a4d5a28eeb46612914b8779 to your computer and use it in GitHub Desktop.
Save NeuronQ/de24db937a4d5a28eeb46612914b8779 to your computer and use it in GitHub Desktop.
import sys
import traceback
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject # noqa
INPUT_VIDEO_FILE_PATH = '/opt/app/data/video_file.mp4'
PLACEHOLDER_FILE_PATH = '/opt/app/data/tvlogo.png'
SOCKET_PATH = '/tmp/shm1'
Gst.init(sys.argv)
# init and create pipeline
pipeline = Gst.Pipeline.new("player")
# video file pipeline elements
source = Gst.ElementFactory.make("filesrc", "source")
assert source
source.set_property("location", INPUT_VIDEO_FILE_PATH)
demux = Gst.ElementFactory.make("qtdemux", "demux")
assert demux
queue = Gst.ElementFactory.make("queue", "queue")
assert queue
h264parse = Gst.ElementFactory.make("h264parse", "h264parse")
assert h264parse
# placeholder pipeline elements
source_placeholder = Gst.ElementFactory.make("filesrc", "source_placeholder")
assert source_placeholder
source_placeholder.set_property("location", PLACEHOLDER_FILE_PATH)
pngdec = Gst.ElementFactory.make("pngdec", "pngdec")
assert pngdec
imagefreeze = Gst.ElementFactory.make("imagefreeze", "imagefreeze")
assert imagefreeze
# common pipeline elements
input_selector = Gst.ElementFactory.make("input-selector", "input_selector")
assert input_selector
payloader = Gst.ElementFactory.make("gdppay", "payloader")
assert payloader
sink = Gst.ElementFactory.make("shmsink", "sink")
assert sink
sink.set_property("socket-path", SOCKET_PATH)
# add video file pipeline elemtns
pipeline.add(source)
pipeline.add(demux)
pipeline.add(queue)
pipeline.add(h264parse)
# add placeholder pipeline elements
pipeline.add(source_placeholder)
pipeline.add(pngdec)
pipeline.add(imagefreeze)
# add commpon pipeline elements
pipeline.add(input_selector)
pipeline.add(payloader)
pipeline.add(sink)
# dynamically link demux when available
def handle_demux_pad_added(src, new_pad, *args, **kwargs):
if new_pad.get_name().startswith('video'):
print("\n--- demux video pad added:", new_pad, new_pad.get_name())
queue_sink_pad = queue.get_static_pad('sink')
new_pad.link(queue_sink_pad)
else:
print("\n--- demux * pad added:", new_pad, new_pad.get_name())
demux.connect("pad-added", handle_demux_pad_added)
# link video file pipeline segment
source.link(demux)
# dmeux to queue gets linked when demux source pad is available (see above)
queue.link(h264parse)
h264parse.link(input_selector)
# link placeholder pipeline segment
source_placeholder.link(pngdec)
pngdec.link(imagefreeze)
imagefreeze.link(input_selector)
# link common pipeline elements
input_selector.link(payloader)
payloader.link(sink)
# set input selector to video file
input_selector_video_file_sink_pad = input_selector.get_static_pad('sink_1')
input_selector.set_property("active-pad", input_selector_video_file_sink_pad)
# example using command:
# command = """filesrc location=/opt/app/data/video-file.mp4 \
# ! qtdemux ! queue ! h264parse \
# ! gdppay ! shmsink socket-path=/tmp/shm1
# """
# pipeline = Gst.parse_launch(command)
# listen to pipeline messages
# https://lazka.github.io/pgi-docs/Gst-1.0/classes/Bus.html
bus = pipeline.get_bus()
# allow bus to emit messages to main thread
bus.add_signal_watch()
def on_message(bus, message, loop):
mtype = message.type
"""
Gstreamer Message Types and how to parse
https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MessageType
"""
if mtype == Gst.MessageType.EOS:
# Handle End of Stream
print("End of stream")
# TODO: switch to placeholder when file end reached
elif mtype == Gst.MessageType.ERROR:
# Handle Errors
err, debug = message.parse_error()
print(err, debug)
elif mtype == Gst.MessageType.WARNING:
# Handle warnings
err, debug = message.parse_warning()
print(err, debug)
return True
# Add handler to specific signal
# https://lazka.github.io/pgi-docs/GObject-2.0/classes/Object.html#GObject.Object.connect
bus.connect("message", on_message, None)
print("--- starting pipeline...\n")
pipeline.set_state(Gst.State.PLAYING)
# Init GObject loop to handle Gstreamer Bus Events
loop = GObject.MainLoop()
try:
loop.run()
except:
traceback.print_exc()
print("--- ...bbie :(\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment