Created
December 11, 2021 14:29
-
-
Save yury-sannikov/a0b17e506c853a4a4b3ea492fe2d750e to your computer and use it in GitHub Desktop.
Viseron with GStreamer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
## For the sake of simplicity, GStreamer and PyGObject are set up directly in the Dockerfile.

# GStreamer (tools, plugins, development headers) and the PyGObject build
# dependencies. Both installs share one layer with `apt-get update` so that
# neither can run against a stale cached package index, and the apt lists are
# removed in the same layer so they do not end up in the image.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        gstreamer1.0-tools gstreamer1.0-alsa \
        gstreamer1.0-plugins-base gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-good1.0-dev \
    && apt-get install -y \
        libgirepository1.0-dev gcc libcairo2-dev pkg-config python3-dev \
        gir1.2-gtk-3.0 vim \
    && rm -rf /var/lib/apt/lists/*

# Install PyGObject (built against the headers installed above)
RUN /usr/local/bin/python3 -m pip install --no-cache-dir pycairo PyGObject

VOLUME /config
VOLUME /recordings
ENTRYPOINT ["/init"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A sample camera/stream implementation put together using GStreamer. It is pretty basic,
# but it does hardware-accelerated video decoding.
"""Class to interact with a GStreamer stream."""
from __future__ import annotations | |
from gi.repository import Gst | |
from time import sleep | |
import logging | |
import subprocess as sp | |
from typing import TYPE_CHECKING, Dict, Optional, Union | |
import cv2 | |
from viseron.camera.frame_decoder import FrameDecoder | |
from viseron.const import ( | |
FFMPEG_LOG_LEVELS, | |
FFPROBE_TIMEOUT, | |
) | |
from viseron.detector import Detector | |
from viseron.exceptions import FFprobeError, FFprobeTimeout, StreamInformationError | |
from viseron.helpers.logs import FFmpegFilter, LogPipe, SensitiveInformationFilter | |
from viseron.watchdog.subprocess_watchdog import RestartablePopen | |
from .frame import Frame | |
import gi | |
gi.require_version('Gst', '1.0') | |
if TYPE_CHECKING: | |
from viseron.config.config_camera import CameraConfig, Substream | |
class GStream:
    """Represents a stream of frames from a camera, decoded by GStreamer.

    A GStreamer pipeline ending in an ``appsink`` is started by
    :meth:`start_pipe`. Each decoded sample is handed to :meth:`callback`,
    which keeps only the most recent raw buffer; :meth:`read` wraps that
    buffer in a ``Frame``. Only the ``nv12`` pixel format is supported.
    """

    # Explicit name for the appsink element. Default element names are
    # numbered by GStreamer, so a sink created by a later pipeline is not
    # guaranteed to be called "appsink0"; naming it makes the lookup reliable.
    _APPSINK_NAME = "viseron_appsink"

    def __init__(
        self,
        config,
        stream_config: "Union[CameraConfig, Substream]",
    ):
        """Set up logging, resolve stream parameters and initialise GStreamer.

        Args:
            config: Camera configuration; ``config.camera`` supplies the
                logger name slug, recoverable errors and log levels.
            stream_config: Stream settings (``width``, ``height``, ``fps``,
                ``codec``, ``pix_fmt``, ``stream_url``).

        Raises:
            NotImplementedError: If any of width/height/fps/codec is unset,
                because stream probing is not implemented.
            StreamInformationError: If width, height or fps is still unknown.
            ValueError: If ``pix_fmt`` is anything other than ``nv12``.
        """
        self._logger = logging.getLogger(__name__ + "." + config.camera.name_slug)
        self._logger.addFilter(SensitiveInformationFilter())
        self._logger.addFilter(FFmpegFilter(config.camera.ffmpeg_recoverable_errors))

        self._config = config
        self.stream_config = stream_config

        self._frame_no = 0
        self._first_read = False
        self._pipe = None
        self._log_pipe = LogPipe(
            self._logger, FFMPEG_LOG_LEVELS[config.camera.ffmpeg_loglevel]
        )
        self._ffprobe_log_pipe = LogPipe(
            self._logger, FFMPEG_LOG_LEVELS[config.camera.ffprobe_loglevel]
        )
        self._ffprobe_timeout = FFPROBE_TIMEOUT

        width = height = fps = stream_codec = None
        # If any of the parameters are unset we need to fetch them by probing
        # the stream.
        if not (
            self.stream_config.width
            and self.stream_config.height
            and self.stream_config.fps
            and self.stream_config.codec
        ):
            (
                width,
                height,
                fps,
                stream_codec,
            ) = self.get_stream_information(self.stream_config.stream_url)

        # Explicitly configured values win over probed ones.
        self.width = self.stream_config.width or width
        self.height = self.stream_config.height or height
        self.fps = self.stream_config.fps or fps
        self._output_fps = self.fps
        self.stream_codec = stream_codec

        if not (self.width and self.height and self.fps):
            raise StreamInformationError(self.width, self.height, self.fps)

        self.decoders: Dict[str, FrameDecoder] = {}

        if stream_config.pix_fmt != "nv12":
            # Raised explicitly rather than asserted so the check survives
            # running Python with -O.
            raise ValueError("pix_fmt other than nv12 not supported")

        # NOTE(review): the NV21 conversion constant is paired with nv12 input
        # here; presumably intentional to obtain the channel order Frame
        # expects - confirm before changing.
        self._color_converter = cv2.COLOR_YUV2RGB_NV21
        # NV12 stores a full-resolution luma plane followed by a half-height
        # interleaved chroma plane, hence the 1.5 factor.
        self._color_plane_width = self.width
        self._color_plane_height = int(self.height * 1.5)
        self._frame_bytes = int(self.width * self.height * 1.5)

        # GStreamer pipeline, sink and bus; created by start_pipe().
        self.video_pipe = None
        self.video_sink = None
        # Most recent raw frame written by callback(); consumed by read().
        self.frame_bytes = None
        self._bus = None

        # init gstreamer
        Gst.init([])
        # NOTE(review): forces DEBUG regardless of the configured log level;
        # looks like a debugging leftover - confirm whether it should stay.
        self._logger.setLevel(logging.DEBUG)

    @property
    def output_fps(self):
        """Return stream output FPS."""
        return self._output_fps

    @output_fps.setter
    def output_fps(self, value: "Union[int, float]"):
        self._output_fps = value

    def calculate_output_fps(self):
        """Calculate the output FPS from the registered decoders.

        The output rate is the rate required by the decoder with the shortest
        interval, capped at the source FPS. With no decoders registered the
        source FPS is used.
        """
        if not self.decoders:
            self.output_fps = round(self.fps)
            return

        shortest_interval = min(
            decoder.interval for decoder in self.decoders.values()
        )
        max_interval_fps = 1 / shortest_interval
        self.output_fps = round(min(max_interval_fps, self.fps))

    def get_stream_information(self, stream_url):
        """Probe ``stream_url`` for width, height, fps and codec.

        Not implemented for GStreamer yet; configure width, height, fps and
        codec explicitly instead.

        Raises:
            NotImplementedError: Always.
        """
        # check out
        # gst-discoverer-1.0 -v rtsp://user:pwd@192.168.5.95:554/stream0
        raise NotImplementedError(
            "get_stream_information not implemented; "
            "set width, height, fps and codec in the stream config"
        )

    def pipe(self):
        """Return subprocess pipe for FFmpeg.

        NOTE(review): leftover from the FFmpeg implementation. It calls
        ``self.build_command()``, which this class does not define, so it
        raises ``AttributeError`` unless a subclass provides that method.
        """
        with Detector.lock:
            return sp.Popen(
                self.build_command(),
                stdout=sp.PIPE,
                stderr=self._log_pipe,
            )

    def _pipeline_description(self) -> str:
        """Return the gst-launch style description of the decoding pipeline."""
        elements = [
            # Use the configured stream URL rather than a hard-coded address
            # with embedded credentials.
            "rtspsrc location={}".format(self.stream_config.stream_url),
            # TODO: derive these elements from the stream codec; H.265 with
            # the omxh265dec decoder is currently hard-coded.
            "! rtph265depay ! h265parse ! omxh265dec",
            "! video/x-raw,format=(string)NV12",
            "! appsink name={} emit-signals=true sync=false max-buffers=2 drop=true".format(
                self._APPSINK_NAME
            ),
        ]
        return " ".join(elements)

    def start_pipe(self):
        """Build the GStreamer pipeline and start delivering frames."""
        self.video_pipe = Gst.parse_launch(self._pipeline_description())

        # Hook up the sample callback and the bus handler before going to
        # PLAYING so that nothing emitted during start-up is missed.
        self.video_sink = self.video_pipe.get_by_name(self._APPSINK_NAME)
        self.video_sink.connect("new-sample", self.callback)

        # NOTE(review): bus signal watches are dispatched from a GLib main
        # loop; confirm one is running, otherwise the handler never fires.
        self._bus = self.video_pipe.get_bus()
        self._bus.add_signal_watch()
        self._bus.connect("message", self.bus_message_handler)

        self.video_pipe.set_state(Gst.State.PLAYING)

    def bus_message_handler(self, bus, message: "Gst.Message") -> bool:
        """Log every bus message, with details for errors.

        Returns:
            Always ``True``.
        """
        self._logger.debug(" >>> bus message rx %s", message.type)
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            self._logger.error(" >>> Gst.MessageType.ERROR: %s, %s", err, debug)
        return True

    def callback(self, sink):
        """Handle appsink's ``new-sample`` signal by storing the raw frame.

        Returns:
            ``Gst.FlowReturn.OK`` when a sample was stored,
            ``Gst.FlowReturn.ERROR`` when no sample could be pulled.
        """
        sample = sink.emit("pull-sample")
        if sample is None:
            # pull-sample yields nothing once the sink is at EOS or stopped.
            return Gst.FlowReturn.ERROR

        buf = sample.get_buffer()
        # Copy the bytes out so the frame outlives the GStreamer buffer.
        self.frame_bytes = buf.extract_dup(0, buf.get_size())
        return Gst.FlowReturn.OK

    def close_pipe(self):
        """Stop the GStreamer pipeline and release it; no-op if not started."""
        if self.video_pipe is None:
            return

        self.video_pipe.set_state(Gst.State.NULL)
        if self._bus is not None:
            # Pairs with add_signal_watch() in start_pipe().
            self._bus.remove_signal_watch()

        self.video_pipe = None
        self.video_sink = None
        self._bus = None
        self.frame_bytes = None

    def poll(self):
        """Poll the pipeline; logs its state and always returns ``None``."""
        # TODO: detect gstreamer crashed/stopped
        if self.video_pipe is not None:
            # A timeout of 0 queries the current state without blocking.
            state = self.video_pipe.get_state(0)
            self._logger.debug(" >>> (poll) gstreamer pipe state: %s", state)
        return None

    def read(self) -> "Optional[Frame]":
        """Return the most recent frame, or ``None`` if none is available.

        ``None`` is returned when the pipeline has not been started, when no
        new buffer has arrived since the previous call, or when the buffer
        size does not match the expected NV12 frame size.
        """
        if self.video_pipe is None:
            return None

        # Snapshot the buffer once: callback() is invoked by GStreamer and may
        # replace self.frame_bytes while this method is running.
        data = self.frame_bytes
        if not data:
            return None

        if len(data) != self._frame_bytes:
            self._logger.debug(
                " >>> frame buffer size mismatch actual: %s, expected: %s",
                len(data),
                self._frame_bytes,
            )
            return None

        # Mark the buffer as consumed, unless a newer one has already arrived.
        if self.frame_bytes is data:
            self.frame_bytes = None

        return Frame(
            self._color_converter,
            self._color_plane_width,
            self._color_plane_height,
            data,
            self.width,
            self.height,
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment