Skip to content

Instantly share code, notes, and snippets.

@ABeltramo
Last active July 10, 2024 13:11
Show Gist options
  • Save ABeltramo/39284b1731bb1aea4c09d425ec0fcd6a to your computer and use it in GitHub Desktop.
Save ABeltramo/39284b1731bb1aea4c09d425ec0fcd6a to your computer and use it in GitHub Desktop.
Gstreamer WebRTC demo using gst-wayland-display

Requirements

Setup Rust and Gstreamer on your machine. Install our custom gst-wayland-display Gstreamer plugin

git clone https://github.com/games-on-whales/gst-wayland-display.git
cd gst-wayland-display
# Install cargo-c if you don't have it already
cargo install cargo-c
# Build and install the plugin, by default under /usr/local
cargo cinstall --prefix=/usr/local

Setup Python:

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

How to run

Open up https://webrtc.nirbheek.in/ and select [☑️] Remote offerer then copy the "Our id" field below and paste it at the end of the python command

# Change 3350 with the "our ID" field from https://webrtc.nirbheek.in/
python3 webrtc_sendrecv.py --server=wss://webrtc.nirbheek.in:8443 3350

This will start streaming a black window, you can then start any Wayland application using the newly created Wayland socket

WAYLAND_DISPLAY=wayland-1 weston-simple-egl

Where wayland-1 is the highest available wayland socket under ${XDG_RUNTIME_DIR}, it's set in the python file here:

os.environ['XDG_RUNTIME_DIR'] = "/run/user/1000/"

Where to go from here?

Read more about Gstreamer and Webrtc, here's a great introductory blogpost

You'll quickly notice that there's no mouse/keyboard input. To debug things, you could manually set the /dev/input/event* devices in the add_devices() python method so that you expose some locally plugged mouse and keyboard.
Ideally, you should integrate inputtino so that you can create and control virtual devices programmatically. There's a very simple web interface that you can try out using:

docker run --init --name inputtino -p 8080:8080 -v /dev/input:/dev/input:rw --device /dev/uinput ghcr.io/games-on-whales/inputtino:stable

To test things further you could:

  • Start up inputtino and create a virtual mouse and keyboard using the web interface
  • Copy the newly created /dev/input/event* files into the add_devices() python method
  • Start up the python script

This should allow you on one browser tab to see the video feed and on another browser tab to use mouse and keyboard to control the remote video screen. See the video below for an example of this

import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse
import gi
# PyGObject requires pinning each introspection namespace version
# before the corresponding module may be imported.
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
from gi.repository import GObject
# Location of the compiled gst-wayland-display plugin.
# NOTE(review): "<USER>" is a placeholder — edit to match the local checkout.
os.environ['GST_PLUGIN_PATH'] = "/home/<USER>/gst-wayland-display/"
# Point the compositor/clients at the Wayland socket created by
# waylanddisplaysrc; see the README section above for how this is chosen.
os.environ['DISPLAY'] = "wayland-1"
os.environ['WAYLAND_DISPLAY'] = "wayland-1"
# NOTE(review): hard-coded to uid 1000's runtime dir — adjust per machine.
os.environ['XDG_RUNTIME_DIR'] = "/run/user/1000/"
# gst-launch style description: one webrtcbin ("sendrecv") fed by a
# VP8-encoded video branch from waylanddisplaysrc and an Opus test-audio branch.
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
waylanddisplaysrc ! video/x-raw,width=1280,height=720,format=RGBx,framerate=60/1 ! queue ! videoconvert ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,width=1280,height=720,format=RGBx,framerate=60/1,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
from websockets.version import version as wsv
class WebRTCClient:
    """Drive a webrtcbin send/receive session against a signalling server.

    The client registers with the signalling server over a secure
    websocket, negotiates the SDP offer/answer exchange with one peer,
    and runs the GStreamer pipeline described by PIPELINE_DESC.
    """

    def __init__(self, id_, peer_id, server):
        self.id_ = id_
        self.conn = None           # websocket connection to the signalling server
        self.pipe = None           # the running Gst pipeline
        self.webrtc = None         # the webrtcbin element inside the pipeline
        self.set_devices = False   # latch so add_devices() fires only once
        self.peer_id = peer_id
        self.server = server or 'wss://webrtc.nirbheek.in:8443'

    async def connect(self):
        """Open the websocket and register our numeric id with the server."""
        print("connect")
        ctx = ssl.create_default_context()
        self.conn = await websockets.connect(self.server, ssl=ctx)
        await self.conn.send(f'HELLO {self.id_}')

    async def setup_call(self):
        """Ask the signalling server to open a session with the peer."""
        print("setup_call")
        await self.conn.send(f'SESSION {self.peer_id}')

    def send_sdp_offer(self, offer):
        """Serialise the local SDP offer and push it to the peer."""
        text = offer.sdp.as_text()
        print(f'Sending offer:\n{text}')
        payload = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
        # Invoked from a GStreamer thread, so the async send is driven by a
        # short-lived private event loop rather than the main one.
        tmp_loop = asyncio.new_event_loop()
        tmp_loop.run_until_complete(self.conn.send(payload))
        tmp_loop.close()

    def on_offer_created(self, promise, _, __):
        """Promise callback: apply the freshly created offer locally and send it."""
        print("on_offer_created")
        promise.wait()
        reply = promise.get_reply()
        offer = reply.get_value('offer')
        local_promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', offer, local_promise)
        local_promise.interrupt()
        self.send_sdp_offer(offer)

    def on_negotiation_needed(self, element):
        """webrtcbin signal: kick off offer creation."""
        print("on_negotiation_needed")
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
        element.emit('create-offer', None, promise)

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """webrtcbin signal: forward a local ICE candidate to the peer."""
        print("send_ice_candidate_message")
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex, 'remoteDescription': "asd"}})
        # Same private-event-loop trick as send_sdp_offer (GStreamer thread).
        tmp_loop = asyncio.new_event_loop()
        tmp_loop.run_until_complete(self.conn.send(icemsg))
        tmp_loop.close()

    def add_devices(self):
        """Announce the virtual input device nodes to the pipeline, once.

        Sends a custom upstream event carrying the /dev/input paths so the
        wayland display source can pick them up.
        """
        if self.set_devices:
            return
        print("add_devices")
        self.set_devices = True
        structure = Gst.Structure.new_empty("VirtualDevicesReady")
        paths = GObject.Value(GObject.ValueArray)
        # NOTE(review): hard-coded event nodes — replace with the devices
        # actually created on this machine (see the README above).
        paths.set_value([
            "/dev/input/event20",
            "/dev/input/event21",
            "/dev/input/event22",
        ])
        structure.set_value("paths", paths)
        event = Gst.Event.new_custom(Gst.EventType.CUSTOM_UPSTREAM, structure)
        self.pipe.send_event(event)

    def on_incoming_decodebin_stream(self, _, pad):
        """decodebin pad-added: wire the decoded stream to a local sink."""
        print("on_incoming_decodebin_stream")
        self.add_devices()
        if not pad.has_current_caps():
            print(pad, 'has no caps, ignoring')
            return
        caps = pad.get_current_caps()
        name = caps.get_structure(0).get_name()
        print("name " + name)
        if name.startswith(('way', 'rtspsrc', 'video')):
            factories = ('queue', 'videoconvert', 'autovideosink')
        elif name.startswith('audio'):
            factories = ('queue', 'audioconvert', 'audioresample', 'autoaudiosink')
        else:
            return
        chain = [Gst.ElementFactory.make(factory) for factory in factories]
        for element in chain:
            self.pipe.add(element)
        self.pipe.sync_children_states()
        pad.link(chain[0].get_static_pad('sink'))
        for upstream, downstream in zip(chain, chain[1:]):
            upstream.link(downstream)

    def on_incoming_stream(self, _, pad):
        """webrtcbin pad-added: feed each incoming src pad into a decodebin."""
        print("on_incoming_stream")
        if pad.direction != Gst.PadDirection.SRC:
            return
        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self):
        """Build the pipeline, hook up webrtcbin signals and start playing."""
        print("start_pipeline")
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        for signal, handler in (
            ('on-negotiation-needed', self.on_negotiation_needed),
            ('on-ice-candidate', self.send_ice_candidate_message),
            ('pad-added', self.on_incoming_stream),
        ):
            self.webrtc.connect(signal, handler)
        self.pipe.set_state(Gst.State.PLAYING)

    def handle_sdp(self, message):
        """Dispatch a JSON signalling message: SDP answer or ICE candidate."""
        print("handle_sdp")
        assert (self.webrtc)
        msg = json.loads(message)
        if 'sdp' in msg:
            sdp = msg['sdp']
            assert (sdp['type'] == 'answer')
            sdp_text = sdp['sdp']
            print(f'Received answer:\n{sdp_text}')
            res, sdpmsg = GstSdp.SDPMessage.new()
            GstSdp.sdp_message_parse_buffer(bytes(sdp_text.encode()), sdpmsg)
            answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
            promise = Gst.Promise.new()
            self.webrtc.emit('set-remote-description', answer, promise)
            promise.interrupt()
        elif 'ice' in msg:
            ice = msg['ice']
            self.webrtc.emit('add-ice-candidate', ice['sdpMLineIndex'], ice['candidate'])

    def close_pipeline(self):
        """Tear down the pipeline and drop the element references."""
        print("close_pipeline")
        self.pipe.set_state(Gst.State.NULL)
        self.pipe = None
        self.webrtc = None

    async def loop(self):
        """Consume signalling messages until the connection closes.

        Returns 0 on a clean close, 1 when the server reports an error.
        """
        assert self.conn
        async for message in self.conn:
            if message == 'HELLO':
                await self.setup_call()
            elif message == 'SESSION_OK':
                self.start_pipeline()
            elif message.startswith('ERROR'):
                print(message)
                self.close_pipeline()
                return 1
            else:
                self.handle_sdp(message)
        self.close_pipeline()
        return 0

    async def stop(self):
        """Close the websocket connection, if any."""
        if self.conn:
            await self.conn.close()
        self.conn = None
def check_plugins():
    """Return True if every GStreamer plugin the pipeline needs is installed.

    Prints the missing plugin names and returns False otherwise.
    """
    needed = ("opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc")
    registry = Gst.Registry.get()
    missing = [plugin for plugin in needed if registry.find_plugin(plugin) is None]
    if missing:
        print('Missing gstreamer plugins:', missing)
        return False
    return True
if __name__ == '__main__':
    # Entry point: verify plugins, parse the peer id/server, then run the
    # signalling loop to completion and exit with its status code.
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('peerid', help='String ID of the peer to connect to')
    parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    args = parser.parse_args()
    # Random id identifies *us* to the signalling server.
    our_id = random.randrange(10, 10000)
    client = WebRTCClient(our_id, args.peerid, args.server)
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(client.connect())
    status = event_loop.run_until_complete(client.loop())
    sys.exit(status)
websockets
PyGObject
attrs
This file has been truncated, but you can view the full file.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment