Skip to content

Instantly share code, notes, and snippets.

@klausi
Created February 24, 2018 13:31
Show Gist options
  • Save klausi/93b57d3abe2c5bc4975b0b9921f2a3c2 to your computer and use it in GitHub Desktop.
Save klausi/93b57d3abe2c5bc4975b0b9921f2a3c2 to your computer and use it in GitHub Desktop.
Hyper tk-listen multi-threaded echo
extern crate futures;
extern crate hyper;
extern crate num_cpus;
extern crate tk_listen;
extern crate tokio_core;
use futures::Stream;
use hyper::header::{ContentLength, ContentType};
use hyper::server::{service_fn, Http, Response};
use std::time::Duration;
use std::thread;
use tk_listen::ListenExt;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use futures::sync::mpsc;
use tokio_core::net::TcpStream;
use futures::Future;
const TEXT: &'static str = "Hello, World!";
fn main() {
let addr = ([127, 0, 0, 1], 3000).into();
let mut core = Core::new().unwrap();
let handle = core.handle();
let handle2 = core.handle();
let listener = TcpListener::bind(&addr, &handle).unwrap();
let num_threads = num_cpus::get();
let mut channels = Vec::new();
for _ in 0..num_threads {
let (tx, rx) = mpsc::unbounded();
channels.push(tx);
thread::spawn(|| worker(rx));
}
let mut next = 0;
let server = listener
.incoming()
.sleep_on_error(Duration::from_millis(10), &handle2)
.map(move |(sock, _addr)| {
channels[next]
.unbounded_send(sock)
.expect("worker thread died");
next = (next + 1) % channels.len();
Ok(())
})
// Maximum of 10,000 connections simultaneously.
.listen(10_000);
core.run(server).unwrap();
}
// Represents one worker thread of the server that receives TCP connections from
// the main server thread.
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let http = Http::<hyper::Chunk>::new();
let done = rx.for_each(move |socket| {
let addr = match socket.peer_addr() {
Ok(addr) => addr,
Err(error) => {
println!(
"Failed to get remote address, ignoring connection. Error: {}",
error
);
return Ok(());
}
};
let hello = service_fn(|_req| {
Ok(Response::<hyper::Body>::new()
.with_header(ContentLength(TEXT.len() as u64))
.with_header(ContentType::plaintext())
.with_body(TEXT))
});
let connection = http.serve_connection(socket, hello)
.map(|_| ())
.map_err(move |err| println!("server connection error: ({}) {}", addr, err));
handle.spawn(connection);
Ok(())
});
match core.run(done) {
Ok(_) => println!("Worker tokio core run ended unexpectedly"),
Err(_) => println!("Worker tokio core run error."),
};
}
@dertin
Copy link

dertin commented Jan 23, 2023

Hi @klausi

I am currently using this example:
https://github.com/hyperium/hyper/blob/0.14.x/examples/service_struct_impl.rs
because I have to send a HttpClienttonic::transport::Channel to the hyper.rs request handler.

let grpc_client = HttpClient::new(channel_tonic);

let addr = SocketAddr::from(([0, 0, 0, 0], httpserver_port));

let server = Server::bind(&addr)
    .http1_preserve_header_case(true)
    .http1_title_case_headers(true)
    .tcp_sleep_on_accept_errors(true)
    .serve(MakeSvc { grpc_client });

Can I use multiple CPUs on my http server to handle parallel requests on the same port?
Do you have an updated example? I don't have much experience with Rust yet. Thanks!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment