Skip to content

Instantly share code, notes, and snippets.

@jacobzlogar
Last active November 23, 2022 19:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jacobzlogar/861748d23e812c15dc5f9524595c0ca4 to your computer and use it in GitHub Desktop.
Save jacobzlogar/861748d23e812c15dc5f9524595c0ca4 to your computer and use it in GitHub Desktop.
proto2
use bytes::BytesMut;
use is_prime::*;
use serde::{Deserialize, Serialize};
use serde_json::*;
use std::collections::HashMap;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
type ConnectionTx = mpsc::UnboundedSender<ConnectionMessage>;
type ConnectionRx = mpsc::UnboundedReceiver<ConnectionMessage>;
type ServerRx = mpsc::UnboundedReceiver<ServerMessage>;
type ServerTx = mpsc::UnboundedSender<ServerMessage>;
type ClientRx = mpsc::UnboundedReceiver<String>;
type ClientTx = mpsc::UnboundedSender<String>;
struct ServerMessage {
payload: Payload,
addr: SocketAddr,
}
#[derive(Serialize, Deserialize, Debug)]
struct Payload {
method: String,
number: Number,
}
#[derive(Serialize, Deserialize, Debug)]
struct ValidationResponse {
status: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct Response {
method: String,
prime: bool,
}
#[derive(Debug, PartialEq)]
enum ConnectionStatus {
Join,
Quit,
}
#[derive(Debug)]
struct Connection {
addr: SocketAddr,
client_rx: ClientRx,
server_tx: ServerTx,
}
#[derive(Debug)]
struct ConnectionMessage {
addr: SocketAddr,
client_tx: Option<ClientTx>,
status: ConnectionStatus,
}
#[derive(Debug)]
struct Server {
connection_rx: ConnectionRx,
server_rx: ServerRx,
connections: HashMap<SocketAddr, ClientTx>,
}
impl Connection {
async fn listen(mut self, mut stream: TcpStream) {
let (mut recv_half, mut wr_half) = stream.split();
loop {
let mut buf = BytesMut::with_capacity(4096);
tokio::select! {
read = recv_half.read_buf(&mut buf) => {
if read.unwrap() == 0 {
break;
}
let json_string = String::from_utf8_lossy(&mut buf);
let mut json = Deserializer::from_str(&json_string);
let value = Payload::deserialize(&mut json);
match &value {
Err(..) => {
let bye = "get lost \r\n".to_string();
wr_half.write(&bye.as_bytes()).await;
panic!("lol");
},
Ok(..) => {
let msg = ServerMessage {
payload: value.unwrap(),
addr: self.addr.clone(),
};
self.server_tx.send(msg);
}
}
}
write = self.client_rx.recv() => {
dbg!(&write);
wr_half.write(write.unwrap().as_bytes()).await;
}
}
}
}
}
impl Server {
async fn process(mut self) {
loop {
tokio::select! {
conn_msg = self.connection_rx.recv() => {
let connection = conn_msg.unwrap();
self.connections.insert(connection.addr, connection.client_tx.unwrap());
dbg!(&self.connections);
},
client_msg = self.server_rx.recv() => {
let msg = client_msg.unwrap();
let mut number = msg.payload.number;
if number.is_f64() {
let float = number.as_f64().unwrap();
let int = (float / 100_000.0) as i64 * 100_000;
let convert = serde_json::Number::from(int);
number = convert;
}
let num = number.as_i64().unwrap().abs().to_string();
let res = Response {
method: "isPrime".to_string(),
prime: is_prime(&num),
};
let connection = self.connections.get(&msg.addr).unwrap();
connection.send(format!("{} \r\n", serde_json::to_string(&res).unwrap()));
}
};
}
}
}
#[tokio::main]
async fn main() -> io::Result<()> {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 6969);
let listener = TcpListener::bind(&addr).await.unwrap();
let (connection_tx, mut connection_rx) = mpsc::unbounded_channel::<ConnectionMessage>();
let (server_tx, mut server_rx) = mpsc::unbounded_channel::<ServerMessage>();
let server_tx = server_tx.clone();
let connection_tx = connection_tx.clone();
// server handle
let server = Server {
connection_rx,
server_rx,
connections: HashMap::new(),
};
let server_handle = tokio::spawn(async move {
server.process().await;
});
// accept handle
tokio::spawn(async move {
loop {
let (stream, addr) = listener.accept().await.unwrap();
let (client_tx, client_rx) = mpsc::unbounded_channel();
let connection = Connection {
addr,
client_rx,
server_tx: server_tx.clone(),
};
let msg = ConnectionMessage {
addr,
client_tx: Some(client_tx.clone()),
status: ConnectionStatus::Join,
};
connection_tx.send(msg);
tokio::spawn(async move {
connection.listen(stream).await;
});
}
});
server_handle.await?;
Ok(())
}
[package]
name = "proto2"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
is_prime = "2.0.9"
dotenv = "0.15.0"
tokio = { version = "1", features = ["macros", "net", "full", "rt"] }
tokio-util = { version = "0.7.0", features = ["full"] }
tokio-stream = { version = "0.1" }
bytes = "1"
tracing = "0.1"
tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt", "ansi", "env-filter", "tracing-log"] }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment