Skip to content

Instantly share code, notes, and snippets.

@wangzk
Created December 6, 2025 11:55
Show Gist options
  • Select an option

  • Save wangzk/2c19b945507d80840adba7070e8d1362 to your computer and use it in GitHub Desktop.

Select an option

Save wangzk/2c19b945507d80840adba7070e8d1362 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import re
import signal
import dbus
import sys
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '4.0')
gi.require_version('Gdk', '4.0')
gi.require_version('GstVideo', '1.0')
gi.require_version('Adw', '1')
from gi.repository import GLib, Gtk, Gdk, Gst, GstVideo, GObject, Adw, Gio
DBusGMainLoop(set_as_default=True)
Gst.init(None)
Adw.init()
# 创建libadwaita应用窗口
class ScreenCastWindow(Adw.ApplicationWindow):
def __init__(self, app):
super().__init__(application=app, title="Screen Mirror")
self.set_default_size(800, 600)
# 设置窗口允许调整大小
self.set_resizable(True)
# 创建一个Box布局容器,用于视频显示
self.video_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.video_box.set_hexpand(True)
self.video_box.set_vexpand(True)
# 创建header bar
self.header_bar = Adw.HeaderBar()
self.header_bar.set_show_end_title_buttons(True)
self.header_bar.set_show_start_title_buttons(True)
# 创建主容器,包含header bar和视频内容
self.main_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self.main_box.append(self.header_bar)
self.main_box.append(self.video_box)
self.set_content(self.main_box)
# 保存pipeline和视频接收器引用
self.pipeline = None
self.video_sink = None
# 连接窗口关闭事件
self.connect('close-request', self.on_window_close)
def set_gstreamer_widget(self, pipeline, sink_widget):
"""设置GStreamer pipeline与视频显示组件"""
self.pipeline = pipeline
# 如果已经有视频组件,先移除
if self.video_sink:
self.video_box.remove(self.video_sink)
# 设置视频组件的属性使其填充可用空间
sink_widget.set_hexpand(True)
sink_widget.set_vexpand(True)
# 将视频组件添加到Box布局中
self.video_box.append(sink_widget)
self.video_sink = sink_widget
# 显示新添加的组件
sink_widget.show()
def on_window_close(self, window):
"""窗口关闭时停止pipeline并退出程序"""
# 停止视频流
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
self.pipeline = None
# 退出应用程序
self.get_application().quit()
return True
# 创建libadwaita应用
class ScreenCastApp(Adw.Application):
def __init__(self):
super().__init__(application_id='com.example.screencast',
flags=Gio.ApplicationFlags.DEFAULT_FLAGS)
self.window = None
self.pipeline = None
def do_activate(self):
if not self.window:
self.window = ScreenCastWindow(self)
self.window.present()
def do_shutdown(self):
"""应用关闭时清理资源"""
# 确保所有资源都被正确释放
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
if self.window:
self.window.destroy()
Adw.Application.do_shutdown(self)
# 强制退出程序
sys.exit(0)
# 全局变量
app = ScreenCastApp()
bus = dbus.SessionBus()
request_iface = 'org.freedesktop.portal.Request'
screen_cast_iface = 'org.freedesktop.portal.ScreenCast'
request_token_counter = 0
session_token_counter = 0
sender_name = re.sub(r'\.', r'_', bus.get_unique_name()[1:])
def terminate():
"""终止程序并清理资源"""
if app.pipeline is not None:
app.pipeline.set_state(Gst.State.NULL)
app.pipeline = None
if app.window:
app.window.destroy()
app.window = None
app.quit()
# 强制退出程序
sys.exit(0)
def new_request_path():
global request_token_counter
request_token_counter = request_token_counter + 1
token = 'u%d'%request_token_counter
path = '/org/freedesktop/portal/desktop/request/%s/%s'%(sender_name, token)
return (path, token)
def new_session_path():
global session_token_counter
session_token_counter = session_token_counter + 1
token = 'u%d'%session_token_counter
path = '/org/freedesktop/portal/desktop/session/%s/%s'%(sender_name, token)
return (path, token)
def screen_cast_call(method, callback, *args, options={}):
(request_path, request_token) = new_request_path()
bus.add_signal_receiver(callback,
'Response',
request_iface,
'org.freedesktop.portal.Desktop',
request_path)
options['handle_token'] = request_token
method(*(args + (options, )),
dbus_interface=screen_cast_iface)
def on_gst_message(bus, message):
type = message.type
if type == Gst.MessageType.EOS or type == Gst.MessageType.ERROR:
terminate()
def play_pipewire_stream(node_id):
empty_dict = dbus.Dictionary(signature="sv")
fd_object = portal.OpenPipeWireRemote(session, empty_dict,
dbus_interface=screen_cast_iface)
fd = fd_object.take()
# 首先尝试使用gtksink,这是GTK 4的原生视频接收器
pipeline_desc = f'pipewiresrc fd={fd} path={node_id} ! videoconvert ! videoscale ! gtksink name=sink'
try:
pipeline = Gst.parse_launch(pipeline_desc)
# 获取gtksink的widget
sink = pipeline.get_by_name('sink')
if sink:
sink_widget = sink.get_property('widget')
else:
sink_widget = None
except Exception as e:
print(f"Failed to create pipeline with gtksink: {e}")
sink_widget = None
# 如果gtksink不可用,尝试使用ximagesink作为备选
if not sink_widget:
try:
pipeline_desc = f'pipewiresrc fd={fd} path={node_id} ! videoconvert ! videoscale ! xvimagesink double-buffer=true synchronous=true name=sink'
pipeline = Gst.parse_launch(pipeline_desc)
sink_widget = None # ximagesink没有GTK widget
except Exception as e:
print(f"Failed to create pipeline with ximagesink: {e}")
return
# 保存pipeline引用
app.pipeline = pipeline
# 设置视频显示到GTK窗口
if app.window and sink_widget:
app.window.set_gstreamer_widget(pipeline, sink_widget)
pipeline.set_state(Gst.State.PLAYING)
pipeline.get_bus().connect('message', on_gst_message)
def on_start_response(response, results):
if response != 0:
print("Failed to start: %s"%response)
terminate()
return
print("streams:")
for (node_id, stream_properties) in results['streams']:
print("stream {}".format(node_id))
play_pipewire_stream(node_id)
def on_select_sources_response(response, results):
if response != 0:
print("Failed to select sources: %d"%response)
terminate()
return
print("sources selected")
global session
screen_cast_call(portal.Start, on_start_response,
session, '')
def on_create_session_response(response, results):
if response != 0:
print("Failed to create session: %d"%response)
terminate()
return
global session
session = results['session_handle']
print("session %s created"%session)
screen_cast_call(portal.SelectSources, on_select_sources_response,
session,
options={ 'multiple': False,
'types': dbus.UInt32(1|2),
'cursor_mode': dbus.UInt32(2) })
portal = bus.get_object('org.freedesktop.portal.Desktop',
'/org/freedesktop/portal/desktop')
(session_path, session_token) = new_session_path()
screen_cast_call(portal.CreateSession, on_create_session_response,
options={ 'session_handle_token': session_token })
# Start the program
try:
app.run(sys.argv)
except KeyboardInterrupt:
terminate()
except Exception as e:
print(f"Error: {e}")
terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment