Skip to content

Instantly share code, notes, and snippets.

@vi
Created March 24, 2019 22:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vi/2f1452b4270bccf1cc9d85b96721c33c to your computer and use it in GitHub Desktop.
Save vi/2f1452b4270bccf1cc9d85b96721c33c to your computer and use it in GitHub Desktop.
Dump of hacky experimentation with Hyper. Not suggesting to write code like this
// Trying to imitate a `rust-websocket`-like interface with hyper 0.12
#![allow(unused)]
#![deny(unused_must_use)]
use tokio::prelude::*;
use tokio::net::tcp::{TcpListener,TcpStream};
use std::net::SocketAddr;
use hyper::server::conn;
use hyper::service::Service;
use hyper::Body;
use hyper::{Request,Response};
use hyper::upgrade::Upgraded;
use hyper::service::service_fn;
use std::error::{Error as StdError};
use std::io::{Error as IoError,ErrorKind as IoErrorKind};
use tokio::sync::oneshot::{Receiver,Sender,channel};
use std::cell::RefCell;
use std::sync::Arc;
use std::sync::Mutex;
/// Future that drives an HTTP/1 connection until the first request arrives.
///
/// It resolves to a [`ReceivedRequest`], which the caller can `accept()` to
/// get the underlying I/O object back after hyper has answered the request.
struct Wrapper<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    /// Receives the raw I/O object once hyper has released the connection.
    /// Wrapped in `Option` so `poll` can move it into the `ReceivedRequest`.
    rx: Option<Receiver<S>>,
    /// Fires when the hyper service has been invoked for a request.
    rx2: Receiver<()>,
    /// Sending on this lets the hyper service produce its response.
    /// Wrapped in `Option` so `poll` can move it into the `ReceivedRequest`.
    tx3: Option<Sender<()>>,
}
impl<S:AsyncRead+AsyncWrite+Send+'static> Wrapper<S> {
pub fn new(c: S) -> Self {
let (tx, rx) = channel();
let (tx2, rx2) = channel();
let (tx3, rx3) = channel();
let tx2 = Arc::new(Mutex::new(Some(tx2)));
let rx3 = Arc::new(Mutex::new(Some(rx3)));
let mut h = conn::Http::new();
h.http1_only(true);
h.keep_alive(false);
let q = h.serve_connection(c, service_fn(move |_rq|{
let tx2 = tx2.clone().lock().unwrap().take().unwrap();
let rx3 = rx3.clone().lock().unwrap().take().unwrap();
tx2.send(()).unwrap_or_else(|_|eprintln!("failed to send 2"));
rx3.map_err(|e|{Box::<dyn StdError+Send+Sync>::from(format!("{}",e))}).and_then(|()| {
let mut r = Response::new(Body::empty());
*r.status_mut() = hyper::StatusCode::SWITCHING_PROTOCOLS;
future::ok::<_,Box<dyn StdError+Send+Sync>>(r)
})
}))
.without_shutdown()
.and_then(|p: hyper::server::conn::Parts<_,_>| {
tx.send(p.io).unwrap_or_else(|_|eprintln!("failed to send"));
future::ok(())
})
.map_err(|e|eprintln!("{}",e))
;
tokio::executor::spawn(q);
Self{
rx: Some(rx),
rx2,
tx3: Some(tx3),
}
}
}
impl<S> Future for Wrapper<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    type Item = ReceivedRequest<S>;
    type Error = ();

    /// Resolves once the "request seen" signal (`rx2`) fires, handing the
    /// remaining channel endpoints over to a `ReceivedRequest`.
    ///
    /// Fails with `()` (after logging) if the sending side of `rx2` was
    /// dropped without sending. Panics if polled again after it has already
    /// produced a value, because the endpoints have been moved out.
    fn poll(&mut self) -> Poll<ReceivedRequest<S>, ()> {
        let state = self.rx2.poll().map_err(|_| eprintln!("Poll err 1"))?;
        match state {
            Async::NotReady => Ok(Async::NotReady),
            Async::Ready(()) => {
                let rx = self.rx.take().unwrap();
                let tx3 = self.tx3.take().unwrap();
                Ok(Async::Ready(ReceivedRequest { rx, tx3 }))
            }
        }
    }
}
/// A request that has arrived on a connection but has not been answered yet.
///
/// Produced by polling a [`Wrapper`]; consume it with `accept()`.
struct ReceivedRequest<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    /// Receives the raw I/O object after hyper has released the connection.
    rx: Receiver<S>,
    /// Signals the hyper service that it may send its response.
    tx3: Sender<()>,
}
impl<S:AsyncRead+AsyncWrite+Send+'static> ReceivedRequest<S> {
pub fn accept(self) -> impl Future<Item=EchoServer<S>, Error=()> {
self.tx3.send(()).unwrap_or_else(|_|eprintln!("failed to send 3"));
self.rx.map_err(|e|eprintln!("failed to recv")).and_then(|io:S|{
let es = EchoServer::new(io);
future::ok(es)
})
}
}
/// Future that echoes everything read from an I/O object back to it.
struct EchoServer<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    /// The copy operation from the read half to the write half of the stream.
    q: tokio::io::Copy<tokio::io::ReadHalf<S>, tokio::io::WriteHalf<S>>,
}
impl<S> EchoServer<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    /// Splits `io` into read and write halves and sets up a copy from the
    /// former into the latter. Nothing is transferred until the returned
    /// future is polled.
    pub fn new(io: S) -> Self {
        let (reader, writer) = io.split();
        let q = tokio::io::copy(reader, writer);
        Self { q }
    }
}
impl<S> Future for EchoServer<S>
where
    S: AsyncRead + AsyncWrite + Send + 'static,
{
    type Item = ();
    type Error = ();

    /// Drives the inner copy. The copy's result value is discarded on
    /// completion; an I/O error is printed to stderr and reported as `()`.
    fn poll(&mut self) -> Poll<(), ()> {
        self.q
            .poll()
            .map(|state| state.map(|_| ()))
            .map_err(|e| eprintln!("{}", e))
    }
}
/// Listens on `127.0.0.1:1234` and runs the echo-after-upgrade flow on every
/// accepted TCP connection.
///
/// Each connection is spawned as its own task. Returning the per-connection
/// future from `for_each` would serve connections strictly one at a time, and
/// a single failing connection (`Err(())`) would end the accept loop and with
/// it the whole server.
///
/// # Errors
///
/// Returns an error if the listen address cannot be parsed or the listener
/// cannot be bound. An error while accepting is printed to stderr and ends
/// the accept loop.
fn main() -> Result<(),Box<dyn StdError>> {
    let sa: SocketAddr = "127.0.0.1:1234".parse()?;
    let l = TcpListener::bind(&sa)?;
    let p = l.incoming()
        .map_err(|e| eprintln!("{}", e))
        .for_each(|c: TcpStream| {
            // Wait for a request, accept it, then echo until the peer is done.
            // Failures are already logged where they occur, so the `()` error
            // carries nothing further to report.
            let session = Wrapper::new(c).and_then(|rr: ReceivedRequest<_>| {
                rr.accept().and_then(|echo| echo)
            });
            tokio::executor::spawn(session);
            Ok(())
        });
    tokio::runtime::run(p);
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment