Skip to content

Instantly share code, notes, and snippets.

@jnicholls
Created September 11, 2019 12:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jnicholls/d9ad0b9ff686bebc5855ee9a6dd643e7 to your computer and use it in GitHub Desktop.
Save jnicholls/d9ad0b9ff686bebc5855ee9a6dd643e7 to your computer and use it in GitHub Desktop.
A message stream example, using an underlying bincode bi-directional stream, which in turn uses tokio-serde under that.
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use bincode::Error;
use bytes::{Bytes, BytesMut};
use futures::{Sink, Stream, TryStream};
use pin_utils::unsafe_pinned;
use serde::{Deserialize, Serialize};
use tokio_serde::{Deserializer, FramedRead, FramedWrite, Serializer};
struct Bincode<T> {
ghost: PhantomData<T>,
}
impl<T> Bincode<T> {
fn new() -> Self {
Self { ghost: PhantomData }
}
}
impl<T> Deserializer<T> for Bincode<T>
where
T: for<'de> Deserialize<'de>,
{
type Error = Error;
fn deserialize(self: Pin<&mut Self>, src: &BytesMut) -> Result<T, Error> {
bincode::deserialize(src)
}
}
impl<T: Serialize> Serializer<T> for Bincode<T> {
type Error = Error;
fn serialize(self: Pin<&mut Self>, item: &T) -> Result<Bytes, Self::Error> {
bincode::serialize(item).map(Into::into)
}
}
/// Adapts a stream of Bincode encoded buffers to a stream of values by
/// deserializing them, as well as provides a sink to serialize values into
/// Bincode buffers.
///
/// `BincodeStream` implements `Stream` by polling the inner buffer stream and
/// deserializing the buffer as Bincode. It expects that each yielded buffer
/// represents a single Bincode value and does not contain any extra trailing
/// bytes.
///
/// `BincodeStream` also implements `Sink` by serializing the submitted values
/// to a buffer. The buffer is then sent to the inner stream, which is responsible
/// for handling framing on the wire.
pub struct BincodeStream<V, R, W> {
read: FramedRead<R, V, Bincode<V>>,
write: FramedWrite<W, V, Bincode<V>>,
}
impl<V, R, W> BincodeStream<V, R, W>
where
V: for<'de> Deserialize<'de> + Serialize,
R: TryStream<Ok = BytesMut>,
R::Error: From<Error>,
W: Sink<Bytes>,
{
/// Creates a new `BincodeStream` with the given read stream and write sink.
pub fn new(read: R, write: W) -> Self {
let read = FramedRead::new(read, Bincode::new());
let write = FramedWrite::new(write, Bincode::new());
Self { read, write }
}
}
impl<V, R, W> BincodeStream<V, R, W> {
unsafe_pinned!(read: FramedRead<R, V, Bincode<V>>);
unsafe_pinned!(write: FramedWrite<W, V, Bincode<V>>);
/// Consumes the `BincodeStream`, returning its underlying read stream and
/// write sink.
///
/// Note that care should be taken to not tamper with the underlying streams
/// as it may corrupt the frames of data coming in or going out.
pub fn into_inner(self) -> (R, W) {
(self.read.into_inner(), self.write.into_inner())
}
}
impl<V, R, W> Stream for BincodeStream<V, R, W>
where
V: for<'de> Deserialize<'de>,
R: TryStream<Ok = BytesMut>,
R::Error: From<Error>,
{
type Item = Result<V, R::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.read().poll_next(cx)
}
}
impl<V, R, W> Sink<V> for BincodeStream<V, R, W>
where
V: Serialize,
W: Sink<Bytes>,
W::Error: From<Error>,
{
type Error = W::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.write().poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: V) -> Result<(), Self::Error> {
self.write().start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.write().poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.write().poll_close(cx)
}
}
mismatched types
expected type parameter, found struct `tokio_codec::framed_read::FramedRead`
note: expected type `bincode_stream::BincodeStream<message::Message, R, W>`
found type `bincode_stream::BincodeStream<_, tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>, tokio_codec::framed_write::FramedWrite<W, tokio_codec::length_delimited::LengthDelimitedCodec>>`
the trait bound `std::io::Error: std::convert::From<std::boxed::Box<bincode::error::ErrorKind>>` is not satisfied
the trait `std::convert::From<std::boxed::Box<bincode::error::ErrorKind>>` is not implemented for `std::io::Error`
help: the following implementations were found:
<std::io::Error as std::convert::From<flate2::mem::CompressError>>
<std::io::Error as std::convert::From<flate2::mem::DecompressError>>
<std::io::Error as std::convert::From<getrandom::error::Error>>
<std::io::Error as std::convert::From<rand_core::error::Error>>
and 8 others
the trait bound `R: std::marker::Unpin` is not satisfied
the trait `std::marker::Unpin` is not implemented for `R`
help: consider adding a `where R: std::marker::Unpin` bound
note: required because of the requirements on the impl of `futures_core::stream::Stream` for `tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>`
note: required because of the requirements on the impl of `futures_core::stream::TryStream` for `tokio_codec::framed_read::FramedRead<R, tokio_codec::length_delimited::LengthDelimitedCodec>`
the trait bound `W: std::marker::Unpin` is not satisfied
the trait `std::marker::Unpin` is not implemented for `W`
help: consider adding a `where W: std::marker::Unpin` bound
note: required because of the requirements on the impl of `futures_sink::Sink<bytes::bytes::Bytes>` for `tokio_codec::framed_write::FramedWrite<W, tokio_codec::length_delimited::LengthDelimitedCodec>`
use serde::{Deserialize, Serialize};
use tokio::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::bincode_stream::BincodeStream;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) enum Message {}
pub(crate) struct MessageStream<R, W> {
inner: BincodeStream<Message, R, W>,
}
impl<R, W> MessageStream<R, W>
where
R: AsyncRead,
W: AsyncWrite,
{
pub fn new(read: R, write: W) -> Self {
let inner_read = FramedRead::new(read, LengthDelimitedCodec::new());
let inner_write = FramedWrite::new(write, LengthDelimitedCodec::new());
let inner = BincodeStream::new(inner_read, inner_write);
Self { inner }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment