Skip to content

Instantly share code, notes, and snippets.

@robobe
Created March 29, 2020 10:02
Show Gist options
  • Save robobe/28ecd4c2fbd10cc17d3e17a02dba9ca1 to your computer and use it in GitHub Desktop.
Save robobe/28ecd4c2fbd10cc17d3e17a02dba9ca1 to your computer and use it in GitHub Desktop.
gstreamer clock #gst
import gi
gi.require_version('Gst', '1.0')
import traceback
import sys
from gi.repository import Gst, GObject
import multiprocessing
class base:
    """Common scaffolding for a GStreamer pipeline that reports appsink timing.

    Subclasses supply the pipeline description via ``get_launch_pipe`` and
    start it via ``run``.  The description must contain an ``appsink``
    element named ``sink``; every sample arriving there triggers
    ``on_new_buffer``.
    """

    def __init__(self, name):
        """Initialise GStreamer and build the pipeline.

        Args:
            name: Label printed in front of every timing line.
        """
        self.pipeline = None
        # Assign the label before wiring callbacks so ``on_new_buffer`` can
        # never observe an instance without ``name``.
        self.name = name
        Gst.init(None)
        self.init_pipe()

    def on_new_buffer(self, sink):
        """Handle the appsink ``new-sample`` signal.

        Drains the pending sample and prints the sink's clock time minus its
        base time, converted from nanoseconds to seconds.

        Args:
            sink: The appsink element that emitted the signal.

        Returns:
            ``Gst.FlowReturn.OK`` always; failures are printed, not raised.
        """
        try:
            # The sample itself is not needed, but it has to be pulled:
            # appsink keeps queueing unpulled samples, so skipping this
            # makes memory grow for as long as the stream runs.
            sink.emit("pull-sample")
            t = (sink.clock.get_time() - sink.base_time) / 1e9
            print(f"time {self.name}: {t}")
        except Exception:
            # Best effort: a failed measurement must not stop the stream.
            print('ERROR', traceback.format_exc())
        return Gst.FlowReturn.OK

    def init_pipe(self):
        """Parse the subclass' launch string and hook up the appsink callback."""
        pipe = self.get_launch_pipe()
        # Echo an equivalent command line for manual debugging.
        print (f"gst-launch-1.0 {pipe}")
        self.pipeline = Gst.parse_launch(pipe)
        sink = self.pipeline.get_by_name("sink")
        sink.set_property("emit-signals", True)
        sink.connect("new-sample", self.on_new_buffer)

    def get_launch_pipe(self):
        """Return the gst-launch style pipeline description (subclass hook)."""
        raise NotImplementedError()

    def run(self):
        """Move the pipeline into its running state (subclass hook)."""
        raise NotImplementedError()

    def start(self):
        """Start the pipeline and block in a main loop until interrupted.

        The pipeline is always set to ``NULL`` on the way out so its
        resources (sockets, threads) are released.
        """
        try:
            self.run()
            # NOTE(review): newer PyGObject releases prefer GLib.MainLoop;
            # GObject.MainLoop is kept because GLib is not imported here.
            loop = GObject.MainLoop()
            try:
                loop.run()
            except KeyboardInterrupt:
                # Ctrl-C is the expected way to stop; no traceback needed.
                pass
            except Exception:
                traceback.print_exc()
        finally:
            if self.pipeline is not None:
                self.pipeline.set_state(Gst.State.NULL)
class writer(base):
    """Generate test video, time it at a local appsink and stream it over RTP.

    The test source is tee'd into two branches: one ends in the appsink named
    ``sink`` (used for timing), the other JPEG-encodes the frames and sends
    them as RTP over UDP.
    """

    def __init__(self, fps=20, host="127.0.0.1", port=1234, num_buffers=5):
        """Configure and build the sending pipeline.

        Args:
            fps: Frame rate requested from the test source.
            host: Destination address for the UDP sink.
            port: Destination UDP port.
            num_buffers: Number of buffers the source emits before stopping.
        """
        # These must exist before the base constructor runs, because it
        # builds the pipeline string straight away.
        self.fps = fps
        self.host = host
        self.port = port
        self.num_buffers = num_buffers
        super().__init__("sender")

    def get_launch_pipe(self):
        """Return the launch description of the sending pipeline."""
        return (
            "videotestsrc name=src ! "
            f"video/x-raw,width=640,height=480,framerate={self.fps}/1 ! "
            "tee name=tt ! queue ! "
            "videoconvert ! appsink name=sink "
            "tt. ! queue ! "
            "jpegenc ! "
            "rtpjpegpay ! "
            f"udpsink host={self.host} port={self.port}"
        )

    def run(self):
        """Limit the source to ``num_buffers`` buffers and start playing.

        Raises:
            RuntimeError: If the pipeline refuses a state change.
        """
        source = self.pipeline.get_by_name('src')
        source.set_property("num-buffers", self.num_buffers)
        for state in (Gst.State.READY, Gst.State.PLAYING):
            # A failed transition would otherwise go unnoticed and leave the
            # process idling in the main loop.
            if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
                raise RuntimeError(f"{self.name}: pipeline failed to reach {state}")
class reader(base):
    """Receive an RTP/JPEG stream over UDP and time it at an appsink.

    Incoming packets are depayloaded and JPEG-decoded before they reach the
    appsink named ``sink``.
    """

    def __init__(self, port=1234):
        """Configure and build the receiving pipeline.

        Args:
            port: UDP port to listen on.
        """
        # Must exist before the base constructor runs, because it builds
        # the pipeline string straight away.
        self.port = port
        super().__init__("recv")

    def get_launch_pipe(self):
        """Return the launch description of the receiving pipeline."""
        return (
            f"udpsrc port={self.port} "
            "! application/x-rtp, encoding-name=JPEG,payload=26 "
            "! rtpjpegdepay "
            "! jpegdec "
            "! appsink name=sink"
        )

    def run(self):
        """Start playing.

        Raises:
            RuntimeError: If the pipeline refuses a state change.
        """
        for state in (Gst.State.READY, Gst.State.PLAYING):
            # A failed transition would otherwise go unnoticed and leave the
            # process idling in the main loop.
            if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
                raise RuntimeError(f"{self.name}: pipeline failed to reach {state}")
def run_stream(stream_cls):
    """Build a stream object in the current process and run it.

    Used as the ``multiprocessing`` target so that only the class (picklable
    by reference) crosses the process boundary, never a live pipeline.

    Args:
        stream_cls: Zero-argument callable returning an object with ``start()``.
    """
    stream_cls().start()


if __name__ == "__main__":
    # Each pipeline is created inside its own child process.  Building it in
    # the parent would require pickling GStreamer objects under the
    # spawn/forkserver start methods, and would share initialised GStreamer
    # state across a fork.
    # Start order is unchanged: reader first, then writer.
    workers = [
        multiprocessing.Process(target=run_stream, args=(stream_cls,))
        for stream_cls in (reader, writer)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment