Created
February 22, 2023 03:35
-
-
Save reconbot/1f63a6a0aa84a6b898cc3bb79f56dddc to your computer and use it in GitHub Desktop.
this time it doesn't crash - doesn't seem to read from the network though
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{net::{SocketAddr, Ipv4Addr}, sync::Arc}; | |
use tokio::net::{UdpSocket}; //UdpFramed | |
use anyhow::{Context, Result}; | |
// use tokio::time::sleep; | |
// use std::time::Duration; | |
use socket2::{Socket, Domain, Type, Protocol}; | |
use serde::{Deserialize, Serialize}; | |
use std::time::{SystemTime, UNIX_EPOCH}; | |
use tokio::sync::mpsc::{self, Receiver, Sender}; | |
use futures::try_join; | |
// https://stackoverflow.com/a/35697810/339 claims 508 bytes is the only safe size for the internet | |
// we're not going on the internet so it's the MTU of our ethernet or maybe the max ip packet size minus some overhead so maybe 65515 bytes? People there seem to think if IP fragments our packet we're in trouble but I don't think so. Ethernet MTU is 1500 bytes? I'm going with 64kb because it's all the memory I'll ever need. | |
const UDP_MAX_PACKET_SIZE: usize = 64 * 1024; | |
#[derive(Debug)] | |
struct ReceivedMessage { | |
text: String, | |
timestamp: u128, | |
ip: String, | |
} | |
#[derive(Serialize, Deserialize, Debug)] | |
struct SendingMessage { | |
text: String, | |
timestamp: u128, | |
} | |
async fn decoder(mut rx: Receiver<(SocketAddr, Vec<u8>)>, tx: Sender<ReceivedMessage>) -> Result<()>{ | |
tokio::spawn(async move { | |
loop { | |
let (ip, packet) = match rx.recv().await { | |
Some(data) => data, | |
None => { | |
println!("rx closed in decoder"); | |
return; | |
} | |
}; | |
let data = match serde_json::from_slice::<SendingMessage>(&packet) { | |
Ok(data) => data, | |
Err(e) => { | |
println!("Error processing JSON {}", e); | |
continue; | |
} | |
}; | |
let message = ReceivedMessage { | |
text: data.text, | |
timestamp: data.timestamp, | |
ip: ip.ip().to_string(), | |
}; | |
tx.send(message).await.unwrap(); | |
} | |
}).await?; | |
Ok(()) | |
} | |
async fn encoder(mut rx: Receiver<SendingMessage>, tx: Sender<Vec<u8>>) -> Result<()> { | |
tokio::spawn(async move { | |
loop { | |
let message = rx.recv().await.unwrap(); | |
let data = match serde_json::to_string(&message) { | |
Ok(data) => data.as_bytes().to_vec(), | |
Err(e) => { | |
println!("Error encoding JSON {}", e); | |
continue; | |
} | |
}; | |
tx.send(data).await.with_context(||"error sending encoded data").unwrap(); | |
} | |
}); | |
Ok(()) | |
} | |
async fn ux(mut rx: Receiver<(ReceivedMessage)>, tx: Sender<SendingMessage>) -> Result<()> { | |
let input = tokio::task::spawn_blocking(move || { | |
let stdin = std::io::stdin(); | |
loop { | |
let mut text = String::new(); | |
stdin.read_line(&mut text).unwrap(); | |
let now = SystemTime::now(); | |
let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); | |
tx.blocking_send(SendingMessage { text, timestamp }).with_context(|| "send failed why?").unwrap(); | |
}; | |
}); | |
let output = tokio::spawn(async move { | |
loop { | |
let message = rx.recv().await.unwrap(); | |
println!("Received Message {}", message.text); | |
}; | |
}); | |
try_join!(input, output)?; | |
Ok(()) | |
} | |
async fn network(mut rx: Receiver<Vec<u8>>, tx: Sender<(SocketAddr, Vec<u8>)>) -> Result<()> { | |
// ok so https://github.com/tokio-rs/tokio/issues/5485 means we got to do this ourselves | |
// I'm not even convinced I'm allowed to do it after bind so yolo if that issue even works for us | |
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; | |
socket.set_nonblocking(true)?; | |
socket.set_reuse_address(true)?; | |
socket.set_reuse_port(true)?; | |
let multicast_addr = "224.0.0.1".parse::<Ipv4Addr>() | |
.with_context(|| "Failed to parse multicast address")?; | |
socket | |
.join_multicast_v4(&multicast_addr, &Ipv4Addr::UNSPECIFIED) | |
.with_context(|| "Failed to join multicast group")?; | |
let listen_addr = "0.0.0.0:31337".parse::<SocketAddr>()?; | |
socket.bind(&listen_addr.into()) | |
.with_context(|| "Failed to bind to listen address")?; | |
// Spawn a task to receive multicast messages | |
let socket = UdpSocket::from_std(socket.into())?; | |
let receiver = Arc::new(socket); | |
let sender = receiver.clone(); | |
// receive messages | |
let input = tokio::spawn(async move { | |
let mut buf = [0u8; UDP_MAX_PACKET_SIZE]; | |
loop { | |
let (size, src) = receiver.recv_from(&mut buf).await.unwrap(); | |
if size > UDP_MAX_PACKET_SIZE { | |
println!("ignoring a packet that's too large"); | |
continue; | |
} | |
let packet = buf[..size].to_vec(); | |
let _ = tx.send((src, packet)); | |
} | |
}); | |
// Send a multicast message | |
let output = tokio::spawn(async move { | |
let multicast_addr = "224.0.0.1:31337".parse::<SocketAddr>().unwrap(); | |
loop { | |
let packet = rx.recv().await.with_context(||"receiving a packet to send").unwrap(); | |
sender.send_to(&packet, &multicast_addr).await.unwrap(); | |
println!("sent! {:?}", std::str::from_utf8(&packet)); | |
} | |
}); | |
try_join!(input, output)?; | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let num_procs = std::thread::available_parallelism()?; | |
let (network_in_tx, network_in_rx) = mpsc::channel::<(SocketAddr, Vec<u8>)>(1); | |
let (network_out_tx, network_out_rx) = mpsc::channel::<Vec<u8>>(1); | |
let (ux_out_tx, ux_out_rx) = mpsc::channel::<SendingMessage>(1); | |
let (ux_in_tx, ux_in_rx) = mpsc::channel::<ReceivedMessage>(1); | |
try_join!( | |
network(network_out_rx, network_in_tx), | |
decoder(network_in_rx, ux_in_tx), | |
encoder(ux_out_rx, network_out_tx), | |
ux(ux_in_rx, ux_out_tx.clone()), | |
)?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment