Skip to content

Instantly share code, notes, and snippets.

@mchlrhw
Last active October 23, 2020 02:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mchlrhw/c365bc8acdeac18c1626de3f92cffba5 to your computer and use it in GitHub Desktop.
Save mchlrhw/c365bc8acdeac18c1626de3f92cffba5 to your computer and use it in GitHub Desktop.
Rust gstreamer webrtcbin impl
[package]
name = "avserver"
version = "0.1.0"
authors = ["mchlrhw <4028654+mchlrhw@users.noreply.github.com>"]
edition = "2018"
[dependencies]
failure = "0.1.5"
glib = "0.6.1"
gstreamer = "0.12.2"
gstreamer-sdp = "0.12.2"
gstreamer-webrtc = "0.12.2"
lazy_static = "1.2.0"
rocket = "0.4.0"
rocket_contrib = "0.4.0"
serde = "1.0.87"
serde_derive = "1.0.87"
serde_json = "1.0.38"
#![feature(custom_attribute, proc_macro_hygiene, decl_macro)]
#[macro_use] extern crate rocket;
use std::{
sync::mpsc::{channel, Sender},
thread,
};
use failure::{Error, Fail};
use glib;
use gstreamer::{self as gst, prelude::*};
use gstreamer_sdp as gst_sdp;
use gstreamer_webrtc as gst_webrtc;
use lazy_static::lazy_static;
use rocket::State;
use rocket_contrib::json::Json;
use serde_derive::{Deserialize, Serialize};
const STUN_SERVER: &str = "stun://stun.l.google.com:19302";
lazy_static! {
static ref RTP_CAPS_VP8: gst::Caps = {
gst::Caps::new_simple(
"application/x-rtp",
&[
("media", &"video"),
("encoding-name", &"VP8"),
("payload", &(96i32)),
],
)
};
}
#[derive(Debug, Fail)]
#[fail(display = "Missing elements {:?}", _0)]
struct MissingElements(Vec<&'static str>);
#[derive(Debug, Fail)]
#[fail(display = "Failed to create answer")]
struct NullAnswer;
#[derive(Debug, Fail)]
#[fail(display = "Failed to get bus")]
struct NullBus;
#[derive(Debug, Fail)]
#[fail(display = "Failed to create element \"{}\"", _0)]
struct NullElement(&'static str);
#[derive(Debug, Fail)]
#[fail(display = "Failed to create offer")]
struct NullOffer;
#[derive(Debug, Fail)]
#[fail(display = "Failed to create pad \"{}\"", _0)]
struct NullPad(&'static str);
#[derive(Debug, Fail)]
#[fail(display = "Failed to create reply")]
struct NullReply;
#[derive(Debug, Fail)]
#[fail(display = "Failed to create session description")]
struct NullSessionDescription;
#[derive(Debug, Deserialize, Serialize)]
struct Sdp {
#[serde(rename = "type")]
type_: String,
#[serde(rename = "sdp")]
data: String,
}
struct AppState {
pipeline: gst::Pipeline,
}
fn on_answer_created(
pipeline: &gst::Pipeline,
peer_id: &str,
promise: &gst::Promise,
answer_sender: Sender<gst_webrtc::WebRTCSessionDescription>,
)
-> Result<(), Error>
{
let reply = promise.get_reply().ok_or(NullReply)?;
let answer = reply
.get_value("answer")
.ok_or(NullAnswer)?
.get::<gst_webrtc::WebRTCSessionDescription>()
.ok_or(NullSessionDescription)?;
let webrtcbin = pipeline.get_by_name(peer_id).ok_or(NullElement("webrtcbin"))?;
webrtcbin.emit("set-local-description", &[&answer, &None::<gst::Promise>])?;
answer_sender.send(answer)?;
Ok(())
}
fn add_peer_to_pipeline(pipeline: &gst::Pipeline, peer_id: &str) -> Result<(), Error> {
let queue = gst::ElementFactory::make("queue", None)
.ok_or(NullElement("queue"))?;
let webrtcbin = gst::ElementFactory::make("webrtcbin", peer_id)
.ok_or(NullElement("webrtcbin"))?;
pipeline.add_many(&[
&queue,
&webrtcbin,
])?;
let queue_src = queue.get_static_pad("src")
.ok_or(NullPad("queue_src"))?;
let webrtc_sink = webrtcbin.get_request_pad("sink_%u")
.ok_or(NullPad("webrtc_sink"))?;
queue_src.link(&webrtc_sink).into_result()?;
let tee = pipeline.get_by_name("videotee")
.ok_or(NullElement("videotee"))?;
let tee_src = tee.get_request_pad("src_%u")
.ok_or(NullPad("tee_src"))?;
let queue_sink = queue.get_static_pad("sink")
.ok_or(NullPad("queue_sink"))?;
tee_src.link(&queue_sink).into_result()?;
queue.sync_state_with_parent()?;
webrtcbin.sync_state_with_parent()?;
Ok(())
}
#[post("/stream", data="<sdp>")]
fn handle_sdp_offer(sdp: Json<Sdp>, app_state: State<AppState>) -> String {
// TODO: Proper error handling for this function in place of unwraps
if sdp.type_ != "offer" {
// TODO: Error here
println!(r#"Sdp type is not "offer""#)
}
let pipeline = &app_state.pipeline;
// TODO: Actually make this random (e.g. UUID)
let peer_id = "random";
add_peer_to_pipeline(&pipeline, peer_id).unwrap();
let msg = gst_sdp::SDPMessage::parse_buffer(sdp.data.as_bytes()).unwrap();
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer, msg);
let promise = gst::Promise::new();
let webrtcbin = pipeline.get_by_name(peer_id).unwrap();
webrtcbin.emit("set-remote-description", &[&offer, &promise]).unwrap();
let (answer_sender, answer_receiver) = channel();
let pipeline_clone = pipeline.clone();
let promise = gst::Promise::new_with_change_func(move |promise| {
on_answer_created(&pipeline_clone, peer_id, promise, answer_sender).unwrap();
});
webrtcbin.emit("create-answer", &[&None::<gst::Structure>, &promise]).unwrap();
let answer = answer_receiver.recv().unwrap();
let msg = serde_json::to_string(&Sdp {
type_: "answer".to_string(),
data: answer.get_sdp().as_text().unwrap(),
}).unwrap();
msg
}
fn check_plugins() -> Result<(), Error> {
let needed = [
"opus",
"vpx",
"nice",
"webrtc",
"dtls",
"srtp",
"rtpmanager",
"videotestsrc",
"audiotestsrc",
];
let registry = gst::Registry::get();
let missing = needed
.iter()
.filter(|n| registry.find_plugin(n).is_none())
.map(|n| *n)
.collect::<Vec<_>>();
if !missing.is_empty() {
Err(MissingElements(missing))?
} else {
Ok(())
}
}
fn init_pipeline(pipeline: &gst::Pipeline) -> Result<(), Error> {
let videotestsrc = gst::ElementFactory::make("videotestsrc", None)
.ok_or(NullElement("videotestsrc"))?;
videotestsrc.set_property_from_str("pattern", "ball");
videotestsrc.set_property("is-live", &true)?;
let videoconvert = gst::ElementFactory::make("videoconvert", None)
.ok_or(NullElement("videoconvert"))?;
let queue = gst::ElementFactory::make("queue", None)
.ok_or(NullElement("queue"))?;
let vp8enc = gst::ElementFactory::make("vp8enc", None)
.ok_or(NullElement("vp8enc"))?;
vp8enc.set_property("deadline", &1i64)?;
let rtpvp8pay = gst::ElementFactory::make("rtpvp8pay", None)
.ok_or(NullElement("rtpvp8pay"))?;
let queue2 = gst::ElementFactory::make("queue", None)
.ok_or(NullElement("queue2"))?;
let tee = gst::ElementFactory::make("tee", "videotee")
.ok_or(NullElement("videotee"))?;
let queue3 = gst::ElementFactory::make("queue", None)
.ok_or(NullElement("queue3"))?;
let sink = gst::ElementFactory::make("fakesink", None)
.ok_or(NullElement("fakesink"))?;
pipeline.add_many(&[
&videotestsrc,
&videoconvert,
&queue,
&vp8enc,
&rtpvp8pay,
&queue2,
&tee,
&queue3,
&sink,
])?;
gst::Element::link_many(&[
&videotestsrc,
&videoconvert,
&queue,
&vp8enc,
&rtpvp8pay,
&queue2,
])?;
queue2.link_filtered(&tee, &*RTP_CAPS_VP8)?;
gst::Element::link_many(&[
&tee,
&queue3,
&sink,
])?;
pipeline.set_state(gst::State::Playing).into_result()?;
Ok(())
}
fn main() -> Result<(), Error> {
gst::init()?;
check_plugins()?;
let main_loop = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new("main");
init_pipeline(&pipeline)?;
thread::spawn(|| {
rocket::ignite()
.manage(AppState{ pipeline })
.mount("/", routes![handle_sdp_offer])
.launch();
});
main_loop.run();
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment