Skip to content

Instantly share code, notes, and snippets.

@klausi klausi/main.rs
Created Feb 18, 2018

Embed
What would you like to do?
Resilient Rust echo server
extern crate futures;
extern crate hyper;
extern crate num_cpus;
extern crate service_fn;
extern crate tokio_core;
use std::io;
use std::net::{self, SocketAddr};
use std::thread;
use std::time;
use futures::Future;
use futures::stream::Stream;
use futures::sync::mpsc;
use hyper::header::{ContentLength, ContentType};
use hyper::server::{Http, Response};
use service_fn::service_fn;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
const TEXT: &'static str = "Hello, World!";
fn main() {
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
let num_threads = num_cpus::get();
let listener = net::TcpListener::bind(&addr).expect("failed to bind");
println!("Listening on: {}", addr);
let mut channels = Vec::new();
for _ in 0..num_threads {
let (tx, rx) = mpsc::unbounded();
channels.push(tx);
thread::spawn(|| worker(rx));
}
let mut next = 0;
for socket in listener.incoming() {
let socket = match socket {
Ok(socket) => socket,
// Ignore connection refused/aborted errors and directly continue
// with the next request.
Err(ref e) if connection_error(e) => continue,
// Ignore socket errors like "Too many open files" on the OS
// level. We need to sleep for a bit because the socket is not
// removed from the accept queue in this case. A direct continue
// would spin the CPU really hard with the same error again and
// again.
Err(_) => {
let ten_millis = time::Duration::from_millis(10);
thread::sleep(ten_millis);
continue;
}
};
channels[next]
.unbounded_send(socket)
.expect("worker thread died");
next = (next + 1) % channels.len();
}
}
// Represents one worker thread of the server that receives TCP connections from
// the main server thread.
fn worker(rx: mpsc::UnboundedReceiver<std::net::TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let http = Http::<hyper::Chunk>::new();
let done = rx.for_each(move |socket| {
let socket = match TcpStream::from_stream(socket, &handle) {
Ok(socket) => socket,
Err(error) => {
println!(
"Failed to read TCP stream, ignoring connection. Error: {}",
error
);
return Ok(());
}
};
let addr = match socket.peer_addr() {
Ok(addr) => addr,
Err(error) => {
println!(
"Failed to get remote address, ignoring connection. Error: {}",
error
);
return Ok(());
}
};
let hello = service_fn(|_req| {
Ok(Response::<hyper::Body>::new()
.with_header(ContentLength(TEXT.len() as u64))
.with_header(ContentType::plaintext())
.with_body(TEXT))
});
let connection = http.serve_connection(socket, hello)
.map(|_| ())
.map_err(move |err| println!("server connection error: ({}) {}", addr, err));
handle.spawn(connection);
Ok(())
});
match core.run(done) {
Ok(_) => println!("Worker tokio core run ended unexpectedly"),
Err(_) => println!("Worker tokio core run error."),
};
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused || e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.