Skip to content

Instantly share code, notes, and snippets.

@polachok
Created May 16, 2017 16:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save polachok/ed3697e10594c278434040fd87c1f070 to your computer and use it in GitHub Desktop.
Save polachok/ed3697e10594c278434040fd87c1f070 to your computer and use it in GitHub Desktop.
Framed delimited?
struct FramedTransport<T, C> where T: AsyncRead + AsyncWrite + 'static {
inner: length_delimited::Framed<T>,
codec: C,
}
/// Builds a [`FramedTransport`] from an existing length-delimited transport
/// and a codec.
///
/// * `framed` - the length-delimited transport, stored as the `inner` field.
/// * `codec` - the encoder/decoder, stored as the `codec` field.
///
/// Both arguments are moved into the returned value unchanged; nothing is
/// read from or written to the transport here.
fn framed_delimited<T, C>(framed: length_delimited::Framed<T>, codec: C) -> FramedTransport<T, C>
where
    T: AsyncRead + AsyncWrite,
    C: codec::Encoder + codec::Decoder,
{
    FramedTransport {
        inner: framed,
        codec,
    }
}
impl<T, C> Stream for FramedTransport<T, C> where
T: AsyncRead + AsyncWrite, C: codec::Decoder,
::std::io::Error: ::std::convert::From<<C as ::tokio_io::codec::Decoder>::Error> {
type Item = <C as codec::Decoder>::Item;
type Error = <C as codec::Decoder>::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use futures::Async;
let codec = &mut self.codec;
self.inner.poll().and_then(|async| {
match async {
Async::Ready(Some(mut buf)) => {
let pkt = try!(codec.decode(&mut buf));
Ok(Async::Ready(pkt))
},
Async::Ready(None) => {
Ok(Async::Ready(None))
},
Async::NotReady => {
Ok(Async::NotReady)
}
}
}).map_err(|e| e.into())
}
}
/// Encodes items with the codec and writes each one as a single frame to the
/// inner transport.
///
/// Errors are reported as the encoder's own error type; transport
/// (`io::Error`) failures are converted into it.
impl<T, C> Sink for FramedTransport<T, C>
where
    T: AsyncRead + AsyncWrite + 'static,
    C: codec::Encoder,
{
    type SinkItem = <C as codec::Encoder>::Item;
    type SinkError = <C as codec::Encoder>::Error;

    /// Encodes `item` into a fresh buffer and hands it to the inner sink.
    ///
    /// Returns `AsyncSink::NotReady(item)` — with the item untouched — while
    /// the inner sink still has pending work, instead of panicking on
    /// backpressure.
    ///
    /// # Errors
    ///
    /// Returns the encoder's error if encoding fails, or a converted
    /// `io::Error` if the inner sink fails or refuses the frame.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        use futures::{Async, AsyncSink};

        // Encoding consumes `item`, so readiness has to be established first:
        // once the item is encoded it can no longer be handed back to the
        // caller through `AsyncSink::NotReady`.
        match self.inner.poll_complete()? {
            Async::Ready(()) => {}
            Async::NotReady => return Ok(AsyncSink::NotReady(item)),
        }

        let mut frame = BytesMut::with_capacity(64);
        self.codec.encode(item, &mut frame)?;

        match self.inner.start_send(frame)? {
            AsyncSink::Ready => Ok(AsyncSink::Ready),
            // NOTE(review): the inner sink is expected to accept a frame right
            // after reporting completion — confirm for the tokio-io version in
            // use. Should it refuse anyway, report an error rather than panic.
            AsyncSink::NotReady(_) => Err(::std::io::Error::new(
                ::std::io::ErrorKind::Other,
                "inner sink rejected a frame after reporting completion",
            )
            .into()),
        }
    }

    /// Drives pending writes on the inner sink to completion.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.inner.poll_complete().map_err(From::from)
    }

    /// Finishes pending writes, then closes the inner sink.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        // `self.poll_complete()` already yields `Self::SinkError`; an extra
        // `.map_err(|e| e.into())` here would leave the intermediate error
        // type undetermined for `try_ready!`.
        try_ready!(self.poll_complete());
        self.inner.close().map_err(From::from)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment