Skip to content

Instantly share code, notes, and snippets.

@velovix
Created October 23, 2020 00:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save velovix/796b7d210b55877f0cff47702ed89989 to your computer and use it in GitHub Desktop.
Save velovix/796b7d210b55877f0cff47702ed89989 to your computer and use it in GitHub Desktop.
Reproduces an assertion failure using gst-rtsp-server
from threading import Thread
from time import sleep
from typing import Optional
import gi
# Select version 1.0 of each GObject-introspection namespace before the
# namespaces are imported from gi.repository on the line that follows.
gi.require_version("Gst", "1.0")
gi.require_version("GstRtsp", "1.0")
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtsp, GstRtspServer, GLib
# Initialize GStreamer at import time, before any class or function below
# creates elements or pipelines.
Gst.init()
class MediaFactory(GstRtspServer.RTSPMediaFactory):
    """Media factory that serves a VP8-encoded test pattern over RTSP.

    On construction the factory registers ``RTSPMedia`` as its media GType,
    so that class's message handling is used for media created here.
    """

    def __init__(self, **properties):
        """Forward *properties* to the base factory and set the media type."""
        super().__init__(**properties)
        self.set_media_gtype(RTSPMedia)

    def do_create_element(self, url: GstRtsp.RTSPUrl) -> Gst.Pipeline:
        """Build the element that produces the stream.

        Args:
            url: The RTSP URL handed to the factory; not used here.

        Returns:
            The result of parsing the launch description below.
        """
        stages = (
            "videotestsrc",
            "video/x-raw,width=1920,height=1080,framerate=(fraction)30/1",
            "videoconvert",
            "vp8enc deadline=1",
            # gst-rtsp-server requires that our payloader be named this
            "rtpvp8pay name=pay0",
        )
        # Stages are joined with " ! "; the trailing space keeps the launch
        # description identical to the hand-concatenated form.
        description = " ! ".join(stages) + " "
        return Gst.parse_launch(description)
class RTSPMedia(GstRtspServer.RTSPMedia):
    """RTSP media that sends a flushing segment seek once its element plays."""

    def do_handle_message(self, message: Gst.Message):
        """Inspect a bus message, then defer to the base-class handler.

        When *message* is a STATE_CHANGED message posted by this media's own
        "element" and the new state is PLAYING, a FLUSH | SEGMENT seek to
        position 0 is sent to that element. A failed seek is only reported
        on stdout.

        Args:
            message: The message to handle.

        Returns:
            Whatever ``GstRtspServer.RTSPMedia.do_handle_message`` returns.
        """
        if message.type == Gst.MessageType.STATE_CHANGED:
            # Set up segment seeking once the pipeline starts playing
            element = self.get_property("element")
            _old, current, _pending = message.parse_state_changed()
            # Only react to the top-level element itself, not its children.
            now_playing = (
                message.src == element and current == Gst.State.PLAYING
            )
            if now_playing:
                seek_ok = element.seek(
                    rate=1.0,
                    format=Gst.Format.TIME,
                    flags=Gst.SeekFlags.FLUSH | Gst.SeekFlags.SEGMENT,
                    start_type=Gst.SeekType.SET,
                    start=0,
                    stop_type=Gst.SeekType.NONE,
                    stop=-1,
                )
                if not seek_ok:
                    print("Error: Failed to send seek event")

        return GstRtspServer.RTSPMedia.do_handle_message(self, message)
def main():
    """Run the RTSP server plus two local clients until interrupted.

    Starts a GLib main loop on a background thread, serves ``MediaFactory``
    at ``/stream`` with service "8000", then builds two client pipelines
    concurrently (one thread each). On Ctrl-C, any client pipeline that was
    built is set to NULL and the main loop is stopped and joined.
    """
    loop = GLib.MainLoop()
    loop_thread = Thread(target=loop.run)
    loop_thread.start()

    rtsp_server = GstRtspServer.RTSPServer()
    rtsp_server.props.service = "8000"
    # -1 is passed as the thread pool's maximum thread count.
    rtsp_server.get_thread_pool().set_max_threads(-1)
    rtsp_server.attach(None)

    media_factory = MediaFactory()
    media_factory.set_shared(True)
    rtsp_server.get_mount_points().add_factory("/stream", media_factory)
    print("Stream is now available at /stream")

    # One slot per client; a slot stays None until its thread has finished
    # building the pipeline.
    clients = [None, None]

    def start_streaming(slot):
        clients[slot] = build_pipeline()
        print(f"Started pipeline {slot + 1}")

    # Both clients are started from separate threads so that they connect
    # to the shared media at roughly the same time.
    for slot in range(len(clients)):
        Thread(target=start_streaming, args=(slot,)).start()

    print("Server started on port 8000")

    try:
        while True:
            sleep(0.1)
    except KeyboardInterrupt:
        print("Exiting...")

    for client in clients:
        if client is not None:
            client.set_state(Gst.State.NULL)

    loop.quit()
    loop_thread.join()
def build_pipeline() -> Gst.Pipeline:
    """Create and start a client pipeline reading from the local stream.

    The pipeline initially holds only an ``appsink`` and an ``rtspsrc``
    pointed at ``rtsp://localhost:8000/stream``; the elements in between
    are added by ``set_up_depayloader`` when rtspsrc emits "pad-added".

    Returns:
        The pipeline, after it has been asked to go to PLAYING.
    """
    client = Gst.Pipeline()

    sink = Gst.ElementFactory.make("appsink")
    client.add(sink)
    sink_settings = (
        ("max-buffers", 60),
        ("drop", True),
        ("emit-signals", True),
    )
    for prop_name, prop_value in sink_settings:
        sink.set_property(prop_name, prop_value)
    sink.set_state(Gst.State.PLAYING)

    source = Gst.ElementFactory.make("rtspsrc")
    client.add(source)
    source.set_property("location", "rtsp://localhost:8000/stream")
    # The pipeline and sink ride along as user data for the callback.
    source.connect("pad-added", set_up_depayloader, client, sink)

    client.set_state(Gst.State.PLAYING)
    return client
def set_up_depayloader(element, pad, pipeline, appsink):
    """Extend the pipeline with depayloading and decoding for a new pad.

    Connected to the "pad-added" signal in ``build_pipeline``. Pads whose
    caps report an audio "media" field are ignored. For any other pad,
    ``rtpvp8depay ! queue ! decodebin`` is added to *pipeline* and linked
    after *element*; ``set_up_appsink`` is connected to decodebin's
    "pad-added" signal to finish the chain.

    Args:
        element: The element that emitted "pad-added".
        pad: The newly added pad.
        pipeline: The pipeline that new elements are added to.
        appsink: Passed through to ``set_up_appsink``.
    """
    caps = pad.get_current_caps()
    if caps is None:
        # Nothing has been negotiated on the pad yet; fall back to the caps
        # the pad reports it can produce instead of failing on None.
        caps = pad.query_caps(None)

    # The "media" field may be missing, and ANY/empty caps have no
    # structure at all; in both cases the pad is not treated as audio.
    media = None
    if caps.get_size() > 0:
        media = caps.get_structure(0).get_string("media")
    if media is not None and "audio" in media:
        return

    depayloader = Gst.ElementFactory.make("rtpvp8depay")
    pipeline.add(depayloader)
    depayloader.set_state(Gst.State.PLAYING)
    linked = element.link(depayloader)
    if not linked:
        print("Failed to link rtspsrc to rtpvp8depay")
        return

    queue = Gst.ElementFactory.make("queue")
    pipeline.add(queue)
    queue.set_state(Gst.State.PLAYING)
    linked = depayloader.link(queue)
    if not linked:
        print("Failed to link depayloader to queue")
        return

    decodebin = Gst.ElementFactory.make("decodebin")
    pipeline.add(decodebin)
    # Connect before the element can produce pads so none are missed.
    decodebin.connect("pad-added", set_up_appsink, pipeline, appsink)
    # Bring only the new element up to PLAYING, as is done for the
    # depayloader and queue above. Re-setting the state of the whole
    # pipeline from inside this callback is unnecessary for that purpose.
    decodebin.set_state(Gst.State.PLAYING)
    linked = queue.link(decodebin)
    if not linked:
        print("Failed to link queue to decodebin")
        return
def set_up_appsink(element, pad, pipeline, appsink):
    """Finish the client pipeline by linking a converter into the appsink.

    Connected to decodebin's "pad-added" signal in ``set_up_depayloader``.
    A ``videoconvert`` is added to *pipeline*, set to PLAYING, and linked
    between *element* and *appsink*. Link failures are reported on stdout
    and abort the setup.

    Args:
        element: The element that emitted "pad-added".
        pad: The newly added pad; not used directly.
        pipeline: The pipeline that the converter is added to.
        appsink: The sink that terminates the chain.
    """
    converter = Gst.ElementFactory.make("videoconvert")
    pipeline.add(converter)
    converter.set_state(Gst.State.PLAYING)

    if not element.link(converter):
        print("Failed to link element to videoconvert")
        return

    if not converter.link(appsink):
        print("Failed to link videoconvert to appsink")
        return

    print("Pipeline built!")
# Run the reproduction only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment