Skip to content

Instantly share code, notes, and snippets.

@theduke
Created February 9, 2023 18:37
Show Gist options
  • Save theduke/ea11d8884adc130ec538779fcf657a10 to your computer and use it in GitHub Desktop.
Save theduke/ea11d8884adc130ec538779fcf657a10 to your computer and use it in GitHub Desktop.
Rust Pipe
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
task::{Poll, Waker},
};
use tokio::io::AsyncRead;
use wasi_common::WasiFile;
/// An in-memory pipe for bytes with a bounded capacity.
///
/// Data flows one way: it is written through an `IoPipeWriter` and read back
/// through an `IoPipeReader`. Both ends share the same `IoPipeInner` behind
/// an `Arc<Mutex<..>>`.
///
/// The pipe has multi-writer, single-reader semantics: the writer handle is
/// `Clone`, the reader handle is not.
///
/// Writers get both a non-blocking path (`try_write` / `std::io::Write`,
/// which reports `WouldBlock` when the pipe is full) and an async path
/// (`write_all_async`, which waits for capacity).
type IoPipe = Arc<Mutex<IoPipeInner>>;
/// Shared state of a pipe: the byte buffer plus the wakers of the tasks that
/// are currently waiting on it.
struct IoPipeInner {
    /// Bytes that were written but not yet read.
    buffer: VecDeque<u8>,
    /// The maximum number of bytes `buffer` may hold.
    ///
    /// NOTE: the capacity is also used to signify the closed state.
    /// If capacity is 0, the pipe counts as closed and writers fail with
    /// `ErrorKind::BrokenPipe`. The intent is that this happens when the
    /// reader closes the pipe or is dropped.
    /// NOTE(review): no code visible in this file resets the capacity to 0
    /// after construction — confirm where closing is supposed to happen.
    capacity: usize,
    /// Waker of the (single) reader; woken once data is available.
    waiting_reader: Option<Waker>,
    // TODO: Investigate better options for writer notification
    // could use tokio::sync::Notify here, but that requires a borrowed
    // reference for waiting, meaning it would need an extra Arc<Notify> ...
    /// Wakers of writers waiting for free capacity.
    waiting_writers: Vec<Waker>,
}

impl IoPipeInner {
    /// Creates an empty pipe state that holds at most `capacity` bytes.
    ///
    /// A `capacity` of 0 yields a pipe that is closed from the start.
    fn new(capacity: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            capacity,
            waiting_reader: None,
            waiting_writers: Vec::new(),
        }
    }

    /// Returns `true` if the pipe is closed (signalled by a capacity of 0).
    #[inline]
    fn is_closed(&self) -> bool {
        self.capacity == 0
    }

    /// Get available capacity.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` if the pipe is closed.
    #[inline]
    fn available(&self) -> Result<usize, std::io::Error> {
        if self.is_closed() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Pipe is closed",
            ));
        }
        debug_assert!(
            self.capacity >= self.buffer.len(),
            "buffer may never be larger than capacity"
        );
        Ok(self.capacity - self.buffer.len())
    }

    /// Stores `waker` so the reader is woken by the next successful write.
    ///
    /// A previously registered waker is replaced. A future may legitimately
    /// be polled again before it has been woken (for example inside
    /// `select!`/`join!`), so an already-registered waker is not an error.
    #[inline]
    fn register_reader(&mut self, waker: Waker) {
        self.waiting_reader = Some(waker);
    }

    /// Wakes the waiting reader, if there is one, and clears the slot.
    fn notify_reader(&mut self) {
        if let Some(waker) = self.waiting_reader.take() {
            waker.wake();
        }
    }

    /// Wakes every waiting writer and empties the wait list.
    fn notify_writers(&mut self) {
        for writer in self.waiting_writers.drain(..) {
            writer.wake();
        }
    }

    /// Writes as much of `buf` as fits and returns the number of bytes taken.
    ///
    /// An empty `buf` always yields `Ok(0)`, even on a closed pipe.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::BrokenPipe` if the pipe is closed.
    /// * `ErrorKind::WouldBlock` if the buffer is full.
    fn write_fail_if_full(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        if buf.is_empty() {
            return Ok(0);
        }
        let available = self.available()?;
        if available == 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WouldBlock,
                "Pipe is full",
            ));
        }
        let size_to_write = std::cmp::min(available, buf.len());
        self.buffer.extend(&buf[..size_to_write]);
        // New data arrived: let the reader make progress.
        self.notify_reader();
        Ok(size_to_write)
    }
}
/// Write end of a pipe.
///
/// The handle is cheap to clone; every clone writes into the same shared
/// buffer.
#[derive(Clone)]
pub struct IoPipeWriter(IoPipe);

impl std::fmt::Debug for IoPipeWriter {
    /// Prints only the type name; the shared pipe state is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("IoPipeWriter")
    }
}
pub struct WriteAllFuture<'a, 'b> {
pipe: &'a IoPipeWriter,
slice: &'b [u8],
}
impl<'a, 'b> std::future::Future for WriteAllFuture<'a, 'b> {
type Output = Result<(), std::io::Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
if self.slice.is_empty() {
return Poll::Ready(Ok(()));
}
let mut inner = self.pipe.0.lock().unwrap();
let available = match inner.available() {
Ok(x) => x,
Err(err) => {
return Poll::Ready(Err(err));
}
};
if available < 1 {
inner.waiting_writers.push(cx.waker().clone());
return Poll::Pending;
};
let length = self.slice.len();
let write_size = std::cmp::min(available, length);
if write_size == self.slice.len() {
inner.buffer.extend(self.slice);
inner.notify_reader();
std::mem::drop(inner);
self.slice = &[];
Poll::Ready(Ok(()))
} else {
inner.buffer.extend(&self.slice[0..write_size]);
inner.notify_reader();
inner.waiting_writers.push(cx.waker().clone());
std::mem::drop(inner);
self.slice = &self.slice[write_size..];
Poll::Pending
}
}
}
impl IoPipeWriter {
    /// Attempts a non-blocking write of `buf`.
    ///
    /// Only a prefix of `buf` may be accepted, depending on the free
    /// capacity; the number of bytes taken is returned.
    ///
    /// Fails with `std::io::ErrorKind::WouldBlock` when the pipe is full and
    /// with `std::io::ErrorKind::BrokenPipe` when it is closed.
    fn try_write(&self, buf: &[u8]) -> Result<usize, std::io::Error> {
        let mut inner = self.0.lock().unwrap();
        inner.write_fail_if_full(buf)
    }

    /// Returns a future that writes all of `data` into the pipe, waiting for
    /// capacity whenever the buffer is full.
    pub fn write_all_async<'a, 'b>(&'a self, data: &'b [u8]) -> WriteAllFuture<'a, 'b> {
        WriteAllFuture {
            pipe: self,
            slice: data,
        }
    }

    /// Writes the first non-empty buffer of `bufs` completely and returns its
    /// length. Returns `Ok(0)` when every buffer is empty. The remaining
    /// buffers are not touched.
    async fn write_vectored_async<'a>(
        &self,
        bufs: &[std::io::IoSlice<'a>],
    ) -> Result<usize, std::io::Error> {
        let first_non_empty = bufs.iter().find(|b| !b.is_empty());
        match first_non_empty {
            Some(buf) => {
                self.write_all_async(buf).await?;
                Ok(buf.len())
            }
            None => Ok(0),
        }
    }
}
impl std::io::Write for IoPipeWriter {
/// Try to write to the buffer.
/// Only parts of the buffer may be written, depending on the available
/// capacity.
///
/// If capacity is 0, a std::io::ErrorKind::WouldBlock error is returned.
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
IoPipeWriter::try_write(self, buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let inner = self.0.lock().unwrap();
if !inner.buffer.is_empty() {
Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Pipe is not empty",
))
} else {
Ok(())
}
}
}
impl WasiFile for IoPipeWriter {
fn as_any(&self) -> &dyn std::any::Any {
todo!()
}
fn sock_accept<'life0, 'async_trait>(
&'life0 mut self,
_fdflags: wasi_common::file::FdFlags,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<Box<dyn WasiFile>, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn datasync<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn sync<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_filetype<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::FileType, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_fdflags<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::FdFlags, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_fdflags<'life0, 'async_trait>(
&'life0 mut self,
_flags: wasi_common::file::FdFlags,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_filestat<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::Filestat, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_filestat_size<'life0, 'async_trait>(
&'life0 self,
_size: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn advise<'life0, 'async_trait>(
&'life0 self,
_offset: u64,
_len: u64,
_advice: wasi_common::file::Advice,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn allocate<'life0, 'async_trait>(
&'life0 self,
_offset: u64,
_len: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_times<'life0, 'async_trait>(
&'life0 self,
_atime: Option<wasi_common::SystemTimeSpec>,
_mtime: Option<wasi_common::SystemTimeSpec>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn read_vectored<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 mut [std::io::IoSliceMut<'a>],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn read_vectored_at<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 mut [std::io::IoSliceMut<'a>],
_offset: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn write_vectored<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
bufs: &'life1 [std::io::IoSlice<'a>],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
Box::pin(async move {
let n = IoPipeWriter::write_vectored_async(self, bufs).await?;
Ok(n.try_into()?)
})
}
fn write_vectored_at<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 [std::io::IoSlice<'a>],
_offset: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn seek<'life0, 'async_trait>(
&'life0 self,
_pos: std::io::SeekFrom,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn peek<'life0, 'life1, 'async_trait>(
&'life0 self,
_buf: &'life1 mut [u8],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn num_ready_bytes<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn isatty(&self) -> bool {
todo!()
}
fn readable<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn writable<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
}
/// Read end of a pipe.
///
/// Wraps the same shared `IoPipe` state as the writer handles. Unlike
/// `IoPipeWriter` it does not derive `Clone`, which gives the pipe its
/// single-reader semantics.
pub struct IoPipeReader(IoPipe);
pub struct IoPipeRead<'a, 'b> {
pipe: &'a IoPipeReader,
output: &'b mut [u8],
}
impl<'a, 'b> std::future::Future for IoPipeRead<'a, 'b> {
type Output = Result<usize, std::io::Error>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
let mut inner = self.pipe.0.lock().unwrap();
if inner.buffer.is_empty() {
inner.register_reader(cx.waker().clone());
return Poll::Pending;
}
let (front, back) = inner.buffer.as_slices();
let input_available = front.len();
let output_available = self.output.len();
let mut written = std::cmp::min(input_available, output_available);
self.output[0..written].copy_from_slice(&front[0..written]);
let remaining_output = &mut self.output[written..];
let input_available = back.len();
if !remaining_output.is_empty() && input_available > 0 {
let to_write = std::cmp::min(remaining_output.len(), input_available);
remaining_output.copy_from_slice(&back[0..to_write]);
written += to_write;
}
inner.buffer.drain(0..written);
// TODO: more fine-grained writer awakening?
// eg: wait until more has been written?
inner.notify_writers();
Poll::Ready(Ok(written))
}
}
impl AsyncRead for IoPipeReader {
    /// Moves buffered bytes into `buf`.
    ///
    /// Returns `Poll::Pending` (after registering the task's waker) while the
    /// pipe is empty. Otherwise copies as many bytes as fit into the
    /// remaining space of `buf`, removes exactly those bytes from the pipe
    /// and wakes waiting writers.
    ///
    /// # Panics
    ///
    /// Panics if the pipe mutex is poisoned.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let mut inner = self.0.lock().unwrap();
        if inner.buffer.is_empty() {
            inner.register_reader(cx.waker().clone());
            return Poll::Pending;
        }

        // The ring buffer exposes its contents as two contiguous slices.
        let (front, back) = inner.buffer.as_slices();

        // TODO: optimize
        // put_slice does another size check and panics, probably want to avoid that
        let from_front = std::cmp::min(front.len(), buf.remaining());
        buf.put_slice(&front[..from_front]);

        let from_back = std::cmp::min(back.len(), buf.remaining());
        buf.put_slice(&back[..from_back]);

        // Remove everything that was copied (from both slices); otherwise the
        // bytes taken from `back` would be delivered a second time.
        let copied = from_front + from_back;
        inner.buffer.drain(..copied);

        // Wake up writers because capacity is available again.
        // TODO: more fine-grained writer awakening?
        inner.notify_writers();
        Poll::Ready(Ok(()))
    }
}
impl IoPipeReader {
// NOTE: this impl block is currently empty. The helpers below are disabled
// (commented out) and kept for reference only. Reads in this file go through
// the `AsyncRead` impl or by constructing an `IoPipeRead` future directly.
//
// /// Read data from the pipe into the given buffer.
// ///
// /// Blocks until data is available.
// pub fn read_async_blocking<'a, 'b>(&'a self, buffer: &'b mut [u8]) -> IoPipeRead<'a, 'b> {
// IoPipeRead {
// pipe: self,
// output: buffer,
// }
// }
// pub async fn read_vectored_async<'a>(
// &self,
// bufs: &mut [std::io::IoSliceMut<'a>],
// ) -> Result<usize, std::io::Error> {
// if let Some(buf) = bufs.get_mut(0) {
// self.read_async_blocking(buf).await
// } else {
// Ok(0)
// }
// }
}
// TODO: optimize away the Arc<Mutex<..>>.
// The extra Arc<Mutex<..>> here is really unfortunate: IoPipeReader can't be
// cloned! But WasiFile methods are all &self, so we need an extra lock.
/// `WasiFile` adapter around the read end of a pipe.
///
/// The reader sits behind an async (`tokio`) mutex so it can be used from the
/// `&self` methods of `WasiFile`.
struct WasiPipeReader(Arc<tokio::sync::Mutex<IoPipeReader>>);

impl From<IoPipeReader> for WasiPipeReader {
    /// Wraps the reader in the shared async lock.
    fn from(p: IoPipeReader) -> Self {
        let guarded = tokio::sync::Mutex::new(p);
        WasiPipeReader(Arc::new(guarded))
    }
}
impl WasiFile for WasiPipeReader {
fn as_any(&self) -> &dyn std::any::Any {
todo!()
}
fn sock_accept<'life0, 'async_trait>(
&'life0 mut self,
_fdflags: wasi_common::file::FdFlags,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<Box<dyn WasiFile>, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn datasync<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn sync<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_filetype<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::FileType, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_fdflags<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::FdFlags, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_fdflags<'life0, 'async_trait>(
&'life0 mut self,
_flags: wasi_common::file::FdFlags,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn get_filestat<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<wasi_common::file::Filestat, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_filestat_size<'life0, 'async_trait>(
&'life0 self,
_size: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn advise<'life0, 'async_trait>(
&'life0 self,
_offset: u64,
_len: u64,
_advice: wasi_common::file::Advice,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn allocate<'life0, 'async_trait>(
&'life0 self,
_offset: u64,
_len: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn set_times<'life0, 'async_trait>(
&'life0 self,
_atime: Option<wasi_common::SystemTimeSpec>,
_mtime: Option<wasi_common::SystemTimeSpec>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn read_vectored<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
bufs: &'life1 mut [std::io::IoSliceMut<'a>],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
if let Some(buf) = bufs.iter_mut().find(|b| !b.is_empty()) {
Box::pin(async move {
let inner = self.0.lock().await;
let n = IoPipeRead {
pipe: &*inner,
output: buf,
}
.await?;
Ok(n.try_into()?)
})
} else {
Box::pin(async move { Ok(0) })
}
}
fn read_vectored_at<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 mut [std::io::IoSliceMut<'a>],
_offset: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn write_vectored<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 [std::io::IoSlice<'a>],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn write_vectored_at<'a, 'life0, 'life1, 'async_trait>(
&'life0 self,
_bufs: &'life1 [std::io::IoSlice<'a>],
_offset: u64,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'a: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn seek<'life0, 'async_trait>(
&'life0 self,
_pos: std::io::SeekFrom,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn peek<'life0, 'life1, 'async_trait>(
&'life0 self,
_buf: &'life1 mut [u8],
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
'life1: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn num_ready_bytes<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<u64, anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn isatty(&self) -> bool {
todo!()
}
fn readable<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
fn writable<'life0, 'async_trait>(
&'life0 self,
) -> core::pin::Pin<
Box<
dyn core::future::Future<Output = Result<(), anyhow::Error>>
+ core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
todo!()
}
}
pub fn pipe(capacity: usize) -> (IoPipeWriter, IoPipeReader) {
let inner = Arc::new(Mutex::new(IoPipeInner::new(capacity)));
(IoPipeWriter(inner.clone()), IoPipeReader(inner))
}
#[cfg(test)]
mod tests {
    use tokio::io::AsyncReadExt;

    use super::*;

    /// Data written in one go can be read back in smaller chunks, in order.
    #[tokio::test]
    async fn test_io_pipe() {
        let (writer, mut reader) = pipe(100);
        writer.write_all_async(&[1, 2, 3, 4, 5]).await.unwrap();

        let mut chunk = [0u8; 2];

        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 2);
        assert_eq!(chunk, [1, 2]);

        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 2);
        assert_eq!(chunk, [3, 4]);

        // Reset so the untouched second byte of the final short read is visible.
        chunk = [0, 0];
        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 1);
        assert_eq!(chunk, [5, 0]);
    }

    /// A write larger than the capacity completes once the reader drains the
    /// pipe piece by piece.
    #[tokio::test]
    async fn test_io_pipe_async_write_excees_capacity() {
        let payload = [0u8, 1, 2, 3, 4, 5, 6, 7];
        let (writer, mut reader) = pipe(3);

        let write_task = tokio::spawn(async move {
            writer.write_all_async(&payload).await.unwrap();
        });

        let mut chunk = [0u8; 3];

        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 3);
        assert_eq!(chunk, [0, 1, 2]);

        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 3);
        assert_eq!(chunk, [3, 4, 5]);

        let mut chunk = [0u8; 3];
        let n = reader.read(&mut chunk).await.unwrap();
        assert_eq!(n, 2);
        assert_eq!(chunk, [6, 7, 0]);

        write_task.await.unwrap();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment