Last active
February 22, 2023 03:08
-
-
Save reconbot/02e5962cc21fd79d8513e4f075635242 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "hackerchat-rust" | |
version = "0.1.0" | |
edition = "2021" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
anyhow = "1.0.69" | |
async-channel = "1.8.0" | |
serde = { version = "1.0.152", features = ["std", "derive"] } | |
serde_json = "1.0.93" | |
socket2 = "0.4.7" | |
tokio = { version = "1.25.0", features = ["full"]} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{net::{SocketAddr, Ipv4Addr}, sync::Arc}; | |
use tokio::net::{UdpSocket}; //UdpFramed | |
use anyhow::{Context, Result}; | |
// use tokio::time::sleep; | |
// use std::time::Duration; | |
use socket2::{Socket, Domain, Type, Protocol}; | |
use serde::{Deserialize, Serialize}; | |
use std::time::{SystemTime, UNIX_EPOCH}; | |
use tokio::sync::mpsc::{self, Receiver, Sender}; | |
// https://stackoverflow.com/a/35697810/339 claims 508 bytes is the only safe size for the internet | |
// we're not going on the internet so it's the MTU of our ethernet or maybe the max ip packet size minus some overhead so maybe 65515 bytes? People there seem to think if IP fragments our packet we're in trouble but I don't think so. Ethernet MTU is 1500 bytes? I'm going with 64kb because it's all the memory I'll ever need. | |
const UDP_MAX_PACKET_SIZE: usize = 64 * 1024; | |
#[derive(Debug)] | |
struct ReceivedMessage { | |
text: String, | |
timestamp: u128, | |
ip: String, | |
} | |
#[derive(Serialize, Deserialize, Debug)] | |
struct SendingMessage { | |
text: String, | |
timestamp: u128, | |
} | |
fn decoder(mut rx: Receiver<(SocketAddr, Vec<u8>)>, tx: Sender<ReceivedMessage>){ | |
tokio::spawn(async move { | |
loop { | |
let (ip, packet) = match rx.recv().await { | |
Some(data) => data, | |
None => { | |
println!("rx closed in decoder"); | |
return; | |
} | |
}; | |
let data = match serde_json::from_slice::<SendingMessage>(&packet) { | |
Ok(data) => data, | |
Err(e) => { | |
println!("Error processing JSON {}", e); | |
continue; | |
} | |
}; | |
let message = ReceivedMessage { | |
text: data.text, | |
timestamp: data.timestamp, | |
ip: ip.ip().to_string(), | |
}; | |
tx.send(message).await.unwrap(); | |
} | |
}); | |
} | |
fn encoder(mut rx: Receiver<SendingMessage>, tx: Sender<Vec<u8>>){ | |
tokio::spawn(async move { | |
loop { | |
let message = rx.recv().await.unwrap(); | |
let data = match serde_json::to_string(&message) { | |
Ok(data) => data.as_bytes().to_vec(), | |
Err(e) => { | |
println!("Error encoding JSON {}", e); | |
continue; | |
} | |
}; | |
tx.send(data).await.with_context(||"error sending encoded data").unwrap(); | |
} | |
}); | |
} | |
fn ux(mut rx: Receiver<(ReceivedMessage)>, tx: Sender<SendingMessage>) { | |
tokio::task::spawn_blocking(move || { | |
let stdin = std::io::stdin(); | |
loop { | |
let mut text = String::new(); | |
stdin.read_line(&mut text).unwrap(); | |
let now = SystemTime::now(); | |
let timestamp = now.duration_since(UNIX_EPOCH).unwrap().as_millis(); | |
tx.blocking_send(SendingMessage { text, timestamp }).with_context(|| "send failed why?").unwrap(); | |
}; | |
}); | |
tokio::spawn(async move { | |
loop { | |
let message = rx.recv().await.unwrap(); | |
println!("Received Message {}", message.text); | |
}; | |
}); | |
} | |
fn network_actor(mut rx: Receiver<Vec<u8>>, tx: Sender<(SocketAddr, Vec<u8>)>) -> Result<()> { | |
// ok so https://github.com/tokio-rs/tokio/issues/5485 means we got to do this ourselves | |
// I'm not even convinced I'm allowed to do it after bind so yolo if that issue even works for us | |
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?; | |
socket.set_nonblocking(true)?; | |
socket.set_reuse_address(true)?; | |
socket.set_reuse_port(true)?; | |
let multicast_addr = "224.0.0.1".parse::<Ipv4Addr>() | |
.with_context(|| "Failed to parse multicast address")?; | |
socket | |
.join_multicast_v4(&multicast_addr, &Ipv4Addr::UNSPECIFIED) | |
.with_context(|| "Failed to join multicast group")?; | |
let listen_addr = "0.0.0.0:31337".parse::<SocketAddr>()?; | |
socket.bind(&listen_addr.into()) | |
.with_context(|| "Failed to bind to listen address")?; | |
// Spawn a task to receive multicast messages | |
let socket = UdpSocket::from_std(socket.into())?; | |
let receiver = Arc::new(socket); | |
let sender = receiver.clone(); | |
// receive messages | |
tokio::spawn(async move { | |
let mut buf = [0u8; UDP_MAX_PACKET_SIZE]; | |
loop { | |
let (size, src) = receiver.recv_from(&mut buf).await.unwrap(); | |
if size > UDP_MAX_PACKET_SIZE { | |
println!("ignoring a packet that's too large"); | |
continue; | |
} | |
let packet = buf[..size].to_vec(); | |
let _ = tx.send((src, packet)); | |
} | |
}); | |
// Send a multicast message | |
tokio::spawn(async move { | |
let multicast_addr = "224.0.0.1:31337".parse::<SocketAddr>().unwrap(); | |
loop { | |
let packet = rx.recv().await.with_context(||"receiving a packet to send").unwrap(); | |
println!("sending! {:?}", std::str::from_utf8(&packet)); | |
sender.send_to(&packet, &multicast_addr).await.unwrap(); | |
} | |
}); | |
Ok(()) | |
} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let num_procs = std::thread::available_parallelism()?; | |
let (network_in_tx, network_in_rx) = mpsc::channel::<(SocketAddr, Vec<u8>)>(1); | |
let (network_out_tx, network_out_rx) = mpsc::channel::<Vec<u8>>(1); | |
let (ux_out_tx, ux_out_rx) = mpsc::channel::<SendingMessage>(1); | |
let (ux_in_tx, ux_in_rx) = mpsc::channel::<ReceivedMessage>(1); | |
network_actor(network_out_rx, network_in_tx)?; | |
decoder(network_in_rx, ux_in_tx); | |
encoder(ux_out_rx, network_out_tx); | |
ux(ux_in_rx, ux_out_tx.clone()); | |
ux_out_tx.send(SendingMessage { text: "hey you".to_string(), timestamp: 4 }).await?; | |
println!("sent the thing"); | |
Ok(()) | |
} |
sent the thing
sending! Ok("{\"text\":\"hey you\",\"timestamp\":4}")
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: receiving a packet to send', src/main.rs:134:87
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Why does this panic with it claiming the channels are closed?