Created
February 18, 2018 20:58
-
-
Save klausi/f94b9aff7d36a1cb4ebbca746f0a099f to your computer and use it in GitHub Desktop.
Resilient Rust echo server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate hyper; | |
extern crate num_cpus; | |
extern crate service_fn; | |
extern crate tokio_core; | |
use std::io; | |
use std::net::{self, SocketAddr}; | |
use std::thread; | |
use std::time; | |
use futures::Future; | |
use futures::stream::Stream; | |
use futures::sync::mpsc; | |
use hyper::header::{ContentLength, ContentType}; | |
use hyper::server::{Http, Response}; | |
use service_fn::service_fn; | |
use tokio_core::net::TcpStream; | |
use tokio_core::reactor::Core; | |
/// Plain-text body returned for every request; its byte length is also what
/// the worker advertises in the `Content-Length` header.
const TEXT: &str = "Hello, World!";
fn main() { | |
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); | |
let num_threads = num_cpus::get(); | |
let listener = net::TcpListener::bind(&addr).expect("failed to bind"); | |
println!("Listening on: {}", addr); | |
let mut channels = Vec::new(); | |
for _ in 0..num_threads { | |
let (tx, rx) = mpsc::unbounded(); | |
channels.push(tx); | |
thread::spawn(|| worker(rx)); | |
} | |
let mut next = 0; | |
for socket in listener.incoming() { | |
let socket = match socket { | |
Ok(socket) => socket, | |
// Ignore connection refused/aborted errors and directly continue | |
// with the next request. | |
Err(ref e) if connection_error(e) => continue, | |
// Ignore socket errors like "Too many open files" on the OS | |
// level. We need to sleep for a bit because the socket is not | |
// removed from the accept queue in this case. A direct continue | |
// would spin the CPU really hard with the same error again and | |
// again. | |
Err(_) => { | |
let ten_millis = time::Duration::from_millis(10); | |
thread::sleep(ten_millis); | |
continue; | |
} | |
}; | |
channels[next] | |
.unbounded_send(socket) | |
.expect("worker thread died"); | |
next = (next + 1) % channels.len(); | |
} | |
} | |
// Represents one worker thread of the server that receives TCP connections from | |
// the main server thread. | |
fn worker(rx: mpsc::UnboundedReceiver<std::net::TcpStream>) { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let http = Http::<hyper::Chunk>::new(); | |
let done = rx.for_each(move |socket| { | |
let socket = match TcpStream::from_stream(socket, &handle) { | |
Ok(socket) => socket, | |
Err(error) => { | |
println!( | |
"Failed to read TCP stream, ignoring connection. Error: {}", | |
error | |
); | |
return Ok(()); | |
} | |
}; | |
let addr = match socket.peer_addr() { | |
Ok(addr) => addr, | |
Err(error) => { | |
println!( | |
"Failed to get remote address, ignoring connection. Error: {}", | |
error | |
); | |
return Ok(()); | |
} | |
}; | |
let hello = service_fn(|_req| { | |
Ok(Response::<hyper::Body>::new() | |
.with_header(ContentLength(TEXT.len() as u64)) | |
.with_header(ContentType::plaintext()) | |
.with_body(TEXT)) | |
}); | |
let connection = http.serve_connection(socket, hello) | |
.map(|_| ()) | |
.map_err(move |err| println!("server connection error: ({}) {}", addr, err)); | |
handle.spawn(connection); | |
Ok(()) | |
}); | |
match core.run(done) { | |
Ok(_) => println!("Worker tokio core run ended unexpectedly"), | |
Err(_) => println!("Worker tokio core run error."), | |
}; | |
} | |
/// Reports whether `e` is an error tied to a single connection attempt
/// (refused, aborted or reset).
///
/// When `accept()` fails with one of these, the next pending connection may
/// already be ready, so the caller can retry immediately. Every other error
/// yields `false`; the caller is expected to wait before the next `accept()`
/// so that resource-exhaustion errors such as ENFILE and EMFILE do not turn
/// into a tight loop.
fn connection_error(e: &io::Error) -> bool {
    match e.kind() {
        io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::ConnectionReset => true,
        _ => false,
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment