Skip to content

Instantly share code, notes, and snippets.

@ddprrt

ddprrt/main.rs Secret

Last active November 19, 2024 14:09
Show Gist options
  • Save ddprrt/193cd9d50503cb4e5c3af8088d311192 to your computer and use it in GitHub Desktop.
Save ddprrt/193cd9d50503cb4e5c3af8088d311192 to your computer and use it in GitHub Desktop.
Server/Worker separation with Tokio
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::TcpListener,
sync::{mpsc, oneshot},
task,
};
#[derive(Debug)]
enum CommandMessage {
Increment,
Decrement,
}
struct Command {
msg: CommandMessage,
tx: oneshot::Sender<u64>,
}
#[tokio::main]
async fn main() {
let (server_tx, mut worker_rx) = mpsc::channel::<Command>(100);
let listener = TcpListener::bind("localhost:8001").await.unwrap();
// The "worker"
task::spawn(async move {
let mut count = 0;
while let Some(incoming) = worker_rx.recv().await {
match incoming.msg {
CommandMessage::Increment => count += 1,
CommandMessage::Decrement => count -= 1,
}
incoming.tx.send(count).unwrap();
}
});
// The "server"
loop {
let (socket, _addr) = listener.accept().await.unwrap();
let server_tx = server_tx.clone();
task::spawn(async move {
println!("Connnected");
let (reader, mut writer) = socket.into_split();
let mut buf = String::new();
let mut reader = BufReader::new(reader);
while let Ok(_bytes_read) = reader.read_line(&mut buf).await {
let msg = match buf.trim() {
"quit" => break,
"increment" => CommandMessage::Increment,
"decrement" => CommandMessage::Decrement,
_ => {
buf.clear();
continue;
}
};
let (tx, rx) = oneshot::channel();
let cmd = Command { msg, tx };
server_tx.send(cmd).await.unwrap();
let res = rx.await.unwrap();
writer
.write_all(format!("Count: {}\n", res).as_bytes())
.await
.unwrap();
buf.clear();
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment