-
-
Save mchlrhw/c365bc8acdeac18c1626de3f92cffba5 to your computer and use it in GitHub Desktop.
Rust gstreamer webrtcbin impl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "avserver"
version = "0.1.0"
authors = ["mchlrhw <4028654+mchlrhw@users.noreply.github.com>"]
edition = "2018"

[dependencies]
failure = "0.1.5"
glib = "0.6.1"
gstreamer = "0.12.2"
gstreamer-sdp = "0.12.2"
gstreamer-webrtc = "0.12.2"
lazy_static = "1.2.0"
rocket = "0.4.0"
rocket_contrib = "0.4.0"
serde = "1.0.87"
serde_derive = "1.0.87"
serde_json = "1.0.38"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(custom_attribute, proc_macro_hygiene, decl_macro)] | |
#[macro_use] extern crate rocket; | |
use std::{ | |
sync::mpsc::{channel, Sender}, | |
thread, | |
}; | |
use failure::{Error, Fail}; | |
use glib; | |
use gstreamer::{self as gst, prelude::*}; | |
use gstreamer_sdp as gst_sdp; | |
use gstreamer_webrtc as gst_webrtc; | |
use lazy_static::lazy_static; | |
use rocket::State; | |
use rocket_contrib::json::Json; | |
use serde_derive::{Deserialize, Serialize}; | |
/// URI of a public STUN server (Google's) for WebRTC ICE negotiation.
const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
lazy_static! { | |
static ref RTP_CAPS_VP8: gst::Caps = { | |
gst::Caps::new_simple( | |
"application/x-rtp", | |
&[ | |
("media", &"video"), | |
("encoding-name", &"VP8"), | |
("payload", &(96i32)), | |
], | |
) | |
}; | |
} | |
#[derive(Debug, Fail)] | |
#[fail(display = "Missing elements {:?}", _0)] | |
struct MissingElements(Vec<&'static str>); | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create answer")] | |
struct NullAnswer; | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to get bus")] | |
struct NullBus; | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create element \"{}\"", _0)] | |
struct NullElement(&'static str); | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create offer")] | |
struct NullOffer; | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create pad \"{}\"", _0)] | |
struct NullPad(&'static str); | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create reply")] | |
struct NullReply; | |
#[derive(Debug, Fail)] | |
#[fail(display = "Failed to create session description")] | |
struct NullSessionDescription; | |
#[derive(Debug, Deserialize, Serialize)] | |
struct Sdp { | |
#[serde(rename = "type")] | |
type_: String, | |
#[serde(rename = "sdp")] | |
data: String, | |
} | |
struct AppState { | |
pipeline: gst::Pipeline, | |
} | |
fn on_answer_created( | |
pipeline: &gst::Pipeline, | |
peer_id: &str, | |
promise: &gst::Promise, | |
answer_sender: Sender<gst_webrtc::WebRTCSessionDescription>, | |
) | |
-> Result<(), Error> | |
{ | |
let reply = promise.get_reply().ok_or(NullReply)?; | |
let answer = reply | |
.get_value("answer") | |
.ok_or(NullAnswer)? | |
.get::<gst_webrtc::WebRTCSessionDescription>() | |
.ok_or(NullSessionDescription)?; | |
let webrtcbin = pipeline.get_by_name(peer_id).ok_or(NullElement("webrtcbin"))?; | |
webrtcbin.emit("set-local-description", &[&answer, &None::<gst::Promise>])?; | |
answer_sender.send(answer)?; | |
Ok(()) | |
} | |
fn add_peer_to_pipeline(pipeline: &gst::Pipeline, peer_id: &str) -> Result<(), Error> { | |
let queue = gst::ElementFactory::make("queue", None) | |
.ok_or(NullElement("queue"))?; | |
let webrtcbin = gst::ElementFactory::make("webrtcbin", peer_id) | |
.ok_or(NullElement("webrtcbin"))?; | |
pipeline.add_many(&[ | |
&queue, | |
&webrtcbin, | |
])?; | |
let queue_src = queue.get_static_pad("src") | |
.ok_or(NullPad("queue_src"))?; | |
let webrtc_sink = webrtcbin.get_request_pad("sink_%u") | |
.ok_or(NullPad("webrtc_sink"))?; | |
queue_src.link(&webrtc_sink).into_result()?; | |
let tee = pipeline.get_by_name("videotee") | |
.ok_or(NullElement("videotee"))?; | |
let tee_src = tee.get_request_pad("src_%u") | |
.ok_or(NullPad("tee_src"))?; | |
let queue_sink = queue.get_static_pad("sink") | |
.ok_or(NullPad("queue_sink"))?; | |
tee_src.link(&queue_sink).into_result()?; | |
queue.sync_state_with_parent()?; | |
webrtcbin.sync_state_with_parent()?; | |
Ok(()) | |
} | |
#[post("/stream", data="<sdp>")] | |
fn handle_sdp_offer(sdp: Json<Sdp>, app_state: State<AppState>) -> String { | |
// TODO: Proper error handling for this function in place of unwraps | |
if sdp.type_ != "offer" { | |
// TODO: Error here | |
println!(r#"Sdp type is not "offer""#) | |
} | |
let pipeline = &app_state.pipeline; | |
// TODO: Actually make this random (e.g. UUID) | |
let peer_id = "random"; | |
add_peer_to_pipeline(&pipeline, peer_id).unwrap(); | |
let msg = gst_sdp::SDPMessage::parse_buffer(sdp.data.as_bytes()).unwrap(); | |
let offer = gst_webrtc::WebRTCSessionDescription::new( | |
gst_webrtc::WebRTCSDPType::Offer, msg); | |
let promise = gst::Promise::new(); | |
let webrtcbin = pipeline.get_by_name(peer_id).unwrap(); | |
webrtcbin.emit("set-remote-description", &[&offer, &promise]).unwrap(); | |
let (answer_sender, answer_receiver) = channel(); | |
let pipeline_clone = pipeline.clone(); | |
let promise = gst::Promise::new_with_change_func(move |promise| { | |
on_answer_created(&pipeline_clone, peer_id, promise, answer_sender).unwrap(); | |
}); | |
webrtcbin.emit("create-answer", &[&None::<gst::Structure>, &promise]).unwrap(); | |
let answer = answer_receiver.recv().unwrap(); | |
let msg = serde_json::to_string(&Sdp { | |
type_: "answer".to_string(), | |
data: answer.get_sdp().as_text().unwrap(), | |
}).unwrap(); | |
msg | |
} | |
fn check_plugins() -> Result<(), Error> { | |
let needed = [ | |
"opus", | |
"vpx", | |
"nice", | |
"webrtc", | |
"dtls", | |
"srtp", | |
"rtpmanager", | |
"videotestsrc", | |
"audiotestsrc", | |
]; | |
let registry = gst::Registry::get(); | |
let missing = needed | |
.iter() | |
.filter(|n| registry.find_plugin(n).is_none()) | |
.map(|n| *n) | |
.collect::<Vec<_>>(); | |
if !missing.is_empty() { | |
Err(MissingElements(missing))? | |
} else { | |
Ok(()) | |
} | |
} | |
fn init_pipeline(pipeline: &gst::Pipeline) -> Result<(), Error> { | |
let videotestsrc = gst::ElementFactory::make("videotestsrc", None) | |
.ok_or(NullElement("videotestsrc"))?; | |
videotestsrc.set_property_from_str("pattern", "ball"); | |
videotestsrc.set_property("is-live", &true)?; | |
let videoconvert = gst::ElementFactory::make("videoconvert", None) | |
.ok_or(NullElement("videoconvert"))?; | |
let queue = gst::ElementFactory::make("queue", None) | |
.ok_or(NullElement("queue"))?; | |
let vp8enc = gst::ElementFactory::make("vp8enc", None) | |
.ok_or(NullElement("vp8enc"))?; | |
vp8enc.set_property("deadline", &1i64)?; | |
let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay", None) | |
.ok_or(NullElement("rtpvp8pay"))?; | |
let queue2 = gst::ElementFactory::make("queue", None) | |
.ok_or(NullElement("queue2"))?; | |
let tee = gst::ElementFactory::make("tee", "videotee") | |
.ok_or(NullElement("videotee"))?; | |
let queue3 = gst::ElementFactory::make("queue", None) | |
.ok_or(NullElement("queue3"))?; | |
let sink = gst::ElementFactory::make("fakesink", None) | |
.ok_or(NullElement("fakesink"))?; | |
pipeline.add_many(&[ | |
&videotestsrc, | |
&videoconvert, | |
&queue, | |
&vp8enc, | |
&rtpvp8pay, | |
&queue2, | |
&tee, | |
&queue3, | |
&sink, | |
])?; | |
gst::Element::link_many(&[ | |
&videotestsrc, | |
&videoconvert, | |
&queue, | |
&vp8enc, | |
&rtpvp8pay, | |
&queue2, | |
])?; | |
queue2.link_filtered(&tee, &*RTP_CAPS_VP8)?; | |
gst::Element::link_many(&[ | |
&tee, | |
&queue3, | |
&sink, | |
])?; | |
pipeline.set_state(gst::State::Playing).into_result()?; | |
Ok(()) | |
} | |
fn main() -> Result<(), Error> { | |
gst::init()?; | |
check_plugins()?; | |
let main_loop = glib::MainLoop::new(None, false); | |
let pipeline = gst::Pipeline::new("main"); | |
init_pipeline(&pipeline)?; | |
thread::spawn(|| { | |
rocket::ignite() | |
.manage(AppState{ pipeline }) | |
.mount("/", routes![handle_sdp_offer]) | |
.launch(); | |
}); | |
main_loop.run(); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment