RtspServer with two queues for camera stream and processed camera stream
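The script below runs a GstreamerEncoderRtsp process that pulls frames from two multiprocessing queues (raw camera frames and processed frames), switches between them via an Event, and serves the result over RTSP. It needs PyGObject with GStreamer 1.0 (including the RTSP server bindings), OpenCV and NumPy. With the defaults used here (port 8554, mount point /ds-test), the stream should be viewable with any RTSP-capable player, for example ffplay rtsp://127.0.0.1:8554/ds-test (choice of player and host is an assumption, not part of the gist).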
import time
import multiprocessing as mp
import numpy as np
import cv2
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GObject, Gst, GstRtspServer
Gst.init(None)
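

# Data flow: the appsrc-fed encoder pipeline pushes RTP/H.264 packets to a
# udpsink on localhost:5004; a GstRtspServer media factory then re-serves that
# RTP stream from a udpsrc at rtsp://<host>:8554/<url>.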
class GstreamerEncoderRtsp(mp.Process):
    def __init__(self, stop_event, switch_to_camera_event, processor_queue, camera_queue, url, framerate, shape):
        super().__init__()
        self.stop_event = stop_event
        self.switch_to_camera_event = switch_to_camera_event
        self.url = url
        self.processor_queue = processor_queue
        self.camera_queue = camera_queue
        self.framerate = framerate
        self.duration = 1 / self.framerate * Gst.SECOND
        self.shape = shape
        self.number_frames = 0
        self.rtsp_port_num = '8554'
        self.last_frame = None

    def frame_to_gst_buffer(self, frame: np.ndarray) -> Gst.Buffer:
        # Wrap a raw frame into a timestamped Gst.Buffer for appsrc.
        data = frame.tobytes()
        buf = Gst.Buffer.new_allocate(None, len(data), None)
        buf.fill(0, data)
        buf.duration = self.duration
        timestamp = self.number_frames * self.duration
        buf.pts = buf.dts = int(timestamp)
        buf.offset = int(timestamp)
        self.number_frames += 1
        return buf

    def on_need_data(self, src, length):
        # Serve frames from the camera queue while the switch event is set,
        # otherwise from the processor queue; reuse the last frame if the
        # active queue is momentarily empty.
        if self.switch_to_camera_event.is_set():
            queue = self.camera_queue
        else:
            queue = self.processor_queue
        if self.last_frame is None and queue.empty():
            return
        if not queue.empty():
            self.last_frame = queue.get()
        # Overlay the running frame counter and push the buffer into appsrc.
        draw_image = cv2.putText(self.last_frame.copy(), f'{self.number_frames}', (500, 500),
                                 cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), thickness=2)
        buf = self.frame_to_gst_buffer(draw_image)
        src.emit('push-buffer', buf)

    def run(self):
        # Encoder pipeline: appsrc -> videoconvert -> x264enc -> rtph264pay -> udpsink.
        pipeline = Gst.parse_launch('appsrc name=m_appsrc '
                                    '! capsfilter name=m_capsfilter '
                                    '! videoconvert '
                                    '! capsfilter name=m_capsfilter2 '
                                    '! x264enc '
                                    '! rtph264pay name=m_rtph264pay '
                                    '! udpsink name=m_udpsink ')
        source = pipeline.get_by_name('m_appsrc')
        source.connect("need-data", self.on_need_data)

        # Raw RGB frames of the configured shape go into appsrc...
        caps = Gst.caps_from_string(f'video/x-raw, format=RGB, width={self.shape[1]}, height={self.shape[0]}, '
                                    f'framerate={self.framerate}/1')
        capsfilter = pipeline.get_by_name('m_capsfilter')
        capsfilter.set_property('caps', caps)

        # ...and are converted to I420 before H.264 encoding.
        caps2 = Gst.caps_from_string('video/x-raw, format=I420')
        capsfilter2 = pipeline.get_by_name('m_capsfilter2')
        capsfilter2.set_property('caps', caps2)

        sink = pipeline.get_by_name('m_udpsink')
        sink.set_property('host', 'localhost')
        sink.set_property('port', 5004)
        print('pipeline created')

        # Start playing
        ret = pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to set the pipeline to the playing state.")
            self.stop_event.set()

        # RTSP factory: pick up the RTP stream sent to port 5004 and serve it
        # at rtsp://<host>:8554/<url>.
        factory_pipeline = 'udpsrc name=pay0 port=5004 ' \
                           'buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, ' \
                           'encoding-name=(string)H264, payload=96"'
        factory = GstRtspServer.RTSPMediaFactory.new()
        factory.set_launch(factory_pipeline)
        factory.set_shared(True)

        server = GstRtspServer.RTSPServer.new()
        server.props.service = self.rtsp_port_num
        server.get_mount_points().add_factory(self.url, factory)
        server.attach(None)

        # Wait until error or EOS
        bus = pipeline.get_bus()
        while True:
            if self.stop_event.is_set():
                print('Stopping CAM Stream by main process')
                break
            message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
            if message:
                print('...', message.type)
                # print(message.parse())
                if message.type == Gst.MessageType.ERROR:
                    err, debug = message.parse_error()
                    print("Error received from element %s: %s" % (
                        message.src.get_name(), err))
                    print("Debugging information: %s" % debug)
                    break
        print('terminating cam pipe')
        self.stop_event.set()
        pipeline.set_state(Gst.State.NULL)


# Demo: feed frames into the two queues and switch between them every 5 seconds.
stop_event = mp.Event()
switch_to_camera_event = mp.Event()
processor_queue = mp.Queue(maxsize=1)
camera_queue = mp.Queue(maxsize=1)
framerate = 1
shape = (1080, 1920, 3)
frame = cv2.imread('1.jpg')

encoder = GstreamerEncoderRtsp(stop_event, switch_to_camera_event, processor_queue, camera_queue, '/ds-test',
                               framerate, shape)
encoder.start()

i = 0
t1 = time.time()
queue = None
while True:
    # First half of each 10-second cycle: "processor" frames; second half:
    # "camera" frames (signalled to the encoder via switch_to_camera_event).
    if time.time() - t1 > 5:
        text = f'camera_{i}'
        queue = camera_queue
        switch_to_camera_event.set()
    else:
        text = f'processor_{i}'
        queue = processor_queue
    if time.time() - t1 > 10:
        t1 = time.time()
        switch_to_camera_event.clear()
    if queue.empty():
        i += 1
        draw_image = cv2.putText(frame.copy(), text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), thickness=2)
        draw_image = cv2.resize(draw_image, (shape[1], shape[0]))
        queue.put(draw_image)
        print(f'send {text}')
    time.sleep(0.3)
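
As a quick client-side check (a minimal sketch, assuming the server above is running on the same machine and OpenCV was built with FFmpeg or GStreamer support), the published stream can be read back like this:

import cv2

cap = cv2.VideoCapture('rtsp://127.0.0.1:8554/ds-test')
while cap.isOpened():
    ok, frame = cap.read()
    if not ok:
        break
    cv2.imshow('ds-test', frame)
    if cv2.waitKey(1) == 27:  # Esc to quit
        break
cap.release()
cv2.destroyAllWindows()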