Skip to content

Instantly share code, notes, and snippets.

@klausi klausi/main.rs
Created Feb 24, 2018

Embed
What would you like to do?
Hyper tk-listen multi-threaded echo
extern crate futures;
extern crate hyper;
extern crate num_cpus;
extern crate tk_listen;
extern crate tokio_core;
use futures::Stream;
use hyper::header::{ContentLength, ContentType};
use hyper::server::{service_fn, Http, Response};
use std::time::Duration;
use std::thread;
use tk_listen::ListenExt;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use futures::sync::mpsc;
use tokio_core::net::TcpStream;
use futures::Future;
// Response body sent by the worker's service; its byte length is also used for
// the Content-Length header.
const TEXT: &'static str = "Hello, World!";
/// Accepts TCP connections on 127.0.0.1:3000 on the main thread and hands each
/// accepted socket to one of `num_cpus::get()` worker threads in round-robin
/// order.
///
/// # Panics
///
/// Panics if the reactor cannot be created, if the listener cannot be bound,
/// if a worker's channel is closed (the worker thread died), or if the accept
/// loop future resolves to an error.
fn main() {
    let addr = ([127, 0, 0, 1], 3000).into();
    let mut core = Core::new().unwrap();
    // Both `TcpListener::bind` and `sleep_on_error` only borrow the handle, so
    // a single handle is enough for the whole function.
    let handle = core.handle();
    let listener = TcpListener::bind(&addr, &handle).unwrap();

    // One unbounded channel plus one worker thread per logical CPU.
    let num_threads = num_cpus::get();
    let mut channels = Vec::with_capacity(num_threads);
    for _ in 0..num_threads {
        let (tx, rx) = mpsc::unbounded();
        channels.push(tx);
        thread::spawn(move || worker(rx));
    }

    // Index of the worker that receives the next accepted socket.
    let mut next = 0;
    let server = listener
        .incoming()
        // Back off briefly after an accept error instead of ending the stream.
        .sleep_on_error(Duration::from_millis(10), &handle)
        .map(move |(sock, _addr)| {
            channels[next]
                .unbounded_send(sock)
                .expect("worker thread died");
            next = (next + 1) % channels.len();
            Ok(())
        })
        // At most 10,000 items of the stream above are processed concurrently.
        // NOTE(review): each item resolves to `Ok(())` as soon as the socket has
        // been handed to a worker, so this presumably bounds pending hand-offs
        // rather than the number of live connections - confirm against the
        // tk-listen documentation.
        .listen(10_000);

    core.run(server).unwrap();
}
// Body of a single worker thread: runs its own tokio reactor and serves HTTP
// on every TCP connection received from the main server thread through `rx`.
//
// Each connection is answered by a service that replies with `TEXT` as plain
// text. Connections whose peer address cannot be read are dropped after
// logging the error. A line is printed when the reactor stops, whether it
// finished or failed.
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
    let mut reactor = Core::new().unwrap();
    let spawner = reactor.handle();
    let protocol = Http::<hyper::Chunk>::new();

    let accept_loop = rx.for_each(move |socket| {
        // The peer address is only needed to label connection errors below.
        let peer = match socket.peer_addr() {
            Err(error) => {
                println!(
                    "Failed to get remote address, ignoring connection. Error: {}",
                    error
                );
                return Ok(());
            }
            Ok(peer) => peer,
        };

        let hello = service_fn(|_req| {
            let response = Response::<hyper::Body>::new()
                .with_header(ContentLength(TEXT.len() as u64))
                .with_header(ContentType::plaintext())
                .with_body(TEXT);
            Ok(response)
        });

        // Spawn the connection on this thread's reactor so the receive loop
        // can move on to the next socket right away.
        let served = protocol.serve_connection(socket, hello);
        let connection = served
            .map(|_| ())
            .map_err(move |err| println!("server connection error: ({}) {}", peer, err));
        spawner.spawn(connection);
        Ok(())
    });

    if reactor.run(accept_loop).is_ok() {
        println!("Worker tokio core run ended unexpectedly");
    } else {
        println!("Worker tokio core run error.");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.