|
use std::sync::{Arc, Mutex, Weak}; |
|
|
|
// use std::collections::HashMap; |
|
use serde_json::json; |
|
|
|
use gst::element_error; |
|
use gst::prelude::*; |
|
|
|
use serde_derive::{Deserialize, Serialize}; |
|
|
|
|
|
// janus setup |
|
// fn mk_janus_session() -> Result<String, reqwest::Error> { |
|
fn mk_janus_session<>() -> Option<u64> { |
|
let req = json!({ |
|
"janus": "create", |
|
"transaction": "012345678901" |
|
}); |
|
let client = reqwest::blocking::Client::new(); |
|
let resp = client.post("http://127.0.0.1:8088/janus").json(&req).send().ok()?; |
|
let j: serde_json::Value = resp.json().ok()?; |
|
println!("json: {:#?}", j); |
|
let s = j["data"]["id"].as_u64()?; |
|
println!("janus session: {s}"); |
|
return Some(s); |
|
} |
|
|
|
fn attach_janus_plugin(session_id: u64) -> Option<u64> { |
|
let req = json!({ |
|
"janus": "attach", |
|
"plugin": "janus.plugin.streaming", |
|
"transaction": "012345678902" |
|
}); |
|
let client = reqwest::blocking::Client::new(); |
|
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string(); |
|
let resp: serde_json::Value = client.post(session_url) |
|
.json(&req) |
|
.send().ok()? |
|
.json().ok()?; |
|
println!("janus plugin: {resp}"); |
|
let plugin_id = resp["data"]["id"].as_u64()?; |
|
println!("janus plugin id: {plugin_id}"); |
|
return Some(plugin_id); |
|
} |
|
|
|
fn camera(session_id: u64, plugin_id: u64) -> Option<u64> { |
|
let request = json!({ |
|
"janus": "message", |
|
"transaction": "012345678903", |
|
"body": { |
|
"request": "create", |
|
"type": "rtsp", |
|
"name": "virtual camera", |
|
"description": "", |
|
"video": true, |
|
"url": "rtsp://127.0.0.1:4554/tag" |
|
} |
|
}); |
|
let client = reqwest::blocking::Client::new(); |
|
let url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string() + "/" + &plugin_id.to_string(); |
|
let response: serde_json::Value = client.post(url) |
|
.json(&request) |
|
.send().ok()? |
|
.json().ok()?; |
|
println!("camera response: {response}"); |
|
let mountpoint = response["plugindata"]["data"]["stream"]["id"].as_u64()?; |
|
println!("mountpoint: {mountpoint}"); |
|
return Some(mountpoint); |
|
} |
|
|
|
// fn setup() -> Option<()> { |
|
// let session_id = mk_janus_session()?; |
|
// let plugin_id = attach_janus_plugin(session_id.clone())?; |
|
// let mountpoint = camera(session_id.clone(), plugin_id.clone())?; |
|
// let sdp = get_sdp_offer(session_id, plugin_id, mountpoint)?; |
|
// println!("SDP: {sdp}"); |
|
// Some(()) |
|
// } |
|
|
|
fn get_sdp_offer(session_id: u64, plugin_id: u64, mountpoint: u64) -> Option<String> { |
|
let request = json!({ |
|
"janus": "message", |
|
"transaction": "012345678904", |
|
"body": { |
|
"request": "watch", |
|
"id": mountpoint |
|
} |
|
}); |
|
let client = reqwest::blocking::Client::new(); |
|
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string(); |
|
let url = session_url.clone() + "/" + &plugin_id.to_string(); |
|
let response: serde_json::Value = client.post(url) |
|
.json(&request) |
|
.send().ok()? |
|
.json().ok()?; |
|
println!("watch response: {response}"); |
|
// poll: |
|
let poll_url = session_url.clone() + "?maxev=1&rid=1"; |
|
println!("DEBUG poll_url {poll_url}"); |
|
let answer: serde_json::Value = client.get(poll_url).send().ok()?.json().ok()?; |
|
println!("polled: {answer}"); |
|
if answer["janus"] == "event" { |
|
if answer["sender"] == plugin_id { |
|
return Some(answer["jsep"]["sdp"].as_str()?.to_string()); |
|
} |
|
} |
|
return None; |
|
} |
|
|
|
fn ask_janus(session_id: u64) -> Option<()> { |
|
let client = reqwest::blocking::Client::new(); |
|
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string(); |
|
let poll_url = session_url + "?maxev=1&rid=2"; |
|
println!("polling Janus... {poll_url}"); |
|
let sent = client.get(poll_url).timeout(std::time::Duration::from_secs(40)).send(); |
|
let j = sent.ok()?.json(); |
|
let answer: serde_json::Value = j.ok()?; |
|
println!("received from Janus: {:#?}", answer); |
|
Some(()) |
|
} |
|
|
|
fn send_sdp_answer(session_id: u64, plugin_id: u64, sdp: String) -> Option<()> { |
|
let request = json!({ |
|
"janus": "message", |
|
"transaction": "012345678905", |
|
"body": { "request": "start" }, |
|
"jsep": { |
|
"sdp": sdp, |
|
"trickle": false, |
|
"type": "answer" |
|
} |
|
}); |
|
println!("send_sdp_answer request: {request}"); |
|
let client = reqwest::blocking::Client::new(); |
|
let session_url = "http://127.0.0.1:8088/janus/".to_owned() + &session_id.to_string(); |
|
let url = session_url.clone() + "/" + &plugin_id.to_string(); |
|
let response: serde_json::Value = client.post(url) |
|
.json(&request) |
|
.send().ok()? |
|
.json().ok()?; |
|
println!("send sdp response: {response}"); |
|
Some(()) |
|
} |
|
|
|
// gstreamer part |
|
|
|
// start pipeline |
|
fn start_pipeline(session_id: u64, plugin_id: u64, sdp: String) -> Option<Arc<gst::Element>> { |
|
let p = gst::parse_launch("webrtcbin name=wrb").ok()?; |
|
println!("pipeline parsed"); |
|
p.connect_pad_added(move |_webrtc, pad| { |
|
// on incoming stream |
|
println!("begin on_incoming_stream"); |
|
}); |
|
p.set_state(gst::State::Playing).expect("Couldn't set pipeline to Playing"); |
|
// handle offer (set-remote-description) |
|
let pt = p.type_(); |
|
let sdpmsg = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()).ok()?; |
|
let offer = gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Offer, sdpmsg); |
|
println!("will use offer {:#?}", offer); |
|
let promise = gst::Promise::new(); |
|
let rd = glib::subclass::signal::SignalId::lookup("set-remote-description", pt)?; |
|
p.emit::<()>(rd, &[&offer, &promise]); |
|
promise.wait(); |
|
// negotiate (create-answer) |
|
let pr = Arc::new(p); |
|
let pr2 = pr.clone(); |
|
let promise2 = gst::Promise::with_change_func(move |reply| { |
|
let res = reply.ok().expect("reply"); |
|
let res2 = res.expect("reply2"); |
|
let answer: gst_webrtc::WebRTCSessionDescription = res2 |
|
.value("answer") |
|
.expect("Invalid argument") |
|
.get().expect("bad answer"); |
|
println!("will use answer {:#?}", answer); |
|
// let answer_string = answer.type_(); |
|
// println!("answer {answer_string} -- {res2}"); |
|
let ld = glib::subclass::signal::SignalId::lookup("set-local-description", pt).expect("Sig"); |
|
let promise3 = gst::Promise::new(); |
|
pr2.emit::<()>(ld, &[&answer, &promise3]); |
|
promise3.wait(); |
|
// send sdp |
|
let sdp = answer.sdp().as_text().unwrap(); |
|
send_sdp_answer(session_id, plugin_id, sdp); |
|
}); |
|
let ca = glib::subclass::signal::SignalId::lookup("create-answer", pt)?; |
|
pr.emit::<()>(ca, &[&None::<gst::Structure>, &promise2]); |
|
promise2.wait(); |
|
println!("pipeline is started"); |
|
Some(pr) |
|
} |
|
|
|
fn about(wrb: &gst::Element) { |
|
// let ld = wrb.property::<gst_webrtc::WebRTCSessionDescription>("current-local-description"); |
|
let ld = wrb.property::<glib::Value>("current-local-description"); |
|
println!("current-local-description: {:#?}", ld); |
|
let rd = wrb.property::<glib::Value>("current-remote-description"); |
|
println!("current-remote-description: {:#?}", rd); |
|
} |
|
|
|
fn play() -> Option<()> { |
|
gst::init().ok()?; |
|
let session_id = mk_janus_session()?; |
|
let plugin_id = attach_janus_plugin(session_id)?; |
|
let mountpoint = camera(session_id, plugin_id)?; |
|
let sdp = get_sdp_offer(session_id, plugin_id, mountpoint)?; |
|
println!("SDP: {sdp}"); |
|
let wrb = start_pipeline(session_id, plugin_id, sdp)?; |
|
loop { |
|
ask_janus(session_id)?; |
|
about(&wrb); |
|
} |
|
println!("we have played"); |
|
Some(()) |
|
} |
|
|
|
fn main() { |
|
play().expect("~.~"); |
|
println!("end") |
|
} |
Logs: https://pastebin.com/raw/RsyKBAfM