-
-
Save theCalcaholic/bea95753cc90da8b7562046f9175fcc2 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3 | |
import re | |
import numpy as np | |
import dbus | |
from dbus.mainloop.glib import DBusGMainLoop | |
from time import sleep | |
from vidgear.gears import WriteGear | |
from gstreamer import GstContext, GstPipeline, GstApp, Gst, GstVideo | |
import gstreamer.utils as gst_utils | |
from typing import Any | |
# Module-level setup: everything below runs once at import time.

# Install the GLib main loop as the default main loop for D-Bus connections.
DBusGMainLoop(set_as_default=True)
Gst.init(None)

bus = dbus.SessionBus()

# D-Bus interface names of the desktop portal used throughout this script.
request_iface = 'org.freedesktop.portal.Request'
camera_iface = 'org.freedesktop.portal.Camera'

# Descriptor of the PipeWire remote; stays None until open_pipewire_remote()
# assigns it.
fd = None

# Running counter that new_request_path() turns into request tokens.
request_token_counter = 0

# Our unique bus name with its first character dropped and every '.' replaced
# by '_'; used as a component of the request object path.
sender_name = bus.get_unique_name()[1:].replace('.', '_')

# Video writer that receives every frame pulled from the pipeline.
writer = WriteGear(output_filename="test.mp4")
def new_request_path():
    """Reserve the next request token and build the matching object path.

    Increments the module-level ``request_token_counter`` and formats the
    token as ``u<counter>``. The path is assembled from the portal request
    prefix, the module-level ``sender_name`` and that token.

    Returns:
        A ``(path, token)`` tuple of strings.
    """
    global request_token_counter
    request_token_counter += 1

    token = f'u{request_token_counter}'
    path = f'/org/freedesktop/portal/desktop/request/{sender_name}/{token}'
    return path, token
def camera_call(method, callback, *args, options=None):
    """Call a portal Camera method and subscribe ``callback`` to its Response.

    A fresh request path/token pair is obtained from ``new_request_path()``.
    ``callback`` is registered for the ``Response`` signal on that request
    path before ``method`` is invoked, and the token is handed to the portal
    as the ``handle_token`` option.

    Args:
        method: Callable to invoke, e.g. ``portal.AccessCamera``.
        callback: Handler registered for the ``Response`` signal.
        *args: Positional arguments passed to ``method`` ahead of the
            options dict.
        options: Optional mapping of additional options. It is copied, so
            neither the caller's mapping nor a shared default is mutated.

    Returns:
        Whatever ``method`` returns.
    """
    request_path, request_token = new_request_path()

    # Subscribe first so the Response cannot be emitted before we listen.
    bus.add_signal_receiver(
        handler_function=callback,
        signal_name='Response',
        dbus_interface=request_iface,
        bus_name='org.freedesktop.portal.Desktop',
        path=request_path,
    )

    # Work on a private copy: the previous ``options={}`` default was shared
    # between calls and accumulated state.
    call_options = dict(options) if options else {}
    call_options['handle_token'] = request_token

    print('request token: %s, path: %s' % (request_token, request_path))
    return method(*args, call_options, dbus_interface=camera_iface)
def open_pipewire_remote():
    """Request a PipeWire remote from the camera portal and store its fd.

    Calls ``OpenPipeWireRemote`` with an empty options dict on the
    module-level ``portal`` proxy, then stores the value returned by
    ``take()`` on the resulting object in the module-level ``fd``.
    """
    global fd
    # The call is expected to return a dbus.UnixFd (per the original
    # annotation); take() yields the value kept in ``fd``.
    fd = portal.OpenPipeWireRemote({}, dbus_interface=camera_iface).take()
def on_access_camera_response(response, results) -> bool:
    """Handle the ``Response`` signal of the AccessCamera request.

    Args:
        response: Response code delivered with the signal; 0 is treated as
            success, anything else as failure.
        results: Result payload delivered with the signal; only printed.

    Returns:
        True once ``open_pipewire_remote()`` has been called, False when the
        response code indicates failure.
    """
    print(results)

    if response == 0:
        open_pipewire_remote()
        return True

    print("Failed to access: %s" % response)
    print(results)
    return False
def extract_frame(sample: Gst.Sample) -> np.ndarray:
    """Turn the buffer of a GStreamer sample into a NumPy array.

    Width, height and pixel format are read from the first structure of the
    sample's caps; the channel count and dtype are derived from the format
    via ``gst_utils``.

    Args:
        sample: Sample whose buffer should be converted.

    Returns:
        The array built over a duplicate of the buffer contents, with
        length-1 axes removed by ``np.squeeze``.
    """
    structure = sample.get_caps().get_structure(0)
    pixel_format = GstVideo.VideoFormat.from_string(structure.get_value('format'))
    width = structure.get_value('width')
    height = structure.get_value('height')
    channels = gst_utils.get_num_channels(pixel_format)

    gst_buffer = sample.get_buffer()
    n_bytes = gst_buffer.get_size()

    if height * width * channels == n_bytes:
        shape = (height, width, channels)
    else:
        # NOTE(review): falls back to three channels whenever the buffer size
        # does not match the caps; presumably meant for padded/odd formats -
        # confirm this is the intended layout.
        shape = (height, width, 3)

    frame = np.ndarray(
        shape=shape,
        buffer=gst_buffer.extract_dup(0, n_bytes),
        dtype=gst_utils.get_np_dtype(pixel_format),
    )
    return np.squeeze(frame)
def process_frame(sink: GstApp.AppSink, data: Any) -> Gst.FlowReturn:
    """Handle ``new-sample``: pull the sample and pass its frame to the writer.

    Args:
        sink: The appsink that emitted the signal.
        data: User data supplied when the handler was connected; unused.

    Returns:
        ``Gst.FlowReturn.OK`` after the frame has been written, or
        ``Gst.FlowReturn.ERROR`` when no ``Gst.Sample`` could be pulled.
    """
    sample = sink.emit("pull-sample")
    if not isinstance(sample, Gst.Sample):
        return Gst.FlowReturn.ERROR

    writer.write(extract_frame(sample))
    return Gst.FlowReturn.OK
if __name__ == '__main__':
    # Script entry point: request camera access through the desktop portal,
    # wait for the PipeWire descriptor, then record frames until the pipeline
    # finishes or the user interrupts.
    portal = bus.get_object('org.freedesktop.portal.Desktop',
                            '/org/freedesktop/portal/desktop')

    # IsCameraPresent is a D-Bus property. Plain attribute access on the proxy
    # object is always truthy, so the value has to be read through the
    # standard Properties interface.
    portal_properties = dbus.Interface(portal, 'org.freedesktop.DBus.Properties')
    if not portal_properties.Get(camera_iface, 'IsCameraPresent'):
        print("No camera available")
        raise SystemExit(1)

    camera_call(portal.AccessCamera, on_access_camera_response)

    try:
        with GstContext():
            # Wait up to ~30 s for on_access_camera_response() to set ``fd``.
            # NOTE(review): the Response signal is only delivered while a GLib
            # main loop runs; this presumably relies on GstContext running one
            # in the background - confirm.
            for _ in range(30):
                if fd is not None:
                    break
                sleep(1)

            # Checked after the loop so a descriptor arriving during the last
            # sleep still counts; the old ``i == 0`` test could never be true.
            if fd is None:
                print('Timeout while waiting for portal!')
                raise SystemExit(1)

            gst_command = f'pipewiresrc fd={fd} ' \
                          f'! jpegdec ' \
                          f'! videoconvert ' \
                          f'! video/x-raw,format=BGR ' \
                          f'! queue ' \
                          f'! appsink emit-signals=True'

            with GstPipeline(gst_command) as pipeline:
                appsink = pipeline.get_by_cls(GstApp.AppSink)[0]
                appsink.connect("new-sample", process_frame, None)
                try:
                    while not pipeline.is_done:
                        sleep(0.025)
                except KeyboardInterrupt:
                    print("Shutting down gracefully")
    finally:
        # Always close the writer, including on timeout or unexpected errors.
        writer.close()
# These are the package names as I needed them on Fedora 36 | |
ffmpeg #(nonfree variant, see https://www.tecmint.com/install-ffmpeg-in-linux/) | |
pipewire | |
pipewire-gstreamer | |
pipewire-plugin-libcamera | |
gstreamer1 | |
gstreamer1-plugins-base | |
gstreamer1-plugins-good | |
libcamera-gstreamer | |
pipewire-gstreamer | |
python3-gstreamer | |
libcamera |
gstreamer-python @ git+https://github.com/jackersson/gstreamer-python.git@c8d4e04e1cdeb3b284641b981afcf304f50480db | |
numpy | |
opencv-python==4.6.0.66 | |
vidgear==0.2.6 |
@abhiTronix I don't really know - feel free to ask the author. The project is just a way less painful (and more pythonic) wrapper around the official GStreamer bindings, which means anything I did here can also be accomplished without the package.
@theCalcaholic this is a complete example of its own. I mean, we can put this in vidgear's FAQ or directly provide a link to this gist; that would be enough, wouldn't it? Adding this to the vidgear ecosystem will not produce any further benefits, since anyone can directly use any vidgear API (if required) with this code. What are your thoughts on this?
Also, multi-threading won't work here because GStreamer is involved.
@abhiTronix Yes, you're probably right. After struggling with PipeWire for a few days, I think it's just not ready yet (at least the video API in languages other than C). Ideally, PipeWire would bring virtual video devices to userland, so that no one needs to set up and use v4l2loopback anymore - that's the biggest appeal I see in it. But for now, few programs are using PipeWire nodes instead of V4L devices, and there's no big benefit from supporting this.
However, xdg-desktop-portals are a different story. Devs are well-advised to use them for camera access (whether they are using PipeWire or not), as that increases compatibility with containerized applications (which are becoming the de facto standard of Linux package distribution).
Why is there no PyPI package? Is the project still in beta?