Skip to content

Instantly share code, notes, and snippets.

@eyllanesc
Last active June 2, 2024 09:40
Show Gist options
  • Save eyllanesc/6486dc26eebb1f1b71469959d086a649 to your computer and use it in GitHub Desktop.
Examples of QtMultimedia in Qt6
import asyncio
from functools import cached_property, partial
from PyQt6.QtCore import (
pyqtSlot as Slot,
)
from utils import FrameProvider
import cv2
def request_frame(video_capture):
    """Grab a single frame from *video_capture* and append an alpha channel.

    Returns the frame as an HxWx4 ndarray, or None when the capture is
    closed or the read fails.
    """
    if not video_capture.isOpened():
        return None
    ok, frame = video_capture.read()
    if not ok:
        return None
    # OpenCV reads frames in BGR order; RGB2RGBA only appends an alpha
    # plane without reordering channels, so the result is effectively BGRA.
    return cv2.cvtColor(frame, cv2.COLOR_RGB2RGBA)
class OpenCVAsyncProvider(FrameProvider):
    """FrameProvider that asynchronously pulls frames from the default camera."""

    @cached_property
    def video_capture(self):
        # Lazily open camera 0 on first access; cached for the object's lifetime.
        return cv2.VideoCapture(0)

    @Slot()
    def start(self):
        """Schedule the capture loop on the running asyncio event loop."""
        asyncio.ensure_future(self.start_request())

    async def start_request(self):
        """Continuously read frames in a worker thread and forward them."""
        event_loop = asyncio.get_running_loop()
        capture = self.video_capture
        while True:
            # The blocking cv2 read runs in the default executor so the
            # combined Qt/asyncio event loop stays responsive.
            frame = await event_loop.run_in_executor(
                None, partial(request_frame, capture)
            )
            self.write_frame(frame)
// Minimal QML front end: a window whose VideoOutput is fed frames by the
// Python-side `frame_provider` context property (set via setContextProperty).
import QtQuick
import QtQuick.Controls
import QtMultimedia
ApplicationWindow {
id: root
width: 640
height: 480
visible: true
VideoOutput {
id: videoOutput
anchors.fill: parent
// Once the item exists, hand our sink to the Python provider and
// start capturing; frame_provider is injected from Python.
Component.onCompleted: function() {
frame_provider.video_sink = videoOutput.videoSink;
frame_provider.start();
}
}
}
import asyncio
from functools import partial
import sys
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication, QGraphicsScene, QGraphicsView
from PyQt6.QtMultimediaWidgets import QGraphicsVideoItem
import qasync
from asyncprovider import OpenCVAsyncProvider
def main():
    """Display the camera feed inside a QGraphicsView via QGraphicsVideoItem."""
    app = QApplication(sys.argv)

    # qasync bridges the Qt event loop with asyncio.
    loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(loop)

    scene = QGraphicsScene()
    view = QGraphicsView(scene)
    view.resize(640, 480)
    view.show()

    video_item = QGraphicsVideoItem()
    scene.addItem(video_item)

    provider = OpenCVAsyncProvider()
    provider.video_sink = video_item.videoSink()
    provider.start()

    # Refit the view whenever the native frame size becomes known/changes.
    video_item.nativeSizeChanged.connect(
        partial(view.fitInView, video_item, Qt.AspectRatioMode.KeepAspectRatio)
    )

    with loop:
        try:
            loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import asyncio
from pathlib import Path
import os
import sys
from PyQt6.QtCore import QCoreApplication, Qt, QUrl
from PyQt6.QtGui import QGuiApplication
from PyQt6.QtQml import QQmlApplicationEngine
import qasync
from asyncprovider import OpenCVAsyncProvider
# Directory containing this script; used to locate main.qml beside it.
CURRENT_DIRECTORY = Path(__file__).resolve().parent
def main():
    """Launch the QML front end and expose the frame provider to it."""
    app = QGuiApplication(sys.argv)

    # qasync bridges the Qt event loop with asyncio.
    loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(loop)

    provider = OpenCVAsyncProvider()

    engine = QQmlApplicationEngine()
    # Make the provider reachable from QML as `frame_provider`.
    engine.rootContext().setContextProperty("frame_provider", provider)

    qml_path = os.fspath(CURRENT_DIRECTORY / "main.qml")
    url = QUrl.fromLocalFile(qml_path)

    def handle_object_created(obj, obj_url):
        # Exit with an error code if our root QML file failed to load.
        if obj is None and url == obj_url:
            QCoreApplication.exit(-1)

    engine.objectCreated.connect(
        handle_object_created, Qt.ConnectionType.QueuedConnection
    )
    engine.load(url)

    with loop:
        try:
            loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import asyncio
import sys
from PyQt6.QtWidgets import QApplication
from PyQt6.QtMultimediaWidgets import QVideoWidget
import qasync
from asyncprovider import OpenCVAsyncProvider
def main():
    """Display the camera feed in a plain QVideoWidget."""
    app = QApplication(sys.argv)

    # qasync bridges the Qt event loop with asyncio.
    loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(loop)

    widget = QVideoWidget()
    widget.resize(640, 480)
    widget.show()

    provider = OpenCVAsyncProvider()
    provider.video_sink = widget.videoSink()
    provider.start()

    with loop:
        try:
            loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import ctypes
from dataclasses import dataclass
from functools import cached_property
from PyQt6.QtCore import (
QObject,
QSize,
pyqtSignal as Signal,
pyqtProperty as Property,
)
from PyQt6.QtGui import QImage
from PyQt6.QtMultimedia import QVideoFrame, QVideoFrameFormat
import numpy as np
def write_qvideoframe_from_ndarray(video_frame, np_image, with_ctypes=True):
    """Copy the bytes of *np_image* into plane 0 of a mapped QVideoFrame.

    video_frame: a QVideoFrame already mapped WriteOnly (see build_video_frame).
    np_image:    ndarray whose element count equals the plane's mappedBytes —
                 asserted below; only holds for 1-byte dtypes (assumes uint8
                 BGRA data — TODO confirm with callers).
    with_ctypes: copy via ctypes.memmove (fast path) instead of a Python
                 slice assignment.
    """
    plane = 0
    data = video_frame.bits(plane)
    assert np_image.size == video_frame.mappedBytes(plane), "Must be same size!!!"
    if with_ctypes:
        # bits() returns a memoryview on PyQt6; other bindings return a
        # sip.voidptr convertible with int(). Build a writable ctypes view
        # in the first case, pass the raw address in the second.
        ctypes.memmove(
            (ctypes.c_ubyte * len(data)).from_buffer(data)
            if isinstance(data, memoryview)
            else int(data),
            np_image.ctypes.data,
            video_frame.mappedBytes(plane),
        )
    else:
        if not isinstance(data, memoryview):
            # A voidptr has no length until setsize() is called.
            data.setsize(video_frame.mappedBytes(plane))
        data[:] = bytearray(np_image)
def build_video_frame(size):
    """Create a BGRA8888 QVideoFrame of *size*, mapped for writing.

    size: QSize of the frame to allocate.
    Returns the frame mapped WriteOnly; the caller is responsible for
    calling unmap() after filling the pixel data.
    Raises RuntimeError when the frame is invalid or cannot be mapped.
    """
    video_frame_format = QVideoFrameFormat.PixelFormat.Format_BGRA8888
    video_frame = QVideoFrame(QVideoFrameFormat(size, video_frame_format))
    if not video_frame.isValid() or not video_frame.map(QVideoFrame.MapMode.WriteOnly):
        # Plain string literal: the original used an f-string with no
        # placeholders (F541).
        raise RuntimeError("QVideoFrame is invalid or not writable")
    return video_frame
def convert_qvideoframe_to_qimage(video_frame):
    """Wrap plane 0 of a mapped QVideoFrame in a QImage without copying.

    Returns the QImage, or None (after printing a message) when the frame's
    pixel format has no QImage equivalent. The QImage aliases the frame's
    buffer, so the frame must stay mapped and alive while the image is used.
    """
    image_format = QVideoFrameFormat.imageFormatFromPixelFormat(
        video_frame.pixelFormat()
    )
    if image_format == QImage.Format.Format_Invalid:
        print("Invalid format")
        return
    plane = 0
    ptr = video_frame.bits(plane)
    # NOTE(review): no bytesPerLine argument is passed, so this assumes the
    # plane has no row padding (stride == width * bytes-per-pixel) — confirm.
    image = QImage(
        ptr if isinstance(ptr, memoryview) else int(ptr),
        video_frame.width(),
        video_frame.height(),
        image_format,
    )
    return image
@dataclass(frozen=True)
class _QVideoFrameInterface:
    """Adapter exposing a mapped QVideoFrame through the NumPy array interface.

    np.asarray() on an instance yields a zero-copy HxWx4 uint8 view of the
    selected plane; the frame must remain mapped while the array is used.
    """

    # Source frame; must already be mapped before np.asarray() is called.
    video_frame: QVideoFrame
    # Plane index to expose (0 for packed formats).
    plane: int = 0

    @cached_property
    def __array_interface__(self):
        # cached_property writes directly into the instance __dict__, which
        # works even on a frozen dataclass, and keeps `data` alive with us.
        data = self.video_frame.bits(self.plane)
        if not isinstance(data, memoryview):
            # A sip.voidptr must be given an explicit size before it can
            # back a buffer.
            data.setsize(self.video_frame.mappedBytes(self.plane))
        return dict(
            shape=(self.video_frame.height(), self.video_frame.width(), 4),
            typestr="|u1",  # unsigned bytes
            data=data,
            # Row stride honors any padding reported by bytesPerLine.
            strides=(self.video_frame.bytesPerLine(self.plane), 4, 1),
            version=3,
        )
def convert_qvideoframe_to_numpy(video_frame):
    """Return a zero-copy ndarray view over plane 0 of *video_frame*."""
    interface = _QVideoFrameInterface(video_frame)
    return np.asarray(interface)
@dataclass(frozen=True)
class _QImageInterface:
    """Adapter exposing a QImage buffer through the NumPy array interface.

    np.asarray() on an instance yields a zero-copy HxWx4 uint8 view.
    NOTE(review): the cached interface keeps the image buffer referenced
    for the adapter's lifetime — verify adapters are dropped promptly if
    memory growth is observed.
    """

    # Source image; assumed to use a 4-bytes-per-pixel format — TODO confirm.
    image: QImage

    @cached_property
    def __array_interface__(self):
        data = self.image.bits()
        if not isinstance(data, memoryview):
            # A sip.voidptr must be sized before it can back a buffer.
            data.setsize(self.image.sizeInBytes())
        return dict(
            shape=(self.image.height(), self.image.width(), 4),
            typestr="|u1",  # unsigned bytes
            data=data,
            strides=(self.image.bytesPerLine(), 4, 1),
            version=3,
        )
def convert_qimage_to_numpy(video_frame):
    """Return a zero-copy ndarray view over a QImage.

    NOTE(review): despite its name, the parameter is a QImage, not a
    QVideoFrame; renaming it would break keyword callers, so it is only
    flagged here.
    """
    return np.asarray(_QImageInterface(video_frame))
class FrameProvider(QObject):
    """Pushes numpy frames into an attached video sink.

    The sink is exposed as the `video_sink` Qt property so both QML and
    Python callers can attach a VideoOutput/QVideoWidget sink.
    """

    # Emitted whenever the video_sink property is replaced.
    video_sink_changed = Signal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._video_sink = None

    @Property(QObject, notify=video_sink_changed)
    def video_sink(self):
        """Currently attached sink (QObject), or None."""
        return self._video_sink

    @video_sink.setter
    def video_sink(self, video_sink):
        # Detach the previous sink; dispose of it only if we own it.
        if self.video_sink is not None:
            self.video_sink.destroyed.disconnect(self._handle_destroyed)
            if self.video_sink.parent() is self:
                self.video_sink.setParent(None)
                self.video_sink.deleteLater()
        self._video_sink = video_sink
        # NOTE(review): assumes the new sink is not None — passing None
        # would raise on the connect below; confirm against callers.
        video_sink.destroyed.connect(self._handle_destroyed)
        self.video_sink_changed.emit()

    def write_frame(self, image_frame):
        """Convert an HxWx4 ndarray into a QVideoFrame and hand it to the sink.

        Silently drops the frame (after printing) when no sink is attached,
        the frame is None, or it is not 3-dimensional.
        """
        if (
            self.video_sink is None
            or image_frame is None
            or len(image_frame.shape) != 3
        ):
            print("video_sink or image_format is None")
            return
        height, width, _ = image_frame.shape
        try:
            video_frame = build_video_frame(QSize(width, height))
        except RuntimeError:
            # Frame allocation/mapping failed; skip this frame.
            pass
        else:
            write_qvideoframe_from_ndarray(video_frame, image_frame, with_ctypes=True)
            video_frame.unmap()
            self.video_sink.setVideoFrame(video_frame)

    def _handle_destroyed(self):
        # Drop the dangling reference when the sink is deleted externally.
        self._video_sink = None
@donshwah
Copy link

Has your example been run recently using Python 3.10 and PySide6 6.2.2? I've opened a test project using your code and it works but it is very laggy and slow.

@eyllanesc
Copy link
Author

eyllanesc commented Jan 14, 2022

@donshwah If you comment out the barcode logic, is it still slow? If it isn't, then you should optimize that part, for example by processing every N frames instead of every frame. Also, if your image is very large, you could scale it down.

@donshwah
Copy link

donshwah commented Jan 14, 2022

The barcode logic is the culprit.

I have an implementation using pyzbar in my PySide6 project alongside OpenCV that works. The issue is OpenCV cannot fetch the "friendly name" of the available input devices like Qt can, only the ID, which I believe is due to OpenCV using FFMPEG (which is technically fine, works great, just doesn't look as pretty). I was just trying out your approach to see if I could implement it successfully and came across that issue. I'm actually not sure why the barcode logic affects performance so heavily. My barcode decoding logic is essentially the same, although I use a QThread, a QGraphicsPixmapItem on a QGraphicsView, and a custom Signal.

import sys
import os
import cv2
import warnings

from PySide6 import QtCore, QtMultimedia
from PySide6.QtCore import QEvent, QPoint, QThread, Qt, Signal, Slot
from PySide6.QtGui import QIcon, QImage, QPixmap
from PySide6.QtWidgets import (
    QApplication,
    QGraphicsPixmapItem,
    QGraphicsScene,
    QMainWindow,
    QStackedLayout,
)

import numpy as np
from pyzbar import pyzbar
from pyzbar.wrapper import ZBarSymbol

from modules import *

os.environ["QT_FONT_DPI"] = "96"
warnings.filterwarnings("ignore")


class MainWindow(QMainWindow):
    """Frameless, always-on-top window that displays the capture thread's frames."""

    def __init__(self):
        QMainWindow.__init__(self)

        # Ui_MainWindow / UIFunctions come from the project's `modules`
        # star import — their definitions are not visible here.
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self)
        UIFunctions.uiDefinitions(self)

        # Video frames are rendered as a pixmap item inside a graphics scene.
        self.scene = QGraphicsScene(self)
        self.pix = QGraphicsPixmapItem()

        self.ui.appFrameLayout.setStackingMode(QStackedLayout.StackAll)
        self.ui.graphicsView.setScene(self.scene)
        self.scene.addItem(self.pix)
        # Background thread that reads the camera and decodes barcodes.
        self.cap_thread = CaptureThread()
        self.cap_thread.change_pixmap_signal.connect(self.update_image)
        self.cap_thread.start()

        self.setWindowFlag(Qt.WindowStaysOnTopHint)
        self.setWindowFlag(Qt.FramelessWindowHint)
        self.setWindowFlag(Qt.MSWindowsFixedSizeDialogHint)

        self.show()

        # Intercept viewport events so wheel scrolling can be suppressed.
        self.ui.graphicsView.viewport().installEventFilter(self)

    def eventFilter(self, watched, event):
        # Swallow wheel events on the video viewport; let everything else through.
        if watched == self.ui.graphicsView.viewport() and event.type() == QEvent.Wheel:
            return True
        else:
            return False

    def closeEvent(self, event):
        # Stop the capture thread cleanly before the window closes.
        self.cap_thread.stop()
        event.accept()

    @Slot(np.ndarray)
    def update_image(self, cv_img):
        """Receive an ndarray frame from the capture thread and display it."""
        qt_img = self.convert_cv_qt(cv_img)
        self.pix.setPixmap(qt_img)

    def convert_cv_qt(self, cv_img):
        """Convert an RGB ndarray to a QPixmap; also refits the view as a side effect."""
        h, w, ch = cv_img.shape
        bytes_per_line = ch * w
        convert_to_Qt_format = QImage(
            cv_img.data, w, h, bytes_per_line, QImage.Format_RGB888
        )
        self.ui.graphicsView.fitInView(self.pix, Qt.KeepAspectRatioByExpanding)
        return QPixmap.fromImage(convert_to_Qt_format)


class CaptureThread(QThread):
    """Reads frames from camera 1, outlines decoded barcodes, and emits each frame.

    Emits `change_pixmap_signal` with an RGB ndarray for every frame read.
    """

    change_pixmap_signal = Signal(np.ndarray)
    # Barcode symbologies to decode; referenced by read_barcodes (the
    # original duplicated this list inline and never used the attribute).
    formats = [
        ZBarSymbol.CODE128,
        ZBarSymbol.QRCODE,
    ]

    def __init__(self):
        super().__init__()
        self._run_flag = True

    def run(self):
        # CAP_DSHOW selects the DirectShow backend (Windows) for device 1.
        cap = cv2.VideoCapture(1, cv2.CAP_DSHOW)
        while self._run_flag:
            ret, cv_img = cap.read()
            if ret:
                frame = self.read_barcodes(cv_img)
                self.change_pixmap_signal.emit(frame)
        cap.release()

    def stop(self):
        """Ask the capture loop to exit and block until the thread finishes."""
        self._run_flag = False
        self.wait()

    def read_barcodes(self, frame):
        """Decode barcodes in *frame*, draw their bounding boxes, return RGB frame."""
        # Decode from a grayscale copy; the display frame becomes RGB.
        grayscale = cv2.cvtColor(frame.copy(), cv2.COLOR_BGR2GRAY)
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Use the class-level `formats` list instead of re-stating it here.
        barcodes = pyzbar.decode(grayscale, symbols=self.formats)

        # Iterating an empty result is a no-op, so no length guard is needed;
        # the unused `barcode_info` decode was dropped.
        for barcode in barcodes:
            x, y, w, h = barcode.rect
            frame = cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)

        return frame


if __name__ == "__main__":

    app = QApplication(sys.argv)

    # SystemTray comes from the project's `modules` star import.
    icon = QIcon("icon.ico")
    system_tray = SystemTray(app)

    window = MainWindow()
    window.setWindowIcon(icon)

    # Apply the application-wide stylesheet from disk.
    with open("style.css", "r") as f:
        _style = f.read()
        app.setStyleSheet(_style)

    sys.exit(app.exec())

In my implementation I process the image on every frame, yet performance is virtually unaffected. Could it be from FrameProvider's write_frame() method? If I could get the performance to match what I have now, your method would be my preferred. The closest I came to being able to fetch friendly input device names is when I came across this Stack Overflow post using Windows Media Foundation, CV-camera-finder, which unfortunately does not work for Python 3.10 (but worked for 3.7), and I lack the knowledge of C++ and of binding Python functions to C++ to get it to work.

@donshwah
Copy link

Upon further tinkering, I believe there is a memory leak somewhere. It might have to do with the _QImageInterface dataclass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment