Skip to content

Instantly share code, notes, and snippets.

@yossioo
Created April 18, 2020 05:52
Show Gist options
  • Save yossioo/b601cd63f9d4bcb9430c1fca561d3a26 to your computer and use it in GitHub Desktop.
Save yossioo/b601cd63f9d4bcb9430c1fca561d3a26 to your computer and use it in GitHub Desktop.
Simple example to demonstrate dynamically adding and removing source elements to a playing pipeline.
#!/usr/bin/env python3
''' https://github.com/GStreamer/gst-python/blob/master/examples/dynamic_src.py
Simple example to demonstrate dynamically adding and removing source elements
to a playing pipeline.
'''
import sys
import random
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0')
from gi.repository import GLib, GObject, Gst
class ProbeData:
def __init__(self, pipe, src):
self.pipe = pipe
self.src = src
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
loop.quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
loop.quit()
return True
def dispose_src_cb(src):
src.set_state(Gst.State.NULL)
def probe_cb(pad, info, pdata):
peer = pad.get_peer()
pad.unlink(peer)
pdata.pipe.remove(pdata.src)
# Can't set the state of the src to NULL from its streaming thread
GLib.idle_add(dispose_src_cb, pdata.src)
pdata.src = Gst.ElementFactory.make('videotestsrc')
pdata.src.props.pattern = random.randint(0, 24)
pdata.pipe.add(pdata.src)
srcpad = pdata.src.get_static_pad ("src")
srcpad.link(peer)
pdata.src.sync_state_with_parent()
GLib.timeout_add_seconds(1, timeout_cb, pdata)
return Gst.PadProbeReturn.REMOVE
def timeout_cb(pdata):
srcpad = pdata.src.get_static_pad('src')
srcpad.add_probe(Gst.PadProbeType.IDLE, probe_cb, pdata)
return GLib.SOURCE_REMOVE
def main(args):
GObject.threads_init()
Gst.init(None)
pipe = Gst.Pipeline.new('dynamic')
src = Gst.ElementFactory.make('videotestsrc')
sink = Gst.ElementFactory.make('autovideosink')
pipe.add(src)
pipe.add(sink)
src.link(sink)
pdata = ProbeData(pipe, src)
loop = GObject.MainLoop()
GLib.timeout_add_seconds(1, timeout_cb, pdata)
bus = pipe.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)
# start play back and listen to events
pipe.set_state(Gst.State.PLAYING)
try:
loop.run()
except:
pass
# cleanup
pipe.set_state(Gst.State.NULL)
if __name__ == '__main__':
sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment