Created
March 26, 2024 09:14
-
-
Save haydnv/9ed45169963a61c8a18aca341b3b3044 to your computer and use it in GitHub Desktop.
Hyper client-server test demonstrating deadlock bug
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::error::Error; | |
use std::net::{Ipv4Addr, SocketAddr}; | |
use futures::TryStreamExt; | |
use http_body_util::{combinators::BoxBody, BodyExt, BodyStream, Empty, Full}; | |
use hyper::body::{Bytes, Incoming}; | |
use hyper::server::conn::http1; | |
use hyper::service::service_fn; | |
use hyper::{body::Body, Method, Request, Response}; | |
use hyper_util::rt::TokioIo; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::task::JoinHandle; | |
// localhost fails | |
const IP_ADDR: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); | |
// LAN address also fails | |
// const IP_ADDR: Ipv4Addr = Ipv4Addr::new(192, 168, 0, 1); | |
const PORT: u16 = 8000; | |
type TokioError = Box<dyn Error + Send + Sync>; | |
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> { | |
Full::new(chunk.into()) | |
.map_err(|never| match never {}) | |
.boxed() | |
} | |
async fn handle(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> { | |
println!("request handler running..."); | |
assert_eq!(req.method(), Method::GET); | |
assert_eq!(req.uri().path(), "/"); | |
Ok(Response::new(full("Hello, World!"))) | |
} | |
async fn spawn_server(ip_addr: Ipv4Addr, port: u16) -> Result<JoinHandle<Result<(), TokioError>>, TokioError> { | |
let addr = SocketAddr::from((ip_addr, port)); | |
let listener = TcpListener::bind(addr).await?; | |
let handle = tokio::task::spawn(async move { | |
println!("server listening for TCP connection..."); | |
let (stream, _) = listener.accept().await?; | |
println!("server accepted TCP connection"); | |
let io = TokioIo::new(stream); | |
let service = service_fn(move |request| handle(request)); | |
tokio::task::spawn(async move { | |
println!("server handling request..."); | |
if let Err(err) = http1::Builder::new().serve_connection(io, service).await { | |
println!("HTTP connection error: {:?}", err); | |
} | |
}); | |
Ok(()) | |
}); | |
Ok(handle) | |
} | |
async fn connect_and_send<B>( | |
stream: TcpStream, | |
request: hyper::http::request::Builder, | |
body: B, | |
) -> Result<Response<Incoming>, TokioError> | |
where | |
B: Body<Data = Bytes> + Send + Unpin + 'static, | |
B::Error: Error + Send + Sync + 'static, | |
{ | |
let io = TokioIo::new(stream); | |
println!("client performing TCP handshake..."); | |
// TCP handshake | |
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; | |
println!("client handshake succeeded"); | |
// check that the connection succeeded | |
conn.await?; | |
println!("client connection succeeded"); | |
let request = request.body(body)?; | |
println!("client sending request data..."); | |
let response = sender.send_request(request).await?; | |
Ok(response) | |
} | |
async fn fetch(url: String) -> Result<String, TokioError> { | |
let url: hyper::Uri = url.parse()?; | |
let host = url.host().expect("host"); | |
let port = url.port_u16().unwrap_or(80); | |
let addr = format!("{}:{}", host, port); | |
let request = hyper::Request::builder() | |
.method(Method::GET) | |
.header(hyper::header::HOST, host) | |
.uri(url); | |
println!("client opening TCP connection to {addr}..."); | |
let stream = TcpStream::connect(addr.clone()).await?; | |
println!("client sending outbound HTTP request to {addr}..."); | |
let body = Empty::<Bytes>::new(); | |
let response = connect_and_send(stream, request, body).await?; | |
println!("client reading HTTP response from {addr}..."); | |
let body = BodyStream::new(response.into_body()) | |
.map_ok(|frame| frame.into_data().expect("frame")) | |
.try_collect::<Vec<Bytes>>() | |
.await?; | |
let body = String::from_utf8(body.into_iter().flatten().collect())?; | |
Ok(body) | |
} | |
#[tokio::test] | |
async fn test_hyper() -> Result<(), TokioError> { | |
let server = spawn_server(IP_ADDR, PORT).await?; | |
println!("server listening..."); | |
let response = fetch(format!("http://{IP_ADDR}:{PORT}/")).await?; | |
println!("response OK: {response}"); | |
server.await? | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment