Skip to content

Instantly share code, notes, and snippets.

@stonegao
Forked from LeoDog896/Cargo.toml
Created May 16, 2023 11:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stonegao/7223d5121cc2effb41027466a5ffa660 to your computer and use it in GitHub Desktop.
Save stonegao/7223d5121cc2effb41027466a5ffa660 to your computer and use it in GitHub Desktop.
Small WebRTC-rs example
# Cargo manifest for the example binary defined in the Rust source that follows.
[package]
name = "rs-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Peer connection, data channel and ICE/SDP types used by both client and server.
webrtc = "0.4.0"
# Async runtime; the code uses #[tokio::main], tokio::time, tokio::sync::mpsc and tokio::signal.
tokio = { version = "1.15.0", features = ["full"] }
# Command-line parsing for the "client"/"server" positional argument and --fullhelp.
clap = "3.0.8"
# `anyhow::Result` is the return type of run_client / run_server.
anyhow = "1.0.52"
# Serializes / deserializes the session descriptions exchanged between the two processes.
serde_json = "1.0.75"
# Interactive prompt used to paste the remote peer's payload.
dialoguer = "0.10.1"
use anyhow::Result;
use clap::{AppSettings, Arg, Command};
use dialoguer::{theme::ColorfulTheme, Input};
use std::sync::Arc;
use tokio::time::Duration;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::math_rand_alpha;
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
/// Program entry point.
///
/// Parses the command line, handles `--fullhelp`, and then dispatches to
/// [`run_client`] or [`run_server`] depending on the positional `type`
/// argument. Panics if the selected mode returns an error.
fn main() {
    // Positional argument selecting the mode: either "client" or "server".
    // It is mandatory unless `--fullhelp` was given.
    let mode_arg = Arg::new("type")
        .required_unless_present("FULLHELP")
        .possible_values(&["client", "server"])
        .takes_value(true)
        .index(1)
        .help("Type of client or server");

    // Flag that requests the long help text.
    let full_help_arg = Arg::new("FULLHELP")
        .help("Prints more detailed help information")
        .long("fullhelp");

    let mut app = Command::new("holey")
        .version("0.1.0")
        .author("LeoDog896 <leodog896@gmail.com>")
        .about("P2P TCP transport over the internet")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(mode_arg)
        .arg(full_help_arg);

    // Parse a clone so `app` stays available for printing the long help.
    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    match matches.value_of("type").unwrap() {
        "client" => run_client().expect("Client failed"),
        "server" => run_server().expect("Server failed"),
        other => panic!("unknown type: {}", other),
    }
}
#[tokio::main]
pub async fn run_client() -> Result<()> {
// Prepare the configuration
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;
// Create the API object with the MediaEngine
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
// Create a datachannel with label 'data'
let data_channel = peer_connection.create_data_channel("data", None).await?;
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {}", s);
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
println!("Peer Connection has gone to failed exiting");
let _ = done_tx.try_send(());
}
Box::pin(async {})
}))
.await;
// Register channel opening handling
let d1 = Arc::clone(&data_channel);
data_channel.on_open(Box::new(move || {
println!("Data channel '{}'-'{}' open. Random messages will now be sent to any connected DataChannels every 5 seconds", d1.label(), d1.id());
let d2 = Arc::clone(&d1);
Box::pin(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
tokio::select! {
_ = timeout.as_mut() =>{
let message = math_rand_alpha(15);
println!("Sending '{}'", message);
result = d2.send_text(message).await.map_err(Into::into);
}
};
}
})
})).await;
// Register text message handling
let d_label = data_channel.label().to_owned();
data_channel
.on_message(Box::new(move |msg: DataChannelMessage| {
let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
Box::pin(async {})
}))
.await;
// Create an offer to send to the other process
let offer = peer_connection.create_offer(None).await?;
// Create channel that is blocked until ICE Gathering is complete
let mut gather_complete = peer_connection.gathering_complete_promise().await;
// Sets the LocalDescription, and starts our UDP listeners
// Note: this will start the gathering of ICE candidates
peer_connection.set_local_description(offer).await?;
let _ = gather_complete.recv().await;
// Send our offer to the HTTP server listening in the other process
let payload = serde_json::to_string(
&peer_connection
.local_description()
.await
.expect("Failed to get local description"),
)?;
println!("payload: {}", payload);
let server_payload: String = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter server payload")
.interact_text()
.unwrap();
let sdp = serde_json::from_str::<RTCSessionDescription>(&server_payload)?;
peer_connection.set_remote_description(sdp).await?;
println!("Press ctrl-c to stop");
tokio::select! {
_ = done_rx.recv() => {
println!("received done signal!");
}
_ = tokio::signal::ctrl_c() => {
println!("");
}
};
peer_connection.close().await?;
Ok(())
}
#[tokio::main]
pub async fn run_server() -> Result<()> {
// Prepare the configuration
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()],
..Default::default()
}],
..Default::default()
};
// Create a MediaEngine object to configure the supported codec
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
// Use the default set of Interceptors
registry = register_default_interceptors(registry, &mut m)?;
// Create the API object with the MediaEngine
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
// Create a new RTCPeerConnection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {}", s);
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
println!("Peer Connection has gone to failed exiting");
let _ = done_tx.try_send(());
}
Box::pin(async {})
}))
.await;
// Register data channel creation handling
peer_connection.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let d_label = d.label().to_owned();
let d_id = d.id();
println!("New DataChannel {} {}", d_label, d_id);
Box::pin(async move{
// Register channel opening handling
let d2 = Arc::clone(&d);
let d_label2 = d_label.clone();
let d_id2 = d_id;
d.on_open(Box::new(move || {
println!("Data channel '{}'-'{}' open. Random messages will now be sent to any connected DataChannels every 5 seconds", d_label2, d_id2);
Box::pin(async move {
let mut result = Result::<usize>::Ok(0);
while result.is_ok() {
let timeout = tokio::time::sleep(Duration::from_secs(5));
tokio::pin!(timeout);
tokio::select! {
_ = timeout.as_mut() =>{
let message = math_rand_alpha(15);
println!("Sending '{}'", message);
result = d2.send_text(message).await.map_err(Into::into);
}
};
}
})
})).await;
// Register text message handling
d.on_message(Box::new(move |msg: DataChannelMessage| {
let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
println!("Message from DataChannel '{}': '{}'", d_label, msg_str);
Box::pin(async{})
})).await;
})
})).await;
println!("Press ctrl-c to stop");
let client_payload: String = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter client payload")
.interact_text()
.unwrap();
let sdp = serde_json::from_str::<RTCSessionDescription>(&client_payload)?;
peer_connection.set_remote_description(sdp).await?;
// Create an answer to send to the other process
let answer = peer_connection.create_answer(None).await?;
// Create channel that is blocked until ICE Gathering is complete
let mut gather_complete = peer_connection.gathering_complete_promise().await;
// Sets the LocalDescription, and starts our UDP listeners
peer_connection.set_local_description(answer).await?;
let _ = gather_complete.recv().await;
// Send our answer to the HTTP server listening in the other process
let payload = serde_json::to_string(
&peer_connection
.local_description()
.await
.expect("Failed to get local description"),
)?;
println!("{}", payload);
tokio::select! {
_ = done_rx.recv() => {
println!("received done signal!");
}
_ = tokio::signal::ctrl_c() => {
println!("");
}
};
peer_connection.close().await?;
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment