Last active
September 16, 2020 14:43
-
-
Save Stackout/c273f3e12e3a629e1ba324c0c09ba33a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![warn(rust_2018_idioms)] | |
#[allow(arithmetic_overflow)] | |
use tokio::net::TcpListener; | |
use tokio_util::codec::{BytesCodec, Framed}; | |
use std::env; | |
use bytes::{BufMut, BytesMut}; | |
use std::sync::{Arc,RwLock}; | |
use std::collections::HashMap; | |
use uuid::Uuid; | |
use tokio::net::TcpStream; | |
use futures::prelude::*; | |
use futures::{Sink}; | |
mod types; | |
pub mod message; | |
pub use types::Entity; | |
#[tokio::main] | |
async fn main() -> Result<(), Box<dyn std::error::Error>> { | |
// Allow passing an address to listen on as the first argument of this | |
// program, but otherwise we'll just set up our TCP listener on | |
// 127.0.0.1:8080 for connections. | |
let addr = env::args() | |
.nth(1) | |
.unwrap_or_else(|| "0.0.0.0:3333".to_string()); | |
let connection_hashmap: HashMap<u8, Pin<Box<dyn Sink<BytesCodec>>>> = HashMap::new(); | |
let entity_hashmap: HashMap<u8,Entity> = HashMap::new(); | |
let initial_counter: u8 = 0; | |
// Hashmap to store a sink value with an id key | |
let connections = Arc::new(RwLock::new(connection_hashmap)); | |
let entities = Arc::new(RwLock::new(entity_hashmap)); | |
let counter = Arc::new(RwLock::new(initial_counter)); | |
// Next up we create a TCP listener which will listen for incoming | |
// connections. This TCP listener is bound to the address we determined | |
// above and must be associated with an event loop, so we pass in a handle | |
// to our event loop. After the socket's created we inform that we're ready | |
// to go and start accepting connections. | |
let mut listener = TcpListener::bind(&addr).await?; | |
println!("Listening on: {}", addr); | |
// Clone references to these states in order to move into the loop. | |
let connections_inner = connections.clone(); | |
let entities_inner = entities.clone(); | |
let counter_inner = counter.clone(); | |
loop { | |
let connections = connections_inner.clone(); | |
let entities = entities_inner.clone(); | |
let counter = counter_inner.clone(); | |
// Asynchronously wait for an inbound socket. | |
let (socket, _) = listener.accept().await?; | |
// And this is where much of the magic of this server happens. We | |
// crucially want all clients to make progress concurrently, rather than | |
// blocking one on completion of another. To achieve this we use the | |
// `tokio::spawn` function to execute the work in the background. | |
// | |
// Essentially here we're executing a new task to run concurrently, | |
// which will allow all of our clients to be processed concurrently. | |
tokio::spawn(async move { | |
let connections = connections.clone(); | |
let ( sink, mut stream ) = Framed::new(socket, BytesCodec::new()).split(); | |
// Iterate to next id | |
let id = *counter_inner.read().unwrap(); | |
// connections.write().unwrap().insert(id, sink); | |
while let Some(message) = stream.next().await { | |
// Auto increment the id counter. | |
let id = *counter.read().unwrap(); | |
// connections.write().unwrap().insert(id, socket); | |
let connections = connections.clone(); | |
match message { | |
Ok(bytes) => message::process(bytes, connections), | |
Err(err) => println!("Socket closed with error: {:?}", err), | |
} | |
} | |
// We loop while there are messages coming from the Stream `framed`. | |
// The stream will return None once the client disconnects. | |
println!("Socket received FIN packet and closed connection"); | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment