Skip to content

Instantly share code, notes, and snippets.

@le-chat
Last active July 15, 2022 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save le-chat/d74457383b6e821666b90e8d5cebcd38 to your computer and use it in GitHub Desktop.
Save le-chat/d74457383b6e821666b90e8d5cebcd38 to your computer and use it in GitHub Desktop.
webrtcbin-no-pad-added

Run as:

git clone https://gist.github.com/d74457383b6e821666b90e8d5cebcd38.git webrtcbin-no-pad-added
cd webrtcbin-no-pad-added
docker build -t webrtcbin-no-pad-added . && docker run --rm -it webrtcbin-no-pad-added

This will start Janus webrtc server, virtual RTSP camera and run gstreamer-based webrtc client (webrtc_receiver.py). From logs client seems to establish connection.

Expected behaviour: new pad added, we see begin on_incoming stream in logs, appsink works.

Actual behaviour: no evidence that pad-added signal has been emitted.

Gstreamer 1.20.1, python 3.9.

The new version also runs rust webrtcbin client, which also does not emit pad-added.

[package]
name = "wrb"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
structopt = { version = "0.3", default-features = false }
rand = "0.7"
reqwest = { version = "0.11", features = ["blocking", "json"] }
glib = "0.15"
gst = { package = "gstreamer", version = "0.18", features = ["v1_20"] }
gst-webrtc = { package = "gstreamer-webrtc", version = "0.18" }
gst-sdp = { package = "gstreamer-sdp", version = "0.18", features = ["v1_20"] }
serde = "1"
serde_derive = "1"
serde_json = "1"
[[bin]]
name = "wrb"
path = "main.rs"
let pkgs = import <nixpkgs> {};
pyEnv = pkgs.python3.withPackages (p: [
p.pygobject3
p.gst-python
p.aiohttp
p.opencv4
p.numpy
]);
gst_deps = with pkgs; [
gobject-introspection
gst_all_1.gst-rtsp-server
gst_all_1.gstreamer
gst_all_1.gst-plugins-base
gst_all_1.gst-plugins-bad
gst_all_1.gst-plugins-ugly
gst_all_1.gst-plugins-good
libnice
];
deps = [ pyEnv pkgs.janus-gateway pkgs.busybox ] ++ gst_deps ++ [ pkgs.cargo pkgs.openssl pkgs.pkg-config ];
app = pkgs.writeShellApplication {
name = "run.sh";
runtimeInputs = deps;
text = ''
janus -F . 2>&1 | awk '{print "janus | " $0;}' | ts -s &
python rtsp_server.py 2>&1 | awk '{print "rtsp | " $0;}' | ts -s &
python webrtc_receiver.py 2>&1 | awk '{print "python | " $0;}' | ts -s &
target/debug/wrb 2>&1 | awk '{print "rust | " $0;}' | ts -s &
wait
'';
};
in [(app // {app = app;})] ++ deps
FROM nixos/nix
RUN nix-channel --add https://nixos.org/channels/nixos-22.05 nixpkgs && nix-channel --update
RUN mkdir /opt/
WORKDIR /opt
COPY default.nix /opt/
RUN nix-build
ENV GST_DEBUG=3,*webrtc*:6,*rtpsource*:0,*rtpstorage*:1
COPY ./* /opt/
RUN nix-shell --run 'cargo build'
CMD nix-shell --run run.sh
general: {
debug_level = 5
admin_secret = "janusoverlord"
}
import random
import string
import time
import logging
import asyncio
logger = logging.getLogger(__name__)
def transaction_id():
return "".join(random.choice(string.ascii_letters) for x in range(12))
# async def connect_to_janus(id, pc, session, add_track_callback):
# await session.create()
# @pc.on("track")
# def on_track(track):
# logger.info("Receiving {}".format(track))
# add_track_callback(track)
# plugin = await session.attach("janus.plugin.streaming")
# response = await plugin.sync_send({"body": {"request": "list"}})
# logger.info(response["plugindata"]["data"]["list"])
# response = await plugin.async_send(
# {"body": {
# "request": "watch",
# "id": id
# }})
# assert (response["jsep"]["type"] == "offer")
# offer = RTCSessionDescription(sdp=response["jsep"]["sdp"],
# type=response["jsep"]["type"])
# await pc.setRemoteDescription(offer)
# await pc.setLocalDescription(await pc.createAnswer())
# request = {"request": "start"}
# response = await plugin.async_send({
# "body": request,
# "jsep": {
# "sdp": pc.localDescription.sdp,
# "trickle": False,
# "type": pc.localDescription.type
# },
# })
class JanusPlugin:
def __init__(self, session, url):
self._queue = asyncio.Queue()
self._session = session
self._url = url
async def sync_send(self, payload):
message = {"janus": "message", "transaction": transaction_id()}
message.update(payload)
logger.debug(f"To Janus: {message}")
async with self._session._http.post(self._url,
json=message) as response:
data = await response.json()
logger.debug(f"From Janus: {data}")
assert data["janus"] == "success"
return data
async def async_send(self, payload):
message = {"janus": "message", "transaction": transaction_id()}
message.update(payload)
logger.debug(f"To Janus: {message}")
async with self._session._http.post(self._url,
json=message) as response:
data = await response.json()
logger.debug(f"From Janus: {data}")
assert data["janus"] == "ack"
response = await self._queue.get()
assert response["transaction"] == message["transaction"]
return response
class JanusSession:
def __init__(self, http_session, url):
self._http = http_session
self._poll_task = None
self._plugins = {}
self._root_url = url
self._session_url = None
async def attach(self, plugin):
message = {
"janus": "attach",
"plugin": plugin,
"transaction": transaction_id()
}
async with self._http.post(self._session_url,
json=message) as response:
data = await response.json()
assert data["janus"] == "success"
plugin_id = data["data"]["id"]
plugin = JanusPlugin(self,
self._session_url + "/" + str(plugin_id))
self._plugins[plugin_id] = plugin
return plugin
async def create(self):
message = {"janus": "create", "transaction": transaction_id()}
async with self._http.post(self._root_url, json=message) as response:
data = await response.json()
assert data["janus"] == "success"
session_id = data["data"]["id"]
self._session_url = self._root_url + "/" + str(session_id)
self._poll_task = asyncio.ensure_future(self._poll())
async def destroy(self):
if self._poll_task:
self._poll_task.cancel()
self._poll_task = None
if self._session_url:
message = {"janus": "destroy", "transaction": transaction_id()}
async with self._http.post(self._session_url,
json=message) as response:
data = await response.json()
assert data["janus"] == "success"
self._session_url = None
async def _poll(self):
while True:
params = {"maxev": 1, "rid": int(time.time() * 1000)}
async with self._http.get(self._session_url,
params=params) as response:
data = await response.json()
if data["janus"] == "event":
plugin = self._plugins.get(data["sender"], None)
if plugin:
await plugin._queue.put(data)
else:
logger.debug(data)
# Web server stuff: whether any should be enabled, which ports they
# should use, whether security should be handled directly or demanded to
# an external application (e.g., web frontend) and what should be the
# base path for the Janus API protocol. You can also specify the
# threading model to use for the HTTP webserver: by default this is
# 'unlimited' (which means a thread per connection, as specified by the
# libmicrohttpd documentation), using a number will make use of a thread
# pool instead. Since long polls are involved, make sure you choose a
# value that doesn't keep new connections waiting. Notice that by default
# all the web servers will try and bind on both IPv4 and IPv6: if you
# want to only bind to IPv4 addresses (e.g., because your system does not
# support IPv6), you should set the web server 'ip' property to '0.0.0.0'.
general: {
json = "indented" # Whether the JSON messages should be indented (default),
# plain (no indentation) or compact (no indentation and no spaces)
base_path = "/janus" # Base path to bind to in the web server (plain HTTP only)
threads = "unlimited" # unlimited=thread per connection, number=thread pool
http = true # Whether to enable the plain HTTP interface
port = 8088 # Web server HTTP port
#interface = "eth0" # Whether we should bind this server to a specific interface only
#ip = "192.168.0.1" # Whether we should bind this server to a specific IP address (v4 or v6) only
https = false # Whether to enable HTTPS (default=false)
#secure_port = 8089 # Web server HTTPS port, if enabled
#secure_interface = "eth0" # Whether we should bind this server to a specific interface only
#secure_ip = "192.168.0.1" # Whether we should bind this server to a specific IP address (v4 or v6) only
#acl = "127.,192.168.0." # Only allow requests coming from this comma separated list of addresses
}
# Janus can also expose an admin/monitor endpoint, to allow you to check
# which sessions are up, which handles they're managing, their current
# status and so on. This provides a useful aid when debugging potential
# issues in Janus. The configuration is pretty much the same as the one
# already presented above for the webserver stuff, as the API is very
# similar: choose the base bath for the admin/monitor endpoint (/admin
# by default), ports, threading model, etc. Besides, you can specify
# a secret that must be provided in all requests as a crude form of
# authorization mechanism, and partial or full source IPs if you want to
# limit access basing on IP addresses. For security reasons, this
# endpoint is disabled by default, enable it by setting admin_http=yes.
admin: {
admin_base_path = "/admin" # Base path to bind to in the admin/monitor web server (plain HTTP only)
admin_threads = "unlimited" # unlimited=thread per connection, number=thread pool
admin_http = false # Whether to enable the plain HTTP interface
admin_port = 7088 # Admin/monitor web server HTTP port
#admin_interface = "eth0" # Whether we should bind this server to a specific interface only
#admin_ip = "192.168.0.1" # Whether we should bind this server to a specific IP address (v4 or v6) only
admin_https = false # Whether to enable HTTPS (default=false)
#admin_secure_port = 7889 # Admin/monitor web server HTTPS port, if enabled
#admin_secure_interface = "eth0" # Whether we should bind this server to a specific interface only
#admin_secure_ip = "192.168.0.1 # Whether we should bind this server to a specific IP address (v4 or v6) only
#admin_acl = "127.,192.168.0." # Only allow requests coming from this comma separated list of addresses
}
# The HTTP servers created in Janus support CORS out of the box, but by
# default they return a wildcard (*) in the 'Access-Control-Allow-Origin'
# header. This works fine in most situations, except when we have to
# respond to a credential request (withCredentials=true in the XHR). If
# you need that, uncomment and set the 'allow_origin' below to specify
# what must be returned in 'Access-Control-Allow-Origin'. More details:
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS
cors: {
#allow_origin = "http://foo.example"
}
# Certificate and key to use for HTTPS, if enabled (and passphrase if needed).
# You can also disable insecure protocols and ciphers by configuring the
# 'ciphers' property accordingly (no limitation by default).
certificates: {
cert_pem = "/usr/local/share/janus/certs/mycert.pem"
cert_key = "/usr/local/share/janus/certs/mycert.key"
#cert_pwd = "secretpassphrase"
#ciphers = "PFS:-VERS-TLS1.0:-VERS-TLS1.1:-3DES-CBC:-ARCFOUR-128"
}
use std::sync::{Arc, Mutex, Weak};
// use std::collections::HashMap;
use serde_json::json;
use gst::element_error;
use gst::prelude::*;
use serde_derive::{Deserialize, Serialize};
// janus setup
// fn mk_janus_session() -> Result<String, reqwest::Error> {
fn mk_janus_session<>() -> Option<u64> {
let req = json!({
"janus": "create",
"transaction": "012345678901"
});
let client = reqwest::blocking::Client::new();
let resp = client.post("http://127.0.0.1:8088/janus").json(&req).send().ok()?;
let j: serde_json::Value = resp.json().ok()?;
println!("json: {:#?}", j);
let s = j["data"]["id"].as_u64()?;
println!("janus session: {s}");
return Some(s);
}
fn attach_janus_plugin(session_id: u64) -> Option<u64> {
let req = json!({
"janus": "attach",
"plugin": "janus.plugin.streaming",
"transaction": "012345678902"
});
let client = reqwest::blocking::Client::new();
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string();
let resp: serde_json::Value = client.post(session_url)
.json(&req)
.send().ok()?
.json().ok()?;
println!("janus plugin: {resp}");
let plugin_id = resp["data"]["id"].as_u64()?;
println!("janus plugin id: {plugin_id}");
return Some(plugin_id);
}
fn camera(session_id: u64, plugin_id: u64) -> Option<u64> {
let request = json!({
"janus": "message",
"transaction": "012345678903",
"body": {
"request": "create",
"type": "rtsp",
"name": "virtual camera",
"description": "",
"video": true,
"url": "rtsp://127.0.0.1:4554/tag"
}
});
let client = reqwest::blocking::Client::new();
let url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string() + "/" + &plugin_id.to_string();
let response: serde_json::Value = client.post(url)
.json(&request)
.send().ok()?
.json().ok()?;
println!("camera response: {response}");
let mountpoint = response["plugindata"]["data"]["stream"]["id"].as_u64()?;
println!("mountpoint: {mountpoint}");
return Some(mountpoint);
}
// fn setup() -> Option<()> {
// let session_id = mk_janus_session()?;
// let plugin_id = attach_janus_plugin(session_id.clone())?;
// let mountpoint = camera(session_id.clone(), plugin_id.clone())?;
// let sdp = get_sdp_offer(session_id, plugin_id, mountpoint)?;
// println!("SDP: {sdp}");
// Some(())
// }
fn get_sdp_offer(session_id: u64, plugin_id: u64, mountpoint: u64) -> Option<String> {
let request = json!({
"janus": "message",
"transaction": "012345678904",
"body": {
"request": "watch",
"id": mountpoint
}
});
let client = reqwest::blocking::Client::new();
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string();
let url = session_url.clone() + "/" + &plugin_id.to_string();
let response: serde_json::Value = client.post(url)
.json(&request)
.send().ok()?
.json().ok()?;
println!("watch response: {response}");
// poll:
let poll_url = session_url.clone() + "?maxev=1&rid=1";
println!("DEBUG poll_url {poll_url}");
let answer: serde_json::Value = client.get(poll_url).send().ok()?.json().ok()?;
println!("polled: {answer}");
if answer["janus"] == "event" {
if answer["sender"] == plugin_id {
return Some(answer["jsep"]["sdp"].as_str()?.to_string());
}
}
return None;
}
fn ask_janus(session_id: u64) -> Option<()> {
let client = reqwest::blocking::Client::new();
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string();
let poll_url = session_url + "?maxev=1&rid=2";
println!("polling Janus... {poll_url}");
let sent = client.get(poll_url).timeout(std::time::Duration::from_secs(40)).send();
let j = sent.ok()?.json();
let answer: serde_json::Value = j.ok()?;
println!("received from Janus: {:#?}", answer);
Some(())
}
fn send_sdp_answer(session_id: u64, plugin_id: u64, sdp: String) -> Option<()> {
let request = json!({
"janus": "message",
"transaction": "012345678905",
"body": { "request": "start" },
"jsep": {
"sdp": sdp,
"trickle": false,
"type": "answer"
}
});
println!("send_sdp_answer request: {request}");
let client = reqwest::blocking::Client::new();
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string();
let url = session_url.clone() + "/" + &plugin_id.to_string();
let response: serde_json::Value = client.post(url)
.json(&request)
.send().ok()?
.json().ok()?;
println!("send sdp response: {response}");
Some(())
}
// gstreamer part
// start pipeline
fn start_pipeline(session_id: u64, plugin_id: u64, sdp: String) -> Option<Arc<gst::Element>> {
let p = gst::parse_launch("webrtcbin name=wrb").ok()?;
println!("pipeline parsed");
p.connect_pad_added(move |_webrtc, pad| {
// on incoming stream
println!("begin on_incoming_stream");
});
p.set_state(gst::State::Playing).expect("Couldn't set pipeline to Playing");
// handle offer (set-remote-description)
let pt = p.type_();
let sdpmsg = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).ok()?;
let offer = gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Offer, sdpmsg);
println!("will use offer {:#?}", offer);
let promise = gst::Promise::new();
let rd = glib::subclass::signal::SignalId::lookup("set-remote-description", pt)?;
p.emit::<()>(rd, &[&offer, &promise]);
promise.wait();
// negotiate (create-answer)
let pr = Arc::new(p);
let pr2 = pr.clone();
let promise2 = gst::Promise::with_change_func(move |reply| {
let res = reply.ok().expect("reply");
let res2 = res.expect("reply2");
let answer: gst_webrtc::WebRTCSessionDescription = res2
.value("answer")
.expect("Invalid argument")
.get().expect("bad answer");
println!("will use answer {:#?}", answer);
// let answer_string = answer.type_();
// println!("answer {answer_string} -- {res2}");
let ld = glib::subclass::signal::SignalId::lookup("set-local-description", pt).expect("Sig");
let promise3 = gst::Promise::new();
pr2.emit::<()>(ld, &[&answer, &promise3]);
promise3.wait();
// send sdp
let sdp = answer.sdp().as_text().unwrap();
send_sdp_answer(session_id, plugin_id, sdp);
});
let ca = glib::subclass::signal::SignalId::lookup("create-answer", pt)?;
pr.emit::<()>(ca, &[&None::<gst::Structure>, &promise2]);
promise2.wait();
println!("pipeline is started");
Some(pr)
}
fn about(wrb: &gst::Element) {
// let ld = wrb.property::<gst_webrtc::WebRTCSessionDescription>("current-local-description");
let ld = wrb.property::<glib::Value>("current-local-description");
println!("current-local-description: {:#?}", ld);
let rd = wrb.property::<glib::Value>("current-remote-description");
println!("current-remote-description: {:#?}", rd);
}
fn play() -> Option<()> {
gst::init().ok()?;
let session_id = mk_janus_session()?;
let plugin_id = attach_janus_plugin(session_id)?;
let mountpoint = camera(session_id, plugin_id)?;
let sdp = get_sdp_offer(session_id, plugin_id, mountpoint)?;
println!("SDP: {sdp}");
let wrb = start_pipeline(session_id, plugin_id, sdp)?;
loop {
ask_janus(session_id)?;
about(&wrb);
}
println!("we have played");
Some(())
}
fn main() {
play().expect("~.~");
println!("end")
}
#!/usr/bin/env python
# -*- coding:utf-8 vi:ts=4:noexpandtab
# Simple RTSP server. Run as-is or with a command-line to replace the default pipeline
# nix-shell -p gst_all_1.gst-rtsp-server gst_all_1.gstreamer python38Packages.gst-python python38Packages.pygobject3 gobjectIntrospection gst_all_1.gst-plugins-ugly gst_all_1.gst-plugins-good --run 'python gst-rtsp-server.py'
import sys
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GLib, Gst, GstRtspServer, GObject
import numpy as np
import cv2 as cv
#loop = GObject.MainLoop()
loop = GLib.MainLoop()
#GObject.threads_init()
Gst.init(None)
class MyFactory(GstRtspServer.RTSPMediaFactory):
def __init__(self, myfile):
GstRtspServer.RTSPMediaFactory.__init__(self)
# self.counter = 0
self.width = 1280 #1280
self.height = 720
self.fps = 12
self.tag = cv.imread("tag16_05_00000.png")
def do_create_element(self, url):
# s_src = "filesrc location=/home/i/gst-rtsp/0b.png ! decodebin ! videoconvert ! imagefreeze ! video/x-raw,rate=15,width={self.width},height={self.height},format=I420"
s_src = f"appsrc name=src is-live=true block=true format=time caps=video/x-raw,format=BGR,width={self.width},height={self.height},framerate={self.fps}/1"
s_h264 = f"x264enc tune=zerolatency key-int-max={self.fps} bitrate=512 ! video/x-h264,profile=constrained-baseline,stream-format=byte-stream"
pipeline_str = f"( {s_src} ! queue max-size-buffers=1 name=q_enc ! videoconvert ! {s_h264} ! rtph264pay name=pay0 mtu=900 pt=96 )"
print(pipeline_str)
return Gst.parse_launch(pipeline_str)
def do_media_configure(self, media: GstRtspServer.RTSPMedia):
appsrc = media.get_element().get_by_name("src")
appsrc.counter = 0
appsrc.connect("need-data", self.on_need_data)
def on_need_data(self, src, length):
buf = np.ones((self.height, self.width, 3), dtype=np.uint8) * 255
def put_tag(origin, size): # (h,w)
tag = cv.resize(self.tag, size, interpolation=cv.INTER_NEAREST)
buf[origin[0]:origin[0] + size[0],
origin[1]:origin[1] + size[1], :] = tag
# tag = cv.resize(self.tag, (256, 256), interpolation=cv.INTER_NEAREST)
# buf[256:512,256:512,:] = tag
# put_tag((64, 64), (32, 32))
# put_tag((0,270), (64,64))
s = 80
d = 16
put_tag((self.height - s - d, self.width - s - d), (s, s))
put_tag((d, d), (s, s))
## put_tag((160,160), (32,32))
# print(f"buf: {buf.shape} {buf.dtype}", flush=True)
gb = Gst.Buffer.new_wrapped(buf.tobytes())
duration = int(10**9 / self.fps)
gb.duration = duration
gb.pts = src.counter * duration
src.counter += 1
src.emit("push-buffer", gb)
class GstServer():
def __init__(self, myfile):
self.server = GstRtspServer.RTSPServer()
self.server.set_service("4554")
self.server.set_address("0.0.0.0")
f = MyFactory(myfile)
f.set_shared(True)
m = self.server.get_mount_points()
m.add_factory("/tag", f)
self.server.attach(None)
if __name__ == '__main__':
# myfile = sys.argv[1]
myfile = None
s = GstServer(myfile)
loop.run()
let pkgs = import <nixpkgs> {};
in pkgs.mkShell {
buildInputs = import ./. ++ [ pkgs.openssl ];
}
import asyncio
from concurrent import futures
from threading import current_thread
import aiohttp
import logging
import sys
import numpy as np
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstWebRTC', '1.0')
gi.require_version('GstSdp', '1.0')
from gi.repository import GLib, Gst, GstWebRTC, GstSdp
from janus import JanusSession
logging.basicConfig(level=logging.INFO,
stream=sys.stdout,
format='%(levelname)-8s %(name)s:%(message)s')
# logging.getLogger("libav.h264").setLevel(logging.CRITICAL)
# logging.getLogger("codec.h264").setLevel(logging.CRITICAL)
logging.getLogger("janus").setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)
class WebRTCClient:
def __init__(self):
Gst.init(None)
check_plugins()
self.counter = 0
def start_pipeline(self):
self.pipe = Gst.parse_launch("webrtcbin name=wrb")
self.webrtc = self.pipe
self.pipe.set_state(Gst.State.PLAYING)
self.webrtc.connect('pad-added', self.on_incoming_stream)
def negotiate(self):
self.answer = futures.Future()
# self.webrtc.connect('pad-added', self.on_incoming_stream) # TODO: here?
promise = Gst.Promise.new_with_change_func(self.on_answer_created, self.webrtc, None)
self.webrtc.emit('create-answer', None, promise)
return promise, self.answer
def on_answer_created(self, promise, _, __):
promise.wait()
reply = promise.get_reply()
answer = reply.get_value("answer")
sdp = answer.sdp.as_text()
promise2 = Gst.Promise.new()
self.webrtc.emit('set-local-description', answer, promise2)
self.answer.set_result(sdp)
# def on_incoming_stream(self, _, pad):
def on_incoming_stream(self, *args):
# FIXME check _ vs pad order
logger.info(f"begin on_incoming stream")
pad = args[1]
if pad.direction != Gst.PadDirection.SRC:
return
sink_pipeline = Gst.parse_launch(
"decodebin name=decodesink ! video/x-raw,format=BGR ! appsink name=app emit-signals=true drop=true max-buffers=1")
sink_input = sink_pipeline.get_by_name("decodesink")
sink_output = sink_pipeline.get_by_name("app")
sink_output.connect("new-sample", self.on_new_sample)
self.pipe.add(sink_pipeline)
sink_input.sync_state_with_parent()
sink_input.sync_children_states()
self.webrtc.link(sink_input)
logger.info(f"added stream")
def on_new_sample(self, _, sample):
buf = sample.get_buffer()
caps = buf.get_caps.get_structure(0)
w,h = caps.get_value("width"), caps.get_value("height")
array = np.ndarray(shape=buf.get_size(), buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
array = array.reshape(h, w, 3)
self.counter += 1
if self.counter % 90 == 0:
logger.info(f"new sample №{self.counter} {array.shape}")
def handle_sdp_offer(self, sdp):
res, sdpmsg = GstSdp.SDPMessage.new()
assert res == GstSdp.SDPResult.OK, f"unexpected res {res}"
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
promise = Gst.Promise.new()
self.webrtc.emit('set-remote-description', offer, promise)
return promise
async def about(self):
try:
ld = self.webrtc.get_property("current-local-description")
rd = self.webrtc.get_property("current-remote-description")
cs = self.webrtc.get_property("connection-state")
ics = self.webrtc.get_property("ice-connection-state")
s = f"About:\n\tlocal description: {ld}\n\tremote description{rd}\n\tconn {cs}\n\tice {ics}"
s = f"{s}\n\tpads: {self.webrtc.pads}"
return s
except:
logger.exception("about() failed")
return "About() failed"
class JanusSetup:
def __init__(self):
http_session = aiohttp.ClientSession()
self.session = JanusSession(http_session, "http://127.0.0.1:8088/janus")
async def setup(self):
await self.session.create()
self.plugin = await self.session.attach("janus.plugin.streaming")
request = {
"body": {
"request": "create",
"type": "rtsp",
"name": "virtual camera",
"description": "",
"video": True,
"url": "rtsp://127.0.0.1:4554/tag"
}
}
resp = await self.plugin.sync_send(request)
self.mountpoint = resp["plugindata"]["data"]["stream"]["id"]
return self.mountpoint
async def get_sdp_offer(self):
response = await self.plugin.async_send(
{"body": {
"request": "watch",
"id": self.mountpoint
}})
assert (response["jsep"]["type"] == "offer")
sdp = response["jsep"]["sdp"]
logger.info(f"received SDP offer in {current_thread()}: {sdp}")
return sdp
def send_sdp_schedule(self, sdp):
asyncio.ensure_future(self.send_sdp(sdp), loop=asyncio.get_event_loop())
async def send_sdp(self, sdp):
logger.info(f"really begin sending sdp offer in {current_thread()}")
response = await self.plugin.async_send({
"body": { "request": "start" },
"jsep": {
"sdp": sdp,
"trickle": False,
"type": "answer"
},
})
logger.info(f"really sent sdp offer in {current_thread()}, got {response}")# asyncio.ensure_future(send_sdp(offer.sdp.as_text()), loop=self.loop)
async def main():
await asyncio.sleep(5) # wait Janus
janus = JanusSetup()
await janus.setup()
sdp = await janus.get_sdp_offer()
wrb = WebRTCClient()
wrb.start_pipeline()
p = wrb.handle_sdp_offer(sdp)
p.wait()
p, f = wrb.negotiate()
p.wait()
answer = f.result()
await janus.send_sdp(answer)
while True:
await asyncio.sleep(30)
logger.info(f"{await wrb.about()}")
def check_plugins():
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
"rtpmanager", "videotestsrc", "audiotestsrc"]
ok = True
for p in needed:
if Gst.Registry.get().find_plugin(p) is None:
ok = False
logger.error(f"MISSING PLUGIN {p}")
logger.info(f"all plugins found? {ok}")
return ok
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
@le-chat
Copy link
Author

le-chat commented Jul 8, 2022

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment