Created
February 24, 2018 13:31
-
-
Save klausi/93b57d3abe2c5bc4975b0b9921f2a3c2 to your computer and use it in GitHub Desktop.
Hyper tk-listen multi-threaded echo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate futures; | |
extern crate hyper; | |
extern crate num_cpus; | |
extern crate tk_listen; | |
extern crate tokio_core; | |
use futures::Stream; | |
use hyper::header::{ContentLength, ContentType}; | |
use hyper::server::{service_fn, Http, Response}; | |
use std::time::Duration; | |
use std::thread; | |
use tk_listen::ListenExt; | |
use tokio_core::net::TcpListener; | |
use tokio_core::reactor::Core; | |
use futures::sync::mpsc; | |
use tokio_core::net::TcpStream; | |
use futures::Future; | |
/// Response body sent for every request; its byte length is also what the
/// worker reports in the `Content-Length` header.
const TEXT: &str = "Hello, World!";
fn main() { | |
let addr = ([127, 0, 0, 1], 3000).into(); | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let handle2 = core.handle(); | |
let listener = TcpListener::bind(&addr, &handle).unwrap(); | |
let num_threads = num_cpus::get(); | |
let mut channels = Vec::new(); | |
for _ in 0..num_threads { | |
let (tx, rx) = mpsc::unbounded(); | |
channels.push(tx); | |
thread::spawn(|| worker(rx)); | |
} | |
let mut next = 0; | |
let server = listener | |
.incoming() | |
.sleep_on_error(Duration::from_millis(10), &handle2) | |
.map(move |(sock, _addr)| { | |
channels[next] | |
.unbounded_send(sock) | |
.expect("worker thread died"); | |
next = (next + 1) % channels.len(); | |
Ok(()) | |
}) | |
// Maximum of 10,000 connections simultaneously. | |
.listen(10_000); | |
core.run(server).unwrap(); | |
} | |
// Represents one worker thread of the server that receives TCP connections from | |
// the main server thread. | |
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { | |
let mut core = Core::new().unwrap(); | |
let handle = core.handle(); | |
let http = Http::<hyper::Chunk>::new(); | |
let done = rx.for_each(move |socket| { | |
let addr = match socket.peer_addr() { | |
Ok(addr) => addr, | |
Err(error) => { | |
println!( | |
"Failed to get remote address, ignoring connection. Error: {}", | |
error | |
); | |
return Ok(()); | |
} | |
}; | |
let hello = service_fn(|_req| { | |
Ok(Response::<hyper::Body>::new() | |
.with_header(ContentLength(TEXT.len() as u64)) | |
.with_header(ContentType::plaintext()) | |
.with_body(TEXT)) | |
}); | |
let connection = http.serve_connection(socket, hello) | |
.map(|_| ()) | |
.map_err(move |err| println!("server connection error: ({}) {}", addr, err)); | |
handle.spawn(connection); | |
Ok(()) | |
}); | |
match core.run(done) { | |
Ok(_) => println!("Worker tokio core run ended unexpectedly"), | |
Err(_) => println!("Worker tokio core run error."), | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @klausi
I am currently using this example:
https://github.com/hyperium/hyper/blob/0.14.x/examples/service_struct_impl.rs
because I have to send an HttpClient<tonic::transport::Channel> to the hyper.rs request handler.
Can I use multiple CPUs on my http server to handle parallel requests on the same port?
Do you have an updated example? I don't have much experience with Rust yet. Thanks!!