Rust Pipe
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
collections::VecDeque, | |
sync::{Arc, Mutex}, | |
task::{Poll, Waker}, | |
}; | |
use tokio::io::AsyncRead; | |
use wasi_common::WasiFile; | |
/// Shared handle to an in-memory byte pipe with a bounded capacity.
///
/// Bytes are pushed in by writers and pulled out by the reader.
///
/// The pipe has multi-writer, single reader semantics.
///
/// The pipe offers both non-blocking (`WouldBlock`) and async-enabled
/// (waker based) semantics.
type IoPipe = Arc<Mutex<IoPipeInner>>;

/// State shared between the writer handles and the reader of a pipe.
struct IoPipeInner {
    // The buffer holding data that was written but not yet read.
    buffer: VecDeque<u8>,
    // The maximum number of bytes `buffer` may hold.
    //
    // NOTE: the capacity is also used to signify the closed state.
    // A capacity of 0 marks the pipe as closed (intended for the case where
    // the reader closed the pipe or went away).
    // Writers will then fail with `ErrorKind::BrokenPipe`.
    capacity: usize,
    // Reader will be awoken once data is available.
    waiting_reader: Option<Waker>,
    // Writers that are waiting for free capacity.
    //
    // TODO: Investigate better options for writer notification
    // could use tokio::sync::Notify here, but that requires a borrowed
    // reference for waiting, meaning it would need an extra Arc<Notify> ...
    waiting_writers: Vec<Waker>,
}

impl IoPipeInner {
    /// Creates an empty pipe state that accepts at most `capacity` bytes.
    ///
    /// A `capacity` of 0 yields a pipe that is closed from the start.
    fn new(capacity: usize) -> Self {
        Self {
            buffer: VecDeque::new(),
            capacity,
            waiting_reader: None,
            waiting_writers: Vec::new(),
        }
    }

    /// Returns `true` if the pipe is closed (signalled by a capacity of 0).
    #[inline]
    fn is_closed(&self) -> bool {
        self.capacity == 0
    }

    /// Get available capacity.
    ///
    /// Returns ErrorKind::BrokenPipe if the pipe is closed.
    #[inline]
    fn available(&self) -> Result<usize, std::io::Error> {
        if self.is_closed() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "Pipe is closed",
            ));
        }
        debug_assert!(
            self.capacity >= self.buffer.len(),
            "buffer may never be larger than capacity"
        );
        Ok(self.capacity - self.buffer.len())
    }

    /// Remembers the waker of the (single) reader waiting for data.
    ///
    /// A pending read may be polled again before it was woken, for example
    /// when it is one branch of a `select!`. The most recent waker therefore
    /// simply replaces any previous one instead of being treated as a second
    /// reader.
    #[inline]
    fn register_reader(&mut self, waker: Waker) {
        self.waiting_reader = Some(waker);
    }

    /// Wakes the waiting reader, if there is one.
    fn notify_reader(&mut self) {
        if let Some(waker) = self.waiting_reader.take() {
            waker.wake();
        }
    }

    /// Wakes all writers that are waiting for capacity.
    fn notify_writers(&mut self) {
        for writer in self.waiting_writers.drain(..) {
            writer.wake();
        }
    }

    /// Writes to the buffer, but returns ErrorKind::WouldBlock if buffer is full.
    ///
    /// At most as many bytes as there is free capacity are written; the
    /// number of bytes actually written is returned. An empty `buf` always
    /// succeeds with `Ok(0)`. A closed pipe yields ErrorKind::BrokenPipe.
    fn write_fail_if_full(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
        if buf.is_empty() {
            return Ok(0);
        }
        let available = self.available()?;
        if available < 1 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::WouldBlock,
                "Pipe is full",
            ));
        }
        let size_to_write = std::cmp::min(available, buf.len());
        self.buffer.extend(&buf[..size_to_write]);
        // New data is available: let the reader know.
        self.notify_reader();
        Ok(size_to_write)
    }
}
#[derive(Clone)] | |
pub struct IoPipeWriter(IoPipe); | |
impl std::fmt::Debug for IoPipeWriter { | |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | |
f.debug_tuple("IoPipeWriter").finish() | |
} | |
} | |
pub struct WriteAllFuture<'a, 'b> { | |
pipe: &'a IoPipeWriter, | |
slice: &'b [u8], | |
} | |
impl<'a, 'b> std::future::Future for WriteAllFuture<'a, 'b> { | |
type Output = Result<(), std::io::Error>; | |
fn poll( | |
mut self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> Poll<Self::Output> { | |
if self.slice.is_empty() { | |
return Poll::Ready(Ok(())); | |
} | |
let mut inner = self.pipe.0.lock().unwrap(); | |
let available = match inner.available() { | |
Ok(x) => x, | |
Err(err) => { | |
return Poll::Ready(Err(err)); | |
} | |
}; | |
if available < 1 { | |
inner.waiting_writers.push(cx.waker().clone()); | |
return Poll::Pending; | |
}; | |
let length = self.slice.len(); | |
let write_size = std::cmp::min(available, length); | |
if write_size == self.slice.len() { | |
inner.buffer.extend(self.slice); | |
inner.notify_reader(); | |
std::mem::drop(inner); | |
self.slice = &[]; | |
Poll::Ready(Ok(())) | |
} else { | |
inner.buffer.extend(&self.slice[0..write_size]); | |
inner.notify_reader(); | |
inner.waiting_writers.push(cx.waker().clone()); | |
std::mem::drop(inner); | |
self.slice = &self.slice[write_size..]; | |
Poll::Pending | |
} | |
} | |
} | |
impl IoPipeWriter { | |
/// Try to write to the buffer. | |
/// Only parts of the buffer may be written, depending on the available | |
/// capacity. | |
/// | |
/// If the pipe is full a std::io::ErrorKind::WouldBlock error is returned. | |
fn try_write(&self, buf: &[u8]) -> Result<usize, std::io::Error> { | |
self.0.lock().unwrap().write_fail_if_full(buf) | |
} | |
/// Write the entire slice into the buffer. | |
pub fn write_all_async<'a, 'b>(&'a self, data: &'b [u8]) -> WriteAllFuture<'a, 'b> { | |
WriteAllFuture { | |
pipe: self, | |
slice: data, | |
} | |
} | |
async fn write_vectored_async<'a>( | |
&self, | |
bufs: &[std::io::IoSlice<'a>], | |
) -> Result<usize, std::io::Error> { | |
if let Some(buf) = bufs.iter().find(|b| !b.is_empty()) { | |
let len = buf.len(); | |
self.write_all_async(buf).await?; | |
Ok(len) | |
} else { | |
Ok(0) | |
} | |
} | |
} | |
impl std::io::Write for IoPipeWriter { | |
/// Try to write to the buffer. | |
/// Only parts of the buffer may be written, depending on the available | |
/// capacity. | |
/// | |
/// If capacity is 0, a std::io::ErrorKind::WouldBlock error is returned. | |
fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> { | |
IoPipeWriter::try_write(self, buf) | |
} | |
fn flush(&mut self) -> std::io::Result<()> { | |
let inner = self.0.lock().unwrap(); | |
if !inner.buffer.is_empty() { | |
Err(std::io::Error::new( | |
std::io::ErrorKind::WouldBlock, | |
"Pipe is not empty", | |
)) | |
} else { | |
Ok(()) | |
} | |
} | |
} | |
impl WasiFile for IoPipeWriter { | |
fn as_any(&self) -> &dyn std::any::Any { | |
todo!() | |
} | |
fn sock_accept<'life0, 'async_trait>( | |
&'life0 mut self, | |
_fdflags: wasi_common::file::FdFlags, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<Box<dyn WasiFile>, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn datasync<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn sync<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_filetype<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::FileType, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_fdflags<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::FdFlags, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_fdflags<'life0, 'async_trait>( | |
&'life0 mut self, | |
_flags: wasi_common::file::FdFlags, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_filestat<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::Filestat, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_filestat_size<'life0, 'async_trait>( | |
&'life0 self, | |
_size: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn advise<'life0, 'async_trait>( | |
&'life0 self, | |
_offset: u64, | |
_len: u64, | |
_advice: wasi_common::file::Advice, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn allocate<'life0, 'async_trait>( | |
&'life0 self, | |
_offset: u64, | |
_len: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_times<'life0, 'async_trait>( | |
&'life0 self, | |
_atime: Option<wasi_common::SystemTimeSpec>, | |
_mtime: Option<wasi_common::SystemTimeSpec>, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn read_vectored<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 mut [std::io::IoSliceMut<'a>], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn read_vectored_at<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 mut [std::io::IoSliceMut<'a>], | |
_offset: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn write_vectored<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
bufs: &'life1 [std::io::IoSlice<'a>], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
Box::pin(async move { | |
let n = IoPipeWriter::write_vectored_async(self, bufs).await?; | |
Ok(n.try_into()?) | |
}) | |
} | |
fn write_vectored_at<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 [std::io::IoSlice<'a>], | |
_offset: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn seek<'life0, 'async_trait>( | |
&'life0 self, | |
_pos: std::io::SeekFrom, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn peek<'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_buf: &'life1 mut [u8], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn num_ready_bytes<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn isatty(&self) -> bool { | |
todo!() | |
} | |
fn readable<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn writable<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
} | |
pub struct IoPipeReader(IoPipe); | |
pub struct IoPipeRead<'a, 'b> { | |
pipe: &'a IoPipeReader, | |
output: &'b mut [u8], | |
} | |
impl<'a, 'b> std::future::Future for IoPipeRead<'a, 'b> { | |
type Output = Result<usize, std::io::Error>; | |
fn poll( | |
mut self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> Poll<Self::Output> { | |
let mut inner = self.pipe.0.lock().unwrap(); | |
if inner.buffer.is_empty() { | |
inner.register_reader(cx.waker().clone()); | |
return Poll::Pending; | |
} | |
let (front, back) = inner.buffer.as_slices(); | |
let input_available = front.len(); | |
let output_available = self.output.len(); | |
let mut written = std::cmp::min(input_available, output_available); | |
self.output[0..written].copy_from_slice(&front[0..written]); | |
let remaining_output = &mut self.output[written..]; | |
let input_available = back.len(); | |
if !remaining_output.is_empty() && input_available > 0 { | |
let to_write = std::cmp::min(remaining_output.len(), input_available); | |
remaining_output.copy_from_slice(&back[0..to_write]); | |
written += to_write; | |
} | |
inner.buffer.drain(0..written); | |
// TODO: more fine-grained writer awakening? | |
// eg: wait until more has been written? | |
inner.notify_writers(); | |
Poll::Ready(Ok(written)) | |
} | |
} | |
impl AsyncRead for IoPipeReader { | |
fn poll_read( | |
self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buf: &mut tokio::io::ReadBuf<'_>, | |
) -> Poll<std::io::Result<()>> { | |
let mut inner = self.0.lock().unwrap(); | |
if inner.buffer.is_empty() { | |
inner.register_reader(cx.waker().clone()); | |
return Poll::Pending; | |
} | |
let (front, back) = inner.buffer.as_slices(); | |
let input_available = front.len(); | |
let output_available = buf.remaining(); | |
let to_write = std::cmp::min(input_available, output_available); | |
let slice = &front[0..to_write]; | |
// TODO: optimize | |
// put_slice does another size check and panics, probably want to avoid that | |
buf.put_slice(slice); | |
if to_write < output_available { | |
let to_write = std::cmp::min(buf.remaining(), back.len()); | |
let slice = &back[0..to_write]; | |
buf.put_slice(slice); | |
} | |
// Remove copied data from buffer. | |
inner.buffer.drain(0..to_write); | |
// Wake up writers because capacity is available again. | |
// TODO: more fine-graied writer awakening? | |
inner.notify_writers(); | |
Poll::Ready(Ok(())) | |
} | |
} | |
impl IoPipeReader { | |
// /// Read data from the pipe into the given buffer. | |
// /// | |
// /// Blocks until data is available. | |
// pub fn read_async_blocking<'a, 'b>(&'a self, buffer: &'b mut [u8]) -> IoPipeRead<'a, 'b> { | |
// IoPipeRead { | |
// pipe: self, | |
// output: buffer, | |
// } | |
// } | |
// pub async fn read_vectored_async<'a>( | |
// &self, | |
// bufs: &mut [std::io::IoSliceMut<'a>], | |
// ) -> Result<usize, std::io::Error> { | |
// if let Some(buf) = bufs.get_mut(0) { | |
// self.read_async_blocking(buf).await | |
// } else { | |
// Ok(0) | |
// } | |
// } | |
} | |
// TODO: optimize away the Arc<Mutex>> | |
// the extra Arc<Mutex<__>> here is really unfortunate : IoPipeReader can't be | |
// cloned! But WasiFile methods are all &self, so we need an extra lock. | |
struct WasiPipeReader(Arc<tokio::sync::Mutex<IoPipeReader>>); | |
impl From<IoPipeReader> for WasiPipeReader { | |
fn from(p: IoPipeReader) -> Self { | |
Self(Arc::new(tokio::sync::Mutex::new(p))) | |
} | |
} | |
impl WasiFile for WasiPipeReader { | |
fn as_any(&self) -> &dyn std::any::Any { | |
todo!() | |
} | |
fn sock_accept<'life0, 'async_trait>( | |
&'life0 mut self, | |
_fdflags: wasi_common::file::FdFlags, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<Box<dyn WasiFile>, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn datasync<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn sync<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_filetype<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::FileType, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_fdflags<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::FdFlags, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_fdflags<'life0, 'async_trait>( | |
&'life0 mut self, | |
_flags: wasi_common::file::FdFlags, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn get_filestat<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<wasi_common::file::Filestat, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_filestat_size<'life0, 'async_trait>( | |
&'life0 self, | |
_size: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn advise<'life0, 'async_trait>( | |
&'life0 self, | |
_offset: u64, | |
_len: u64, | |
_advice: wasi_common::file::Advice, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn allocate<'life0, 'async_trait>( | |
&'life0 self, | |
_offset: u64, | |
_len: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn set_times<'life0, 'async_trait>( | |
&'life0 self, | |
_atime: Option<wasi_common::SystemTimeSpec>, | |
_mtime: Option<wasi_common::SystemTimeSpec>, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn read_vectored<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
bufs: &'life1 mut [std::io::IoSliceMut<'a>], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
if let Some(buf) = bufs.iter_mut().find(|b| !b.is_empty()) { | |
Box::pin(async move { | |
let inner = self.0.lock().await; | |
let n = IoPipeRead { | |
pipe: &*inner, | |
output: buf, | |
} | |
.await?; | |
Ok(n.try_into()?) | |
}) | |
} else { | |
Box::pin(async move { Ok(0) }) | |
} | |
} | |
fn read_vectored_at<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 mut [std::io::IoSliceMut<'a>], | |
_offset: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn write_vectored<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 [std::io::IoSlice<'a>], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn write_vectored_at<'a, 'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_bufs: &'life1 [std::io::IoSlice<'a>], | |
_offset: u64, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'a: 'async_trait, | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn seek<'life0, 'async_trait>( | |
&'life0 self, | |
_pos: std::io::SeekFrom, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn peek<'life0, 'life1, 'async_trait>( | |
&'life0 self, | |
_buf: &'life1 mut [u8], | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
'life1: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn num_ready_bytes<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<u64, anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn isatty(&self) -> bool { | |
todo!() | |
} | |
fn readable<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
fn writable<'life0, 'async_trait>( | |
&'life0 self, | |
) -> core::pin::Pin< | |
Box< | |
dyn core::future::Future<Output = Result<(), anyhow::Error>> | |
+ core::marker::Send | |
+ 'async_trait, | |
>, | |
> | |
where | |
'life0: 'async_trait, | |
Self: 'async_trait, | |
{ | |
todo!() | |
} | |
} | |
pub fn pipe(capacity: usize) -> (IoPipeWriter, IoPipeReader) { | |
let inner = Arc::new(Mutex::new(IoPipeInner::new(capacity))); | |
(IoPipeWriter(inner.clone()), IoPipeReader(inner)) | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    /// Data written in one go can be read back in several smaller reads.
    #[tokio::test]
    async fn test_io_pipe() {
        let (writer, mut reader) = pipe(100);
        writer.write_all_async(&[1, 2, 3, 4, 5]).await.unwrap();

        let mut chunk = [0u8; 2];

        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 2);
        assert_eq!(chunk, [1, 2]);

        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 2);
        assert_eq!(chunk, [3, 4]);

        // Reset so the untouched second byte is recognizable.
        chunk = [0, 0];
        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 1);
        assert_eq!(chunk, [5, 0]);
    }

    /// A write larger than the pipe capacity completes piecewise while the
    /// reader drains the pipe.
    #[tokio::test]
    async fn test_io_pipe_async_write_excees_capacity() {
        let data = [0u8, 1, 2, 3, 4, 5, 6, 7];
        let (writer, mut reader) = pipe(3);

        let write_task = tokio::spawn(async move {
            writer.write_all_async(&data).await.unwrap();
        });

        let mut chunk = [0u8; 3];

        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 3);
        assert_eq!(chunk, [0, 1, 2]);

        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 3);
        assert_eq!(chunk, [3, 4, 5]);

        let mut last = [0u8; 3];
        let read = reader.read(&mut last).await.unwrap();
        assert_eq!(read, 2);
        assert_eq!(last, [6, 7, 0]);

        write_task.await.unwrap();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment