Skip to content

Instantly share code, notes, and snippets.

@Sherlock-Holo
Created March 9, 2020 10:22
Show Gist options
  • Save Sherlock-Holo/a9e084310535581682cb0186604d6bbe to your computer and use it in GitHub Desktop.
Save Sherlock-Holo/a9e084310535581682cb0186604d6bbe to your computer and use it in GitHub Desktop.
async-std hyper listener and connector
use std::future::Future;
use std::io;
use std::io::ErrorKind;
use std::io::Result;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_std::io::{Read, Write};
use async_std::net::{TcpListener, TcpStream};
use async_std::stream::Stream;
use async_std::task;
use async_tls::{TlsAcceptor, TlsConnector};
use async_tls::client::TlsStream as ClientTlsStream;
use async_tls::server::TlsStream as ServerTlsStream;
use futures::future::ready;
use hyper::client::connect::{Connected, Connection};
use hyper::service::Service;
use rustls::ClientConfig;
use tonic::transport::Uri;
#[derive(Clone)]
pub struct HyperExecutor;
impl<F> hyper::rt::Executor<F> for HyperExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
task::spawn(fut);
}
}
pub struct HyperListener {
tls_acceptor: TlsAcceptor,
tcp_listener: TcpListener,
}
impl hyper::server::accept::Accept for HyperListener {
type Conn = HyperStream<ServerTlsStream<TcpStream>>;
type Error = io::Error;
fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Conn>>> {
let stream = task::ready!(Pin::new(&mut self.tcp_listener.incoming()).poll_next(cx)).unwrap()?;
let stream = task::ready!(Pin::new(&mut self.tls_acceptor.accept(stream)).poll(cx));
match stream {
Err(err) => Poll::Ready(Some(Err(err))),
Ok(stream) => Poll::Ready(Some(Ok(HyperStream(stream))))
}
}
}
pub struct HyperStream<T>(pub T);
impl<T> tokio::io::AsyncRead for HyperStream<T>
where T: Read + Unpin
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl<T> tokio::io::AsyncWrite for HyperStream<T>
where T: Write + Unpin
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
impl Connection for HyperStream<ClientTlsStream<TcpStream>> {
fn connected(&self) -> Connected {
Connected::new()
}
}
/*pub struct HyperServerStream(pub ServerTlsStream<TcpStream>);
impl tokio::io::AsyncRead for HyperServerStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl tokio::io::AsyncWrite for HyperServerStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
pub struct HyperClientStream(pub ClientTlsStream<TcpStream>);
impl tokio::io::AsyncRead for HyperClientStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_read(cx, buf)
}
}
impl tokio::io::AsyncWrite for HyperClientStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.0).poll_close(cx)
}
}
impl Connection for HyperClientStream {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let Ok(remote_addr) = self.0.get_ref().peer_addr() {
connected.extra(remote_addr)
} else {
connected
}
}
}*/
#[derive(Clone)]
pub struct HyperConnector {
tls_connector: TlsConnector,
}
impl Service<Uri> for HyperConnector {
type Response = HyperStream<ClientTlsStream<TcpStream>>;
type Error = std::io::Error;
type Future = Pin<Box<dyn Future<Output=io::Result<Self::Response>>>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Uri) -> Self::Future {
match req.authority() {
None => Box::pin(ready(Err(io::Error::new(ErrorKind::AddrNotAvailable, format!("{} is invalid", req)).into()))),
Some(authority) => {
let host = authority.host().to_string();
let authority = authority.to_string();
let tls_connector = self.tls_connector.clone();
Box::pin(async move {
let stream = TcpStream::connect(authority).await?;
let tls_stream = tls_connector.connect(host, stream).await?;
Ok(HyperStream(tls_stream))
})
}
}
}
}
impl From<ClientConfig> for HyperConnector {
fn from(cfg: ClientConfig) -> Self {
Self {
tls_connector: TlsConnector::from(Arc::new(cfg))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment