Skip to content

Instantly share code, notes, and snippets.

@eyllanesc
Last active June 2, 2024 09:40
Show Gist options
  • Save eyllanesc/6486dc26eebb1f1b71469959d086a649 to your computer and use it in GitHub Desktop.
Save eyllanesc/6486dc26eebb1f1b71469959d086a649 to your computer and use it in GitHub Desktop.
Examples of QtMultimedia in Qt6
import asyncio
from functools import cached_property, partial
from PyQt6.QtCore import (
pyqtSlot as Slot,
)
from utils import FrameProvider
import cv2
def request_frame(video_capture):
    """Grab a single frame from *video_capture* as a 4-channel image.

    Returns the frame converted with ``cv2.COLOR_RGB2RGBA`` (the conversion
    only appends an alpha channel), or ``None`` when the capture is not
    opened or the read does not succeed.
    """
    if video_capture.isOpened():
        succeeded, raw_frame = video_capture.read()
        if succeeded:
            return cv2.cvtColor(raw_frame, cv2.COLOR_RGB2RGBA)
    return None
class OpenCVAsyncProvider(FrameProvider):
    """Frame provider that pulls frames from an OpenCV capture using asyncio.

    Each blocking ``request_frame`` call is run in the event loop's default
    executor and its result is passed to ``write_frame``.
    """

    @cached_property
    def video_capture(self):
        """Return the ``cv2.VideoCapture`` for index 0, created on first access."""
        return cv2.VideoCapture(0)

    @Slot()
    def start(self):
        """Schedule the capture loop on the current asyncio event loop."""
        # The event loop only keeps weak references to tasks, so hold a strong
        # reference to make sure the capture loop cannot be garbage-collected
        # while it is still running.
        self._capture_task = asyncio.ensure_future(self.start_request())

    async def start_request(self):
        """Read frames forever, writing each result to the video sink."""
        loop = asyncio.get_running_loop()
        # The callable is the same for every iteration, so build it once.
        grab_frame = partial(request_frame, self.video_capture)
        while True:
            image_frame = await loop.run_in_executor(None, grab_frame)
            self.write_frame(image_frame)
import QtQuick
import QtQuick.Controls
import QtMultimedia
// Root window: a 640x480 visible window whose whole area is a VideoOutput.
ApplicationWindow {
id: root
width: 640
height: 480
visible: true
// Video surface filling the window.
VideoOutput {
id: videoOutput
anchors.fill: parent
// Once the item is complete, hand its video sink to the `frame_provider`
// object (not declared in this file; it must be provided by the
// application that loads this QML) and then call its start().
Component.onCompleted: function() {
frame_provider.video_sink = videoOutput.videoSink;
frame_provider.start();
}
}
}
import asyncio
from functools import partial
import sys
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import QApplication, QGraphicsScene, QGraphicsView
from PyQt6.QtMultimediaWidgets import QGraphicsVideoItem
import qasync
from asyncprovider import OpenCVAsyncProvider
def main():
    """Display the camera stream in a QGraphicsView via a QGraphicsVideoItem."""
    app = QApplication(sys.argv)
    event_loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(event_loop)

    # Scene and view hosting the video item.
    scene = QGraphicsScene()
    view = QGraphicsView(scene)
    view.resize(640, 480)
    view.show()

    video_item = QGraphicsVideoItem()
    scene.addItem(video_item)

    # Feed the item's sink from the OpenCV provider.
    frame_provider = OpenCVAsyncProvider()
    frame_provider.video_sink = video_item.videoSink()
    frame_provider.start()

    # Re-fit the item in the view whenever its native size changes.
    fit_item_in_view = partial(
        view.fitInView, video_item, Qt.AspectRatioMode.KeepAspectRatio
    )
    video_item.nativeSizeChanged.connect(fit_item_in_view)

    with event_loop:
        try:
            event_loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import asyncio
from pathlib import Path
import os
import sys
from PyQt6.QtCore import QCoreApplication, Qt, QUrl
from PyQt6.QtGui import QGuiApplication
from PyQt6.QtQml import QQmlApplicationEngine
import qasync
from asyncprovider import OpenCVAsyncProvider
# Directory containing this script; "main.qml" is looked up next to it.
CURRENT_DIRECTORY = Path(__file__).resolve().parent


def main():
    """Load main.qml and expose an OpenCVAsyncProvider to it as ``frame_provider``."""
    app = QGuiApplication(sys.argv)
    event_loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(event_loop)

    frame_provider = OpenCVAsyncProvider()

    engine = QQmlApplicationEngine()
    engine.rootContext().setContextProperty("frame_provider", frame_provider)

    qml_path = os.fspath(CURRENT_DIRECTORY / "main.qml")
    url = QUrl.fromLocalFile(qml_path)

    def handle_object_created(obj, obj_url):
        """Exit with -1 when the object for our URL could not be created."""
        load_failed = obj is None and url == obj_url
        if load_failed:
            QCoreApplication.exit(-1)

    engine.objectCreated.connect(
        handle_object_created, Qt.ConnectionType.QueuedConnection
    )
    engine.load(url)

    with event_loop:
        try:
            event_loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import asyncio
import sys
from PyQt6.QtWidgets import QApplication
from PyQt6.QtMultimediaWidgets import QVideoWidget
import qasync
from asyncprovider import OpenCVAsyncProvider
def main():
    """Display the camera stream in a standalone QVideoWidget."""
    app = QApplication(sys.argv)
    event_loop = qasync.QEventLoop(app)
    asyncio.set_event_loop(event_loop)

    video_widget = QVideoWidget()
    video_widget.resize(640, 480)
    video_widget.show()

    # Feed the widget's sink from the OpenCV provider.
    frame_provider = OpenCVAsyncProvider()
    frame_provider.video_sink = video_widget.videoSink()
    frame_provider.start()

    with event_loop:
        try:
            event_loop.run_forever()
        except asyncio.exceptions.CancelledError:
            print("OK")


if __name__ == "__main__":
    main()
import ctypes
from dataclasses import dataclass
from functools import cached_property
from PyQt6.QtCore import (
QObject,
QSize,
pyqtSignal as Signal,
pyqtProperty as Property,
)
from PyQt6.QtGui import QImage
from PyQt6.QtMultimedia import QVideoFrame, QVideoFrameFormat
import numpy as np
def write_qvideoframe_from_ndarray(video_frame, np_image, with_ctypes=True):
    """Copy the bytes of *np_image* into plane 0 of a mapped *video_frame*.

    Args:
        video_frame: Frame object providing ``bits(plane)`` (a ``memoryview``
            or a pointer-like object convertible with ``int()``) and
            ``mappedBytes(plane)``.
        np_image: NumPy array whose total byte count equals the mapped size
            of the plane.
        with_ctypes: When true, copy with ``ctypes.memmove``; otherwise
            assign through the buffer returned by ``bits``.

    Raises:
        ValueError: If the image byte count differs from the mapped size.
    """
    plane = 0
    data = video_frame.bits(plane)
    mapped_bytes = video_frame.mappedBytes(plane)
    # Validate with a real exception: an ``assert`` disappears under ``-O``
    # and would leave the raw memory copy below unguarded. Compare byte
    # counts (not element counts) so wider dtypes are rejected as well.
    if np_image.nbytes != mapped_bytes:
        raise ValueError("Must be same size!!!")
    if with_ctypes:
        # memmove reads raw memory, so the source has to be C-contiguous;
        # this is a no-op for arrays that already are.
        source = np.ascontiguousarray(np_image)
        if isinstance(data, memoryview):
            destination = (ctypes.c_ubyte * len(data)).from_buffer(data)
        else:
            destination = int(data)
        ctypes.memmove(destination, source.ctypes.data, mapped_bytes)
    else:
        if not isinstance(data, memoryview):
            # Pointer-like buffers need an explicit size before slicing.
            data.setsize(mapped_bytes)
        data[:] = bytearray(np_image)
def build_video_frame(size):
    """Create a ``Format_BGRA8888`` QVideoFrame of *size*, mapped write-only.

    Raises:
        RuntimeError: If the frame is not valid or cannot be mapped.
    """
    pixel_format = QVideoFrameFormat.PixelFormat.Format_BGRA8888
    frame_format = QVideoFrameFormat(size, pixel_format)
    video_frame = QVideoFrame(frame_format)
    # map() is only attempted when the frame is valid (short-circuit).
    if not (video_frame.isValid() and video_frame.map(QVideoFrame.MapMode.WriteOnly)):
        raise RuntimeError(f"QVideoFrame is invalid or not writable")
    return video_frame
def convert_qvideoframe_to_qimage(video_frame):
    """Wrap plane 0 of *video_frame* in a QImage.

    Returns:
        A QImage built over the buffer returned by ``video_frame.bits(0)``,
        or ``None`` (after printing "Invalid format") when the frame's pixel
        format has no QImage equivalent.

    NOTE(review): the image is built directly on the frame's buffer, so the
    frame presumably has to be mapped and kept alive while the image is in
    use -- confirm against the callers.
    """
    image_format = QVideoFrameFormat.imageFormatFromPixelFormat(
        video_frame.pixelFormat()
    )
    if image_format == QImage.Format.Format_Invalid:
        print("Invalid format")
        return None
    plane = 0
    ptr = video_frame.bits(plane)
    # Pass the frame's own stride: without it QImage assumes its default
    # scanline layout, which is wrong for frames with padded lines.
    return QImage(
        ptr if isinstance(ptr, memoryview) else int(ptr),
        video_frame.width(),
        video_frame.height(),
        video_frame.bytesPerLine(plane),
        image_format,
    )
@dataclass(frozen=True)
class _QVideoFrameInterface:
    """Adapter exposing one plane of a QVideoFrame via ``__array_interface__``."""

    video_frame: QVideoFrame
    plane: int = 0

    @cached_property
    def __array_interface__(self):
        """Describe the plane as a (height, width, 4) array of unsigned bytes."""
        frame, plane = self.video_frame, self.plane
        buffer = frame.bits(plane)
        if not isinstance(buffer, memoryview):
            # Pointer-like buffers carry no length until it is set explicitly.
            buffer.setsize(frame.mappedBytes(plane))
        return {
            "shape": (frame.height(), frame.width(), 4),
            "typestr": "|u1",
            "data": buffer,
            "strides": (frame.bytesPerLine(plane), 4, 1),
            "version": 3,
        }
def convert_qvideoframe_to_numpy(video_frame):
    """Return *video_frame* wrapped in ``_QVideoFrameInterface`` as a NumPy array."""
    adapter = _QVideoFrameInterface(video_frame)
    return np.asarray(adapter)
@dataclass(frozen=True)
class _QImageInterface:
    """Adapter exposing a QImage's pixel buffer via ``__array_interface__``."""

    image: QImage

    @cached_property
    def __array_interface__(self):
        """Describe the image as a (height, width, 4) array of unsigned bytes."""
        picture = self.image
        buffer = picture.bits()
        if not isinstance(buffer, memoryview):
            # Pointer-like buffers carry no length until it is set explicitly.
            buffer.setsize(picture.sizeInBytes())
        return {
            "shape": (picture.height(), picture.width(), 4),
            "typestr": "|u1",
            "data": buffer,
            "strides": (picture.bytesPerLine(), 4, 1),
            "version": 3,
        }
def convert_qimage_to_numpy(video_frame):
    """Return the given image wrapped in ``_QImageInterface`` as a NumPy array.

    Despite its name, *video_frame* is passed to ``_QImageInterface`` as its
    ``image`` field.
    """
    adapter = _QImageInterface(video_frame)
    return np.asarray(adapter)
class FrameProvider(QObject):
    """QObject that pushes NumPy image frames into an assignable video sink.

    The sink is exposed as the ``video_sink`` property with
    ``video_sink_changed`` as its notify signal.
    """

    video_sink_changed = Signal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self._video_sink = None

    @Property(QObject, notify=video_sink_changed)
    def video_sink(self):
        """Return the current sink, or ``None`` when none is set."""
        return self._video_sink

    @video_sink.setter
    def video_sink(self, video_sink):
        """Replace the sink; ``None`` clears it.

        A previous sink is disconnected and, if this object is its parent,
        detached and scheduled for deletion.
        """
        previous = self._video_sink
        if previous is video_sink:
            # Nothing changes; in particular do not schedule the sink that
            # stays in use for deletion.
            return
        if previous is not None:
            previous.destroyed.disconnect(self._handle_destroyed)
            if previous.parent() is self:
                previous.setParent(None)
                previous.deleteLater()
        self._video_sink = video_sink
        if video_sink is not None:
            # Assigning None is allowed, so only connect to a real sink.
            video_sink.destroyed.connect(self._handle_destroyed)
        self.video_sink_changed.emit()

    def write_frame(self, image_frame):
        """Send *image_frame* (a 3-dimensional array) to the current sink.

        The frame is skipped with a printed message when there is no sink,
        the image is ``None`` or not 3-dimensional, or no writable video
        frame can be built.
        """
        if (
            self.video_sink is None
            or image_frame is None
            or len(image_frame.shape) != 3
        ):
            print(
                "video_sink or image_frame is None, "
                "or image_frame is not 3-dimensional"
            )
            return
        height, width, _ = image_frame.shape
        try:
            video_frame = build_video_frame(QSize(width, height))
        except RuntimeError as error:
            # Best effort: drop this frame, but say so instead of hiding it.
            print(f"Frame skipped: {error}")
            return
        try:
            write_qvideoframe_from_ndarray(video_frame, image_frame, with_ctypes=True)
        finally:
            # Release the mapping even if the copy raises.
            video_frame.unmap()
        self.video_sink.setVideoFrame(video_frame)

    def _handle_destroyed(self):
        """Forget the sink once it has been destroyed."""
        self._video_sink = None
@donshwah
Copy link

Upon further tinkering, I believe there is a memory leak somewhere. It might have to do with the _QImageInterface dataclass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment