Skip to content

Instantly share code, notes, and snippets.

@pubnoconst
Last active April 14, 2023 08:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pubnoconst/cfaa5425bae9f135d5b7f413a9f36a30 to your computer and use it in GitHub Desktop.
Save pubnoconst/cfaa5425bae9f135d5b7f413a9f36a30 to your computer and use it in GitHub Desktop.
Send and receive STUN packets to compute p2p latency
# Cargo manifest for the STUN latency probe.
[package]
name = "rust_playground"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Random bytes for STUN transaction IDs; the code imports `rand::rngs::StdRng`.
rand = { version = "0.8.5", features = ["std_rng"] }
# Async runtime; the code uses its UDP socket, channels, timers and `#[tokio::main]`.
[dependencies.tokio]
version = "1.27.0"
features = ["full"]
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::sleep,
time::{Duration, Instant},
};
use tokio::{
net::UdpSocket,
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{channel, Receiver, Sender},
},
};
use rand::{rngs::StdRng, RngCore, SeedableRng};
/// Read access to the transaction ID of a raw STUN message stored in a byte array.
trait StunPacketTrait<const N: usize> {
    /// Returns the 12 transaction-ID bytes of the message (bytes 8 through 19).
    fn transaction_id(&self) -> &[u8];
}

impl<const N: usize> StunPacketTrait<N> for [u8; N] {
    /// Borrows bytes `8..20` of the array.
    ///
    /// # Panics
    ///
    /// Panics when the array holds fewer than 20 bytes.
    fn transaction_id(&self) -> &[u8] {
        // The transaction ID sits right after the 8 leading header bytes.
        const START: usize = 8;
        const LEN: usize = 12;
        &self[START..START + LEN]
    }
}
struct StunPacket {
packet: [u8; 20],
}
impl StunPacket {
fn new() -> StunPacket {
let mut packet: [u8; 20] = [0; 20];
// binding request
packet[0] = 0x00;
packet[1] = 0x01;
// attribute
packet[2] = 0x00;
packet[3] = 0x00;
// magic cookie
packet[4..8].copy_from_slice(&[0x21, 0x12, 0xA4, 0x42]);
// transaction id
let mut rng = StdRng::from_entropy();
rng.fill_bytes(&mut packet[8..]);
StunPacket { packet }
}
fn transaction_id(&self) -> &[u8] {
self.packet.transaction_id()
}
}
struct Peer {
remote_socket: String,
cooldown: Duration,
stop_rx: Receiver<()>,
result_tx: UnboundedSender<Duration>,
}
impl Peer {
fn new(
remote_socket: String,
cooldown: Option<Duration>,
) -> (Self, Sender<()>, UnboundedReceiver<Duration>) {
let (stop_tx, stop_rx) = channel::<()>();
let (result_tx, result_rx) = unbounded_channel::<Duration>();
let peer = Peer {
remote_socket,
cooldown: cooldown.unwrap_or(Duration::from_millis(400)),
stop_rx,
result_tx,
};
(peer, stop_tx, result_rx)
}
async fn monitor(&mut self) {
let run = Arc::new(AtomicBool::new(true));
if self.stop_rx.try_recv().is_ok() {
run.store(false, Ordering::SeqCst);
}
let send_times = Arc::new(Mutex::new(HashMap::new()));
let local_socket = UdpSocket::bind("0.0.0.0:0")
.await
.expect("Error binding to local socket");
let sender_jh = async {
while run.load(Ordering::SeqCst) {
// println!("Sending STUN to {}", self.remote_socket);
let stun = StunPacket::new();
send_times
.lock()
.expect("Unable to lock mutex for in send loop")
.insert(stun.transaction_id().to_owned(), Instant::now());
local_socket
.send_to(&stun.packet, &self.remote_socket)
.await
.expect("Error sending stun packet");
tokio::time::sleep(self.cooldown).await; // Cooldown
}
};
let listener_jh = async {
while run.load(Ordering::SeqCst) {
// println!("Listening for STUN from {}", self.remote_socket);
let mut buf = [0; 128];
match local_socket.recv_from(&mut buf).await {
Ok((len, _)) if len >= 20 => {
if let Some(send_time) = send_times.lock().unwrap().remove(buf.transaction_id())
{
// println!("Packet recognized!");
let stun_rtt = send_time.elapsed();
self.result_tx.send(stun_rtt).expect("Error sending rtt");
} else {
// println!("Unrecognized transaction ID");
}
}
Ok(_) => {
// println!("Packet ignored");
}
Err(_e) => {
// println!("Data not received: {}", _e);
}
}
tokio::time::sleep(self.cooldown).await; // Cooldown
}
};
tokio::join!(sender_jh, listener_jh);
}
}
#[tokio::main]
async fn main() {
let remote_socket = "stun.cope.es:3478".to_string();
let (mut peer, stop_tx, mut duration_rx) = Peer::new(remote_socket.to_owned(), None);
tokio::task::spawn(async move {
peer.monitor().await;
});
tokio::task::spawn(async move {
while let Some(latency) = duration_rx.recv().await {
println!("Current latency with {} is: {:.2?}", remote_socket, latency);
}
});
sleep(Duration::from_millis(6000)); // timeout
stop_tx.send(()).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment