Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
extern crate async_bincode;
#[macro_use]
extern crate futures;
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate bincode;
extern crate tokio;
extern crate tower_service;
#[derive(Serialize, Deserialize)]
pub struct Request;
#[derive(Serialize, Deserialize)]
pub struct Response;
use futures::sync::{mpsc, oneshot};
use std::io;
use tokio::prelude::*;
struct ConnectionInner {
reqs: mpsc::UnboundedReceiver<(Request, oneshot::Sender<Response>)>,
stream: async_bincode::AsyncBincodeStream<
tokio::net::tcp::TcpStream,
Response,
Request,
async_bincode::SyncDestination,
>,
current: Option<oneshot::Sender<Response>>,
send: Option<Request>,
}
impl Future for ConnectionInner {
type Item = ();
type Error = bincode::Error;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
loop {
if self.current.is_none() {
match self.reqs.poll() {
Ok(Async::Ready(Some((req, reply)))) => {
self.current = Some(reply);
self.send = Some(req);
}
Ok(Async::Ready(None)) => {
// EOF
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
return Err(Box::new(bincode::ErrorKind::Io(io::Error::new(
io::ErrorKind::Interrupted,
"failed to receive request",
))))
}
}
}
if let Some(r) = self.send.take() {
if let AsyncSink::NotReady(r) = self.stream.start_send(r)? {
self.send = Some(r);
return Ok(Async::NotReady);
}
}
try_ready!(self.stream.poll_complete());
if let Some(r) = try_ready!(self.stream.poll()) {
// ignore send failure
let _ = self.current.take().unwrap().send(r);
} else {
// unexpected EOF
return Err(Box::new(bincode::ErrorKind::Io(io::Error::new(
io::ErrorKind::BrokenPipe,
"EOF while waiting for reply",
))));
}
}
}
}
#[derive(Clone)]
pub struct Connection {
send: mpsc::UnboundedSender<(Request, oneshot::Sender<Response>)>,
}
impl Connection {
pub fn new(
stream: tokio::net::tcp::ConnectFuture,
) -> impl Future<Item = Connection, Error = ()> {
let (tx, rx) = mpsc::unbounded();
future::lazy(move || {
tokio::spawn(
stream
.map_err(|e| panic!("{:?}", e))
.and_then(move |s| ConnectionInner {
reqs: rx,
stream: s.into(),
current: None,
send: None,
})
.map_err(|e| panic!("{:?}", e)),
);
Ok(Connection { send: tx })
})
}
}
impl tower_service::Service for Connection {
type Request = Request;
type Response = Response;
type Error = ();
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let (tx, rx) = oneshot::channel();
Box::new(
self.send
.unbounded_send((req, tx))
.map_err(|_| ())
.into_future()
.and_then(move |_| rx.map_err(|_| ())),
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.