Skip to content

Instantly share code, notes, and snippets.

@patrickelectric
Created April 20, 2021 15:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patrickelectric/810a691f3ed107e4228d40568dfb219c to your computer and use it in GitHub Desktop.
Save patrickelectric/810a691f3ed107e4228d40568dfb219c to your computer and use it in GitHub Desktop.
rtsp factory python stream
import sys
import gi

# Extra search path for a locally installed dist-packages directory.
# NOTE(review): this runs after `import gi`, so it cannot change which `gi`
# package was already loaded -- confirm whether it is still needed.
sys.path.insert(0, "/usr/local/lib/python3.8/dist-packages")
print(sys.path)

# Pin the introspection namespace versions *before* importing them; without
# this PyGObject emits a warning and picks whichever version it finds.
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GLib, GObject, Gst, GstRtspServer

# Main loop that drives the RTSP server; GLib.MainLoop is the non-deprecated
# spelling of GObject.MainLoop.  GObject.threads_init() is intentionally not
# called: it has been an unnecessary, deprecated no-op since PyGObject 3.11.
loop = GLib.MainLoop()
Gst.init(None)
class MyFactory(GstRtspServer.RTSPMediaFactory):
    """RTSP media factory whose media element comes from a launch string.

    By default it serves an H.264-encoded test pattern.  Any command-line
    arguments given to the script are joined with spaces and used as the
    launch string instead (for example a pipeline starting at ``v4l2src``
    to stream from a camera).
    """

    def __init__(self):
        super().__init__()

    def do_create_element(self, url):
        """Create the element that produces the stream.

        Args:
            url: Passed in by the RTSP server; not used here.

        Returns:
            The result of ``Gst.parse_launch`` on the chosen launch string.
        """
        if len(sys.argv) > 1:
            # A launch line supplied on the command line replaces the
            # built-in default entirely.
            pipeline_str = " ".join(sys.argv[1:])
        else:
            # `framerate` (a fraction) is the video frame-rate caps field;
            # the previous `rate=30` field did not constrain the frame rate.
            s_src = (
                "videotestsrc ! "
                "video/x-raw,framerate=30/1,width=320,height=240,format=I420"
            )
            s_h264 = "x264enc tune=zerolatency"
            pipeline_str = (
                f"( {s_src} ! queue max-size-buffers=1 name=q_enc "
                f"! {s_h264} ! rtph264pay name=pay0 pt=96 )"
            )
        print(pipeline_str)
        return Gst.parse_launch(pipeline_str)
class GstServer:
    """RTSP server that exposes a shared ``MyFactory`` stream at ``/test``."""

    def __init__(self):
        """Create the server, mount the factory and attach to the main context.

        Raises:
            RuntimeError: If the server could not be attached (``attach``
                returned 0), since the main loop would otherwise run without
                serving anything.
        """
        self.server = GstRtspServer.RTSPServer()

        factory = MyFactory()
        factory.set_shared(True)

        mounts = self.server.get_mount_points()
        mounts.add_factory("/test", factory)

        # attach(None) uses the default main context and returns the source
        # id, or 0 on failure.
        if self.server.attach(None) == 0:
            raise RuntimeError("failed to attach the RTSP server to the main context")
if __name__ == '__main__':
    # Build the server (which mounts the factory at /test), then block in the
    # module-level main loop.
    rtsp_server = GstServer()
    loop.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment