Skip to content

Instantly share code, notes, and snippets.

@edwardw
Last active December 29, 2019 15:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edwardw/48fe8d5a6896ff96d741309917ac7f91 to your computer and use it in GitHub Desktop.
Save edwardw/48fe8d5a6896ff96d741309917ac7f91 to your computer and use it in GitHub Desktop.
AsyncRead adapter
//# bytes = "0.5"
//# futures = "0.3"
//# tokio = { version = "0.2", features = ["stream", "io-util"] }
//# tokio-util = { version = "0.2", features = ["codec"] }
mod adapter {
use futures::io as fio;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io as tio;
pub struct AsyncReadAdapter<R>(R);
impl<R> AsyncReadAdapter<R> {
pub fn new(r: R) -> Self {
AsyncReadAdapter(r)
}
}
impl<R> tio::AsyncRead for AsyncReadAdapter<R>
where
R: fio::AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<tio::Result<usize>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
}
use bytes::Bytes;
use futures::stream::TryStreamExt;
use tokio::io::AsyncRead;
use tokio::stream::Stream;
use tokio_util::codec;
fn stream_to_async_read<S>(s: S) -> impl AsyncRead
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
let r = s.into_async_read();
adapter::AsyncReadAdapter::new(r)
}
fn async_read_to_stream<R: AsyncRead>(r: R) -> impl Stream {
codec::FramedRead::new(r, codec::BytesCodec::new()).map_ok(|bs| bs.freeze())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment