Skip to content

Instantly share code, notes, and snippets.

@tylercubell
Created February 11, 2019 01:07
Show Gist options
  • Save tylercubell/14cf51a40c517e12c102c8f77831ee80 to your computer and use it in GitHub Desktop.
Save tylercubell/14cf51a40c517e12c102c8f77831ee80 to your computer and use it in GitHub Desktop.
GStreamer Automatically Restart Live Stream On Error
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
Gst.init(None)
class Main:
def __init__(self):
self.pipeline = Gst.Pipeline.new("pipeline")
self.bus = self.pipeline.get_bus()
# Test HLS stream.
self.uri = "http://qthttp.apple.com.edgesuite.net/1010qwoeiuryfg/sl.m3u8"
self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc")
self.souphttpsrc.set_property("is-live", True)
self.souphttpsrc.set_property("location", self.uri)
self.pipeline.add(self.souphttpsrc)
self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
self.pipeline.add(self.decodebin)
self.decodebin.connect("pad-added", self.decodebin_src_pad_created)
self.audioconvert = Gst.ElementFactory.make("audioconvert", "audioconvert")
self.audioconvert_sink_pad = self.audioconvert.get_static_pad("sink")
self.pipeline.add(self.audioconvert)
self.audioresample = Gst.ElementFactory.make("audioresample", "audioresample")
self.pipeline.add(self.audioresample)
self.autovideosink = Gst.ElementFactory.make("autovideosink", "autovideosink")
self.autovideosink_sink_pad = self.autovideosink.get_static_pad("sink")
self.pipeline.add(self.autovideosink)
self.autoaudiosink = Gst.ElementFactory.make("autoaudiosink", "autoaudiosink")
self.pipeline.add(self.autoaudiosink)
self.souphttpsrc.link(self.decodebin)
self.audioconvert.link(self.audioresample)
self.audioresample.link(self.autoaudiosink)
def decodebin_src_pad_created(self, element, pad):
pad_type = pad.get_current_caps().get_structure(0).get_name()
if pad_type == "audio/x-raw" and not self.audioconvert_sink_pad.is_linked():
pad.link(self.audioconvert_sink_pad)
if pad_type == "video/x-raw" and not self.autovideosink_sink_pad.is_linked():
pad.link(self.autovideosink_sink_pad)
def custom_message(self, name):
custom_structure = Gst.Structure.new_empty(name)
custom_message = Gst.Message.new_application(None, custom_structure)
self.bus.post(custom_message)
def start_switch(self):
if not self.start_switch_active:
self.start_switch_active = True
self.souphttpsrc.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, \
self.souphttpsrc_block_probe_callback)
if self.decodebin.get_static_pad("src_0"):
self.decodebin_block_probe_id = \
self.decodebin.get_static_pad("src_0").add_probe(Gst.PadProbeType.BLOCK, \
self.decodebin_block_probe_callback)
self.custom_message("do_switch")
def souphttpsrc_block_probe_callback(self, pad, info):
return Gst.PadProbeReturn.DROP
def decodebin_block_probe_callback(self, pad, info):
return Gst.PadProbeReturn.DROP
def do_switch(self):
# Get ready to add/remove elements.
self.pipeline.set_state(Gst.State.READY)
# Remove old souphttpsrc.
self.souphttpsrc.set_state(Gst.State.NULL)
self.souphttpsrc.unlink(self.decodebin)
self.pipeline.remove(self.souphttpsrc)
del self.souphttpsrc
# Remove old decodebin.
self.decodebin.set_state(Gst.State.NULL)
self.decodebin.unlink(self.autovideosink)
self.decodebin.unlink(self.audioconvert)
self.pipeline.remove(self.decodebin)
del self.decodebin
# Reset pipeline time so stream can play from the beginning.
self.pipeline.seek_simple(Gst.Format.TIME, Gst.SeekFlags.FLUSH, 0)
# Add new decodebin.
self.decodebin = Gst.ElementFactory.make("decodebin", "decodebin")
self.pipeline.add(self.decodebin)
self.decodebin.connect("pad-added", self.decodebin_src_pad_created)
# Add new souphttpsrc.
self.souphttpsrc = Gst.ElementFactory.make("souphttpsrc", "souphttpsrc")
self.souphttpsrc.set_property("is-live", True)
self.souphttpsrc.set_property("location", self.uri)
self.pipeline.add(self.souphttpsrc)
self.souphttpsrc.link(self.decodebin)
# Play stream.
self.pipeline.set_state(Gst.State.PLAYING)
self.start_switch_active = False
def run(self):
self.start_switch_active = False
self.pipeline.set_state(Gst.State.PLAYING)
while True:
try:
message = self.bus.timed_pop(Gst.SECOND)
if message == None:
pass
elif message.type == Gst.MessageType.APPLICATION:
if message.get_structure().get_name() == "start_switch":
if not self.start_switch_active:
self.start_switch()
elif message.get_structure().get_name() == "do_switch":
self.do_switch()
elif message.type == Gst.MessageType.EOS:
break
elif message.type == Gst.MessageType.ERROR:
# Assumes error comes from souphttpsrc/decodebin for this example.
# Can add filter using message.src.get_name()/message.src.get_parent().
self.custom_message("start_switch")
except KeyboardInterrupt:
break
self.pipeline.set_state(Gst.State.NULL)
start = Main()
start.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment