Last active
February 28, 2022 09:35
-
-
Save TheConstant3/f2d7120c11d568afab351f2a1d50aaaf to your computer and use it in GitHub Desktop.
RtspServer with two queues for camera stream and processed camera stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import multiprocessing as mp | |
import numpy as np | |
import cv2 | |
import gi | |
gi.require_version('Gst', '1.0') | |
gi.require_version('GstRtspServer', '1.0') | |
from gi.repository import GObject, Gst, GstRtspServer | |
Gst.init(None) | |
class GstreamerEncoderRtsp(mp.Process): | |
def __init__(self, stop_event, switch_to_camera_event, processor_queue, camera_queue, url, framerate, shape): | |
super().__init__() | |
self.stop_event = stop_event | |
self.switch_to_camera_event = switch_to_camera_event | |
self.url = url | |
self.processor_queue = processor_queue | |
self.camera_queue = camera_queue | |
self.framerate = framerate | |
self.duration = 1 / self.framerate * Gst.SECOND | |
self.shape = shape | |
self.number_frames = 0 | |
self.rtsp_port_num = '8554' | |
self.last_frame = None | |
def frame_to_gst_buffer(self, frame: np.ndarray) -> Gst.Buffer: | |
data = frame.tostring() | |
buf = Gst.Buffer.new_allocate(None, len(data), None) | |
buf.fill(0, data) | |
buf.duration = self.duration | |
timestamp = self.number_frames * self.duration | |
buf.pts = buf.dts = int(timestamp) | |
buf.offset = timestamp | |
self.number_frames += 1 | |
return buf | |
def on_need_data(self, src, lenght): | |
if self.switch_to_camera_event.is_set(): | |
queue = self.camera_queue | |
else: | |
queue = self.processor_queue | |
if self.last_frame is None and queue.empty(): | |
return | |
if not queue.empty(): | |
self.last_frame = queue.get() | |
draw_image = cv2.putText(self.last_frame.copy(), f'{self.number_frames}', (500, 500), | |
cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), thickness=2) | |
data = draw_image.tostring() | |
buf = Gst.Buffer.new_allocate(None, len(data), None) | |
buf.fill(0, data) | |
buf.duration = self.duration | |
timestamp = self.number_frames * self.duration | |
buf.pts = buf.dts = int(timestamp) | |
buf.offset = timestamp | |
self.number_frames += 1 | |
src.emit('push-buffer', buf) | |
def run(self): | |
pipeline = Gst.parse_launch('appsrc name=m_appsrc ' | |
'! capsfilter name=m_capsfilter ' | |
'! videoconvert ' | |
'! capsfilter name=m_capsfilter2 ' | |
'! x264enc ' | |
'! rtph264pay name=m_rtph264pay ' | |
'! udpsink name=m_udpsink ') | |
source = pipeline.get_by_name('m_appsrc') | |
source.connect("need-data", self.on_need_data) | |
caps = Gst.caps_from_string(f'video/x-raw, format=RGB, width={self.shape[1]}, height={self.shape[0]}, ' | |
f'framerate={self.framerate}/1') | |
capsfilter = pipeline.get_by_name('m_capsfilter') | |
capsfilter.set_property('caps', caps) | |
caps2 = Gst.caps_from_string(f'video/x-raw,format=I420') | |
capsfilter2 = pipeline.get_by_name('m_capsfilter2') | |
capsfilter2.set_property('caps', caps2) | |
sink = pipeline.get_by_name('m_udpsink') | |
sink.set_property('host', 'localhost') | |
sink.set_property('port', 5004) | |
print('pipeline created') | |
# Start playing | |
ret = pipeline.set_state(Gst.State.PLAYING) | |
if ret == Gst.StateChangeReturn.FAILURE: | |
print("Unable to set the pipeline to the playing state.") | |
self.stop_event.set() | |
factory_pipeline = f'udpsrc name=pay0 port=5004 ' \ | |
f'buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, ' \ | |
f'encoding-name=(string)H264, payload=96"' | |
factory = GstRtspServer.RTSPMediaFactory.new() | |
factory.set_launch(f"{factory_pipeline}") | |
factory.set_shared(True) | |
server = GstRtspServer.RTSPServer.new() | |
server.props.service = self.rtsp_port_num | |
server.get_mount_points().add_factory(self.url, factory) | |
server.attach(None) | |
# Wait until error or EOS | |
bus = pipeline.get_bus() | |
while True: | |
if self.stop_event.is_set(): | |
print('Stopping CAM Stream by main process') | |
break | |
message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY) | |
if message: | |
print('...', message.type) | |
# print(message.parse()) | |
if message.type == Gst.MessageType.ERROR: | |
err, debug = message.parse_error() | |
print("Error received from element %s: %s" % ( | |
message.src.get_name(), err)) | |
print("Debugging information: %s" % debug) | |
break | |
print('terminating cam pipe') | |
self.stop_event.set() | |
pipeline.set_state(Gst.State.NULL) | |
stop_event = mp.Event() | |
switch_to_camera_event = mp.Event() | |
processor_queue = mp.Queue(maxsize=1) | |
camera_queue = mp.Queue(maxsize=1) | |
framerate = 1 | |
shape = (1080, 1920, 3) | |
frame = cv2.imread('1.jpg') | |
encoder = GstreamerEncoderRtsp(stop_event, switch_to_camera_event, processor_queue, camera_queue, '/ds-test', | |
framerate, shape) | |
encoder.start() | |
i = 0 | |
t1 = time.time() | |
queue = None | |
while True: | |
if time.time() - t1 > 5: | |
text = f'camera_{i}' | |
queue = camera_queue | |
switch_to_camera_event.set() | |
else: | |
text = f'processor_{i}' | |
queue = processor_queue | |
if time.time() - t1 > 10: | |
t1 = time.time() | |
switch_to_camera_event.clear() | |
if queue.empty(): | |
i += 1 | |
draw_image = cv2.putText(frame.copy(), text, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 2, (0, 0, 255), thickness=2) | |
draw_image = cv2.resize(draw_image, (shape[1], shape[0],)) | |
queue.put(draw_image) | |
print(f'send {text}') | |
time.sleep(0.3) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment