Created
December 6, 2025 11:55
-
-
Save wangzk/2c19b945507d80840adba7070e8d1362 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import re | |
| import signal | |
| import dbus | |
| import sys | |
| from dbus.mainloop.glib import DBusGMainLoop | |
| import gi | |
| gi.require_version('Gst', '1.0') | |
| gi.require_version('Gtk', '4.0') | |
| gi.require_version('Gdk', '4.0') | |
| gi.require_version('GstVideo', '1.0') | |
| gi.require_version('Adw', '1') | |
| from gi.repository import GLib, Gtk, Gdk, Gst, GstVideo, GObject, Adw, Gio | |
| DBusGMainLoop(set_as_default=True) | |
| Gst.init(None) | |
| Adw.init() | |
| # 创建libadwaita应用窗口 | |
| class ScreenCastWindow(Adw.ApplicationWindow): | |
| def __init__(self, app): | |
| super().__init__(application=app, title="Screen Mirror") | |
| self.set_default_size(800, 600) | |
| # 设置窗口允许调整大小 | |
| self.set_resizable(True) | |
| # 创建一个Box布局容器,用于视频显示 | |
| self.video_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) | |
| self.video_box.set_hexpand(True) | |
| self.video_box.set_vexpand(True) | |
| # 创建header bar | |
| self.header_bar = Adw.HeaderBar() | |
| self.header_bar.set_show_end_title_buttons(True) | |
| self.header_bar.set_show_start_title_buttons(True) | |
| # 创建主容器,包含header bar和视频内容 | |
| self.main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) | |
| self.main_box.append(self.header_bar) | |
| self.main_box.append(self.video_box) | |
| self.set_content(self.main_box) | |
| # 保存pipeline和视频接收器引用 | |
| self.pipeline = None | |
| self.video_sink = None | |
| # 连接窗口关闭事件 | |
| self.connect('close-request', self.on_window_close) | |
| def set_gstreamer_widget(self, pipeline, sink_widget): | |
| """设置GStreamer pipeline与视频显示组件""" | |
| self.pipeline = pipeline | |
| # 如果已经有视频组件,先移除 | |
| if self.video_sink: | |
| self.video_box.remove(self.video_sink) | |
| # 设置视频组件的属性使其填充可用空间 | |
| sink_widget.set_hexpand(True) | |
| sink_widget.set_vexpand(True) | |
| # 将视频组件添加到Box布局中 | |
| self.video_box.append(sink_widget) | |
| self.video_sink = sink_widget | |
| # 显示新添加的组件 | |
| sink_widget.show() | |
| def on_window_close(self, window): | |
| """窗口关闭时停止pipeline并退出程序""" | |
| # 停止视频流 | |
| if self.pipeline: | |
| self.pipeline.set_state(Gst.State.NULL) | |
| self.pipeline = None | |
| # 退出应用程序 | |
| self.get_application().quit() | |
| return True | |
| # 创建libadwaita应用 | |
| class ScreenCastApp(Adw.Application): | |
| def __init__(self): | |
| super().__init__(application_id='com.example.screencast', | |
| flags=Gio.ApplicationFlags.DEFAULT_FLAGS) | |
| self.window = None | |
| self.pipeline = None | |
| def do_activate(self): | |
| if not self.window: | |
| self.window = ScreenCastWindow(self) | |
| self.window.present() | |
| def do_shutdown(self): | |
| """应用关闭时清理资源""" | |
| # 确保所有资源都被正确释放 | |
| if self.pipeline: | |
| self.pipeline.set_state(Gst.State.NULL) | |
| if self.window: | |
| self.window.destroy() | |
| Adw.Application.do_shutdown(self) | |
| # 强制退出程序 | |
| sys.exit(0) | |
| # 全局变量 | |
| app = ScreenCastApp() | |
| bus = dbus.SessionBus() | |
| request_iface = 'org.freedesktop.portal.Request' | |
| screen_cast_iface = 'org.freedesktop.portal.ScreenCast' | |
| request_token_counter = 0 | |
| session_token_counter = 0 | |
| sender_name = re.sub(r'\.', r'_', bus.get_unique_name()[1:]) | |
| def terminate(): | |
| """终止程序并清理资源""" | |
| if app.pipeline is not None: | |
| app.pipeline.set_state(Gst.State.NULL) | |
| app.pipeline = None | |
| if app.window: | |
| app.window.destroy() | |
| app.window = None | |
| app.quit() | |
| # 强制退出程序 | |
| sys.exit(0) | |
| def new_request_path(): | |
| global request_token_counter | |
| request_token_counter = request_token_counter + 1 | |
| token = 'u%d'%request_token_counter | |
| path = '/org/freedesktop/portal/desktop/request/%s/%s'%(sender_name, token) | |
| return (path, token) | |
| def new_session_path(): | |
| global session_token_counter | |
| session_token_counter = session_token_counter + 1 | |
| token = 'u%d'%session_token_counter | |
| path = '/org/freedesktop/portal/desktop/session/%s/%s'%(sender_name, token) | |
| return (path, token) | |
| def screen_cast_call(method, callback, *args, options={}): | |
| (request_path, request_token) = new_request_path() | |
| bus.add_signal_receiver(callback, | |
| 'Response', | |
| request_iface, | |
| 'org.freedesktop.portal.Desktop', | |
| request_path) | |
| options['handle_token'] = request_token | |
| method(*(args + (options, )), | |
| dbus_interface=screen_cast_iface) | |
| def on_gst_message(bus, message): | |
| type = message.type | |
| if type == Gst.MessageType.EOS or type == Gst.MessageType.ERROR: | |
| terminate() | |
| def play_pipewire_stream(node_id): | |
| empty_dict = dbus.Dictionary(signature="sv") | |
| fd_object = portal.OpenPipeWireRemote(session, empty_dict, | |
| dbus_interface=screen_cast_iface) | |
| fd = fd_object.take() | |
| # 首先尝试使用gtksink,这是GTK 4的原生视频接收器 | |
| pipeline_desc = f'pipewiresrc fd={fd} path={node_id} ! videoconvert ! videoscale ! gtksink name=sink' | |
| try: | |
| pipeline = Gst.parse_launch(pipeline_desc) | |
| # 获取gtksink的widget | |
| sink = pipeline.get_by_name('sink') | |
| if sink: | |
| sink_widget = sink.get_property('widget') | |
| else: | |
| sink_widget = None | |
| except Exception as e: | |
| print(f"Failed to create pipeline with gtksink: {e}") | |
| sink_widget = None | |
| # 如果gtksink不可用,尝试使用ximagesink作为备选 | |
| if not sink_widget: | |
| try: | |
| pipeline_desc = f'pipewiresrc fd={fd} path={node_id} ! videoconvert ! videoscale ! xvimagesink double-buffer=true synchronous=true name=sink' | |
| pipeline = Gst.parse_launch(pipeline_desc) | |
| sink_widget = None # ximagesink没有GTK widget | |
| except Exception as e: | |
| print(f"Failed to create pipeline with ximagesink: {e}") | |
| return | |
| # 保存pipeline引用 | |
| app.pipeline = pipeline | |
| # 设置视频显示到GTK窗口 | |
| if app.window and sink_widget: | |
| app.window.set_gstreamer_widget(pipeline, sink_widget) | |
| pipeline.set_state(Gst.State.PLAYING) | |
| pipeline.get_bus().connect('message', on_gst_message) | |
| def on_start_response(response, results): | |
| if response != 0: | |
| print("Failed to start: %s"%response) | |
| terminate() | |
| return | |
| print("streams:") | |
| for (node_id, stream_properties) in results['streams']: | |
| print("stream {}".format(node_id)) | |
| play_pipewire_stream(node_id) | |
| def on_select_sources_response(response, results): | |
| if response != 0: | |
| print("Failed to select sources: %d"%response) | |
| terminate() | |
| return | |
| print("sources selected") | |
| global session | |
| screen_cast_call(portal.Start, on_start_response, | |
| session, '') | |
| def on_create_session_response(response, results): | |
| if response != 0: | |
| print("Failed to create session: %d"%response) | |
| terminate() | |
| return | |
| global session | |
| session = results['session_handle'] | |
| print("session %s created"%session) | |
| screen_cast_call(portal.SelectSources, on_select_sources_response, | |
| session, | |
| options={ 'multiple': False, | |
| 'types': dbus.UInt32(1|2), | |
| 'cursor_mode': dbus.UInt32(2) }) | |
| portal = bus.get_object('org.freedesktop.portal.Desktop', | |
| '/org/freedesktop/portal/desktop') | |
| (session_path, session_token) = new_session_path() | |
| screen_cast_call(portal.CreateSession, on_create_session_response, | |
| options={ 'session_handle_token': session_token }) | |
| # Start the program | |
| try: | |
| app.run(sys.argv) | |
| except KeyboardInterrupt: | |
| terminate() | |
| except Exception as e: | |
| print(f"Error: {e}") | |
| terminate() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment