@yury-sannikov
Created December 11, 2021 14:29
Viseron with GStreamer
## For the sake of simplicity I put it into the Dockerfile to set up GStreamer and PyGObject
# Gstreamer
RUN apt-get update \
    && apt-get install -y --no-install-recommends gstreamer1.0-tools gstreamer1.0-alsa \
        gstreamer1.0-plugins-base gstreamer1.0-plugins-good \
        gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly \
        gstreamer1.0-libav libgstreamer1.0-dev \
        libgstreamer-plugins-base1.0-dev \
        libgstreamer-plugins-good1.0-dev
# install PyGObject
RUN apt-get install -y libgirepository1.0-dev gcc libcairo2-dev pkg-config python3-dev gir1.2-gtk-3.0 vim
RUN /usr/local/bin/python3 -m pip install pycairo PyGObject
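# Optional sanity check, a minimal sketch assuming python3 lives at /usr/local/bin/python3
# as in the pip install line above: it fails the image build early if PyGObject cannot
# load the GStreamer typelib.
RUN /usr/local/bin/python3 -c "import gi; gi.require_version('Gst', '1.0'); from gi.repository import Gst; Gst.init(None); print(Gst.version_string())"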
VOLUME /config
VOLUME /recordings
ENTRYPOINT ["/init"]
# I tried to put together a sample camera/stream using GStreamer. It's pretty basic, but it does
# HW-accelerated video decoding.
"""Class to interact with an FFmpeog stream."""
from __future__ import annotations
from gi.repository import Gst
from time import sleep
import logging
import subprocess as sp
from typing import TYPE_CHECKING, Dict, Optional, Union
import cv2
from viseron.camera.frame_decoder import FrameDecoder
from viseron.const import (
FFMPEG_LOG_LEVELS,
FFPROBE_TIMEOUT,
)
from viseron.detector import Detector
from viseron.exceptions import FFprobeError, FFprobeTimeout, StreamInformationError
from viseron.helpers.logs import FFmpegFilter, LogPipe, SensitiveInformationFilter
from viseron.watchdog.subprocess_watchdog import RestartablePopen
from .frame import Frame
import gi
gi.require_version('Gst', '1.0')
if TYPE_CHECKING:
from viseron.config.config_camera import CameraConfig, Substream


class GStream:
    """Represents a stream of frames from a camera."""

    def __init__(
        self,
        config,
        stream_config: Union[CameraConfig, Substream],
    ):
        self._logger = logging.getLogger(__name__ + "." + config.camera.name_slug)
        self._logger.addFilter(SensitiveInformationFilter())
        self._logger.addFilter(FFmpegFilter(config.camera.ffmpeg_recoverable_errors))
        self._config = config
        self.stream_config = stream_config
        self._frame_no = 0
        self._first_read = False
        self._pipe = None
        self._log_pipe = LogPipe(
            self._logger, FFMPEG_LOG_LEVELS[config.camera.ffmpeg_loglevel]
        )
        self._ffprobe_log_pipe = LogPipe(
            self._logger, FFMPEG_LOG_LEVELS[config.camera.ffprobe_loglevel]
        )
        self._ffprobe_timeout = FFPROBE_TIMEOUT
        stream_codec = None
        # If any of the parameters are unset we need to fetch them from the stream itself
        if (
            not self.stream_config.width
            or not self.stream_config.height
            or not self.stream_config.fps
            or not self.stream_config.codec
        ):
            (
                width,
                height,
                fps,
                stream_codec,
            ) = self.get_stream_information(self.stream_config.stream_url)

        self.width = self.stream_config.width if self.stream_config.width else width
        self.height = self.stream_config.height if self.stream_config.height else height
        self.fps = self.stream_config.fps if self.stream_config.fps else fps
        self._output_fps = self.fps
        self.stream_codec = stream_codec
        if not (self.width and self.height and self.fps):
            raise StreamInformationError(self.width, self.height, self.fps)

        self.decoders: Dict[str, FrameDecoder] = {}

        if stream_config.pix_fmt == "nv12":
            self._color_converter = cv2.COLOR_YUV2RGB_NV21
            self._color_plane_width = self.width
            # NV12: full-resolution Y plane plus a half-resolution interleaved UV plane
            self._color_plane_height = int(self.height * 1.5)
            # Expected size in bytes of a single raw NV12 frame
            self._frame_bytes = int(self.width * self.height * 1.5)
        else:
            assert False, "pix_fmt other than nv12 not supported"

        # gstreamer sink and pipe
        self.video_pipe = None
        self.video_sink = None
        # Raw bytes of the most recent frame pulled from appsink (see callback)
        self.frame_bytes = None
        self._bus = None

        # init gstreamer
        Gst.init([])
        self._logger.setLevel(logging.DEBUG)

    @property
    def output_fps(self):
        """Return stream output FPS."""
        return self._output_fps

    @output_fps.setter
    def output_fps(self, value):
        self._output_fps = value

    def calculate_output_fps(self):
        """Calculate stream output FPS."""
        max_interval_fps = 1 / min(
            [decoder.interval for decoder in self.decoders.values()]
        )
        self.output_fps = round(min([max_interval_fps, self.fps]))

    def get_stream_information(self, stream_url):
        """Return (width, height, fps, codec) for stream_url."""
        # Not implemented for GStreamer yet, check out
        # gst-discoverer-1.0 -v rtsp://user:pwd@192.168.5.95:554/stream0
        assert False, "get_stream_information not implemented"
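
    # A possible implementation sketch using GstPbutils.Discoverer, the Python
    # counterpart of gst-discoverer-1.0 mentioned above. This helper is not part of
    # the original gist; its name and return shape mirror get_stream_information and
    # are only illustrative. Requires Gst.init() to have been called first.
    def _discover_stream_information(self, stream_url):
        """Probe stream_url with GstPbutils.Discoverer (sketch)."""
        gi.require_version('GstPbutils', '1.0')
        from gi.repository import GstPbutils

        discoverer = GstPbutils.Discoverer.new(self._ffprobe_timeout * Gst.SECOND)
        info = discoverer.discover_uri(stream_url)
        video_streams = info.get_video_streams()
        if not video_streams:
            return None, None, None, None
        video = video_streams[0]
        fps = None
        if video.get_framerate_denom():
            fps = video.get_framerate_num() / video.get_framerate_denom()
        # e.g. "video/x-h265" for an H.265 stream
        codec = video.get_caps().get_structure(0).get_name()
        return video.get_width(), video.get_height(), fps, codec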

    def pipe(self):
        """Return subprocess pipe for FFmpeg."""
        # NOTE: leftover from the FFmpeg-based Stream class; build_command() is not
        # defined on GStream.
        with Detector.lock:
            return sp.Popen(
                self.build_command(),
                stdout=sp.PIPE,
                stderr=self._log_pipe,
            )

    def start_pipe(self):
        """Start the GStreamer pipeline and attach the appsink callback."""
        # NOTE: hard-coded sample RTSP source; a real integration would use
        # self.stream_config.stream_url here.
        video_source = 'rtspsrc location={}'.format(
            "rtsp://admin:123456@192.168.5.95:554/stream0"
        )
        video_codec = '! rtph265depay ! h265parse ! omxh265dec'
        video_decode = '! video/x-raw,format=(string)NV12'
        video_sink_conf = '! appsink emit-signals=true sync=false max-buffers=2 drop=true'
        command = ' '.join([video_source, video_codec, video_decode, video_sink_conf])
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name('appsink0')
        self.video_sink.connect('new-sample', self.callback)
        self._bus = self.video_pipe.get_bus()
        self._bus.add_signal_watch()
        self._bus.connect("message", self.bus_message_handler)

    def bus_message_handler(self, bus, message: Gst.Message) -> bool:
        """Log messages posted on the pipeline bus."""
        self._logger.debug(f" >>> bus message rx {message.type}")
        if message.type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            self._logger.error(f" >>> Gst.MessageType.ERROR: {err}, {debug}")
        return True

    def callback(self, sink):
        """Pull the latest sample from appsink and stash its raw bytes."""
        sample = sink.emit('pull-sample')
        buf = sample.get_buffer()
        buf_size = buf.get_size()
        self.frame_bytes = buf.extract_dup(0, buf_size)
        return Gst.FlowReturn.OK

    def close_pipe(self):
        """Stop the GStreamer pipeline."""
        self.video_pipe.set_state(Gst.State.NULL)

    def poll(self):
        """Poll pipe."""
        # TODO: detect gstreamer crashed/stopped
        if self.video_pipe:
            # get_state() requires a timeout in PyGObject; 0 returns immediately
            state = self.video_pipe.get_state(0)
            self._logger.debug(f" >>> (poll) gstreamer pipe state: {state}")
        return None
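
    # One way to address the TODO above (a sketch only, not in the original gist):
    # set a flag such as self._pipeline_error (hypothetical attribute) in
    # bus_message_handler on ERROR/EOS messages and have poll() return it, so the
    # caller can restart the stream:
    #
    #   if message.type in (Gst.MessageType.ERROR, Gst.MessageType.EOS):
    #       self._pipeline_error = True
    #   ...
    #   def poll(self):
    #       return True if self._pipeline_error else None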

    def read(self) -> Optional[Frame]:
        """Return a Frame built from the most recent appsink buffer, if any."""
        if not self.video_pipe:
            return None
        if not self.frame_bytes:
            return None
        if len(self.frame_bytes) != self._frame_bytes:
            self._logger.debug(
                f" >>> frame buffer size mismatch actual: {len(self.frame_bytes)}, "
                f"expected: {self._frame_bytes}"
            )
            return None
        frame = Frame(
            self._color_converter,
            self._color_plane_width,
            self._color_plane_height,
            self.frame_bytes,
            self.width,
            self.height,
        )
        self.frame_bytes = None
        return frame
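
# Rough usage sketch (illustrative only; the config and stream_config objects come
# from Viseron's normal camera configuration and are not constructed here):
#
#   stream = GStream(config, config.camera)
#   stream.start_pipe()
#   while True:
#       frame = stream.read()
#       if frame:
#           ...  # hand the frame off to FrameDecoder instances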