Created
September 11, 2019 12:35
-
-
Save jnicholls/d9ad0b9ff686bebc5855ee9a6dd643e7 to your computer and use it in GitHub Desktop.
A message stream example built on a bidirectional bincode stream, which is itself layered on top of tokio-serde.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::marker::PhantomData; | |
use std::pin::Pin; | |
use std::task::{Context, Poll}; | |
use bincode::Error; | |
use bytes::{Bytes, BytesMut}; | |
use futures::{Sink, Stream, TryStream}; | |
use pin_utils::unsafe_pinned; | |
use serde::{Deserialize, Serialize}; | |
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer}; | |
/// Codec marker that serializes and deserializes values of type `T` using
/// `bincode`. It holds no data of its own.
struct Bincode<T> {
    // Ties the codec to `T` without storing a value of that type.
    _marker: PhantomData<T>,
}

impl<T> Bincode<T> {
    /// Builds the codec marker.
    fn new() -> Self {
        Bincode {
            _marker: PhantomData,
        }
    }
}
impl<T> Deserializer<T> for Bincode<T> | |
where | |
T: for<'de> Deserialize<'de>, | |
{ | |
type Error = Error; | |
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<T, Error> { | |
bincode::deserialize(src) | |
} | |
} | |
impl<T: Serialize> Serializer<T> for Bincode<T> { | |
type Error = Error; | |
fn serialize(self: Pin<&mut Self>, item: &T) -> Result<Bytes, Self::Error> { | |
bincode::serialize(item).map(Into::into) | |
} | |
} | |
/// Adapts a stream of Bincode encoded buffers to a stream of values by | |
/// deserializing them, as well as provides a sink to serialize values into | |
/// Bincode buffers. | |
/// | |
/// `BincodeStream` implements `Stream` by polling the inner buffer stream and | |
/// deserializing the buffer as Bincode. It expects that each yielded buffer | |
/// represents a single Bincode value and does not contain any extra trailing | |
/// bytes. | |
/// | |
/// `BincodeStream` also implements `Sink` by serializing the submitted values | |
/// to a buffer. The buffer is then sent to the inner stream, which is responsible | |
/// for handling framing on the wire. | |
pub struct BincodeStream<V, R, W> { | |
read: FramedRead<R, V, Bincode<V>>, | |
write: FramedWrite<W, V, Bincode<V>>, | |
} | |
impl<V, R, W> BincodeStream<V, R, W> | |
where | |
V: for<'de> Deserialize<'de> + Serialize, | |
R: TryStream<Ok = BytesMut>, | |
R::Error: From<Error>, | |
W: Sink<Bytes>, | |
{ | |
/// Creates a new `BincodeStream` with the given read stream and write sink. | |
pub fn new(read: R, write: W) -> Self { | |
let read = FramedRead::new(read, Bincode::new()); | |
let write = FramedWrite::new(write, Bincode::new()); | |
Self { read, write } | |
} | |
} | |
impl<V, R, W> BincodeStream<V, R, W> { | |
unsafe_pinned!(read: FramedRead<R, V, Bincode<V>>); | |
unsafe_pinned!(write: FramedWrite<W, V, Bincode<V>>); | |
/// Consumes the `BincodeStream`, returning its underlying read stream and | |
/// write sink. | |
/// | |
/// Note that care should be taken to not tamper with the underlying streams | |
/// as it may corrupt the frames of data coming in or going out. | |
pub fn into_inner(self) -> (R, W) { | |
(self.read.into_inner(), self.write.into_inner()) | |
} | |
} | |
impl<V, R, W> Stream for BincodeStream<V, R, W> | |
where | |
V: for<'de> Deserialize<'de>, | |
R: TryStream<Ok = BytesMut>, | |
R::Error: From<Error>, | |
{ | |
type Item = Result<V, R::Error>; | |
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
self.read().poll_next(cx) | |
} | |
} | |
impl<V, R, W> Sink<V> for BincodeStream<V, R, W> | |
where | |
V: Serialize, | |
W: Sink<Bytes>, | |
W::Error: From<Error>, | |
{ | |
type Error = W::Error; | |
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
self.write().poll_ready(cx) | |
} | |
fn start_send(self: Pin<&mut Self>, item: V) -> Result<(), Self::Error> { | |
self.write().start_send(item) | |
} | |
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
self.write().poll_flush(cx) | |
} | |
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> { | |
self.write().poll_close(cx) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
mismatched types
expected type parameter, found struct `tokio_codec::framed_read::FramedRead`
note: expected type `bincode_stream::BincodeStream<message::Message, R, W>`
found type `bincode_stream::BincodeStream<_, tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>, tokio_codec::framed_write::FramedWrite<W, tokio_codec::length_delimited::LengthDelimitedCodec>>`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
the trait bound `std::io::Error: std::convert::From<std::boxed::Box<bincode::error::ErrorKind>>` is not satisfied
the trait `std::convert::From<std::boxed::Box<bincode::error::ErrorKind>>` is not implemented for `std::io::Error`
help: the following implementations were found:
<std::io::Error as std::convert::From<flate2::mem::CompressError>>
<std::io::Error as std::convert::From<flate2::mem::DecompressError>>
<std::io::Error as std::convert::From<getrandom::error::Error>>
<std::io::Error as std::convert::From<rand_core::error::Error>>
and 8 others
the trait bound `R: std::marker::Unpin` is not satisfied
the trait `std::marker::Unpin` is not implemented for `R`
help: consider adding a `where R: std::marker::Unpin` bound
note: required because of the requirements on the impl of `futures_core::stream::Stream` for `tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>`
note: required because of the requirements on the impl of `futures_core::stream::TryStream` for `tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>`
the trait bound `W: std::marker::Unpin` is not satisfied
the trait `std::marker::Unpin` is not implemented for `W`
help: consider adding a `where W: std::marker::Unpin` bound
note: required because of the requirements on the impl of `futures_sink::Sink<bytes::bytes::Bytes>` for `tokio_codec::framed_write::FramedWrite<W, tokio_codec::length_delimited::LengthDelimitedCodec>`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use serde::{Deserialize, Serialize}; | |
use tokio::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; | |
use tokio::io::{AsyncRead, AsyncWrite}; | |
use crate::bincode_stream::BincodeStream; | |
#[derive(Clone, Debug, Deserialize, Serialize)] | |
pub(crate) enum Message {} | |
pub(crate) struct MessageStream<R, W> { | |
inner: BincodeStream<Message, R, W>, | |
} | |
impl<R, W> MessageStream<R, W> | |
where | |
R: AsyncRead, | |
W: AsyncWrite, | |
{ | |
pub fn new(read: R, write: W) -> Self { | |
let inner_read = FramedRead::new(read, LengthDelimitedCodec::new()); | |
let inner_write = FramedWrite::new(write, LengthDelimitedCodec::new()); | |
let inner = BincodeStream::new(inner_read, inner_write); | |
Self { inner } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment