Skip to content

Instantly share code, notes, and snippets.

@octave99
Created April 7, 2019 16:05
Show Gist options
  • Save octave99/c1c32367d694d6e4aec1e1162c4a0782 to your computer and use it in GitHub Desktop.
Save octave99/c1c32367d694d6e4aec1e1162c4a0782 to your computer and use it in GitHub Desktop.
use futures::{Async, AsyncSink, Future, Sink};
use tokio::io::AsyncWrite;
use std::io;
pub struct ByteSink<W: AsyncWrite>(W);
impl<W: AsyncWrite> ByteSink<W> {
pub fn new(writer: W) -> Self {
ByteSink(writer)
}
}
impl<W: AsyncWrite> Sink for ByteSink<W> {
type SinkItem = Vec<u8>;
type SinkError = io::Error;
fn start_send(&mut self, item: Vec<u8>) -> Result<AsyncSink<Vec<u8>>, io::Error> {
match self.0.poll_write(&item)? {
Async::NotReady => Ok(AsyncSink::NotReady(item)),
Async::Ready(_) => Ok(AsyncSink::Ready),
}
}
fn poll_complete(&mut self) -> Result<Async<()>, io::Error> {
match self.0.poll_flush()? {
Async::Ready(_) => Ok(Async::Ready(())),
Async::NotReady => Ok(Async::NotReady),
}
}
}
#[cfg(test)]
mod tests {
    use super::ByteSink;
    use futures::{future::Future, sink::Sink};
    use std::io;
    use tokio::runtime::Runtime;

    /// Payload shared by both tests.
    const GREETING: &[u8] = b"Hello World!";

    /// Drives `future` to completion on a fresh Tokio runtime and hands its
    /// result back to the caller, so a test can fail when the future fails.
    fn run_to_completion<F>(future: F) -> Result<(), io::Error>
    where
        F: Future<Item = (), Error = io::Error> + Send + 'static,
    {
        Runtime::new()
            .expect("failed to start the Tokio runtime")
            .block_on(future)
    }

    /// Sending one buffer through a `ByteSink` over stdout completes without error.
    #[test]
    fn sink_send_works() {
        let send = ByteSink::new(tokio::io::stdout())
            .send(GREETING.to_vec())
            .map(|_| ());
        run_to_completion(send).expect("sending through ByteSink failed");
    }

    /// Baseline: writing the same bytes with `tokio::io::write_all` succeeds.
    #[test]
    fn async_write_works() {
        let write = tokio::io::write_all(tokio::io::stdout(), GREETING).map(|_| ());
        run_to_completion(write).expect("write_all to stdout failed");
    }
}
@nlevitt
Copy link

nlevitt commented May 10, 2019

Thank you for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment