Skip to content

Instantly share code, notes, and snippets.

@RaasAhsan
Created April 24, 2022 23:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RaasAhsan/5329f3decfabedfb7c07875b084dda72 to your computer and use it in GitHub Desktop.
Save RaasAhsan/5329f3decfabedfb7c07875b084dda72 to your computer and use it in GitHub Desktop.
Rust mpsc byte buffering
use std::io::{self, BufWriter, Read, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
/// A unit of traffic sent over the channel from the writer side to the reader side.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Message {
    /// An owned block of bytes to be delivered to the reader.
    Chunk(Vec<u8>),
    /// Marker sent when the writer is flushed.
    Flush,
}
/// `Write` adapter that forwards written bytes over a synchronous channel.
///
/// Every non-empty `write` becomes one `Message::Chunk`; `flush` sends `Message::Flush`.
struct TxWrite {
    /// Sending half of the channel the bytes are pushed into.
    tx: SyncSender<Message>,
}

impl TxWrite {
    /// Sends `message`, translating a disconnected receiver into `ErrorKind::BrokenPipe`.
    fn send(&self, message: Message) -> io::Result<()> {
        self.tx.send(message).map_err(|_| {
            io::Error::new(io::ErrorKind::BrokenPipe, "channel receiver has been dropped")
        })
    }
}

impl Write for TxWrite {
    /// Copies `buf` into a `Message::Chunk` and sends it over the channel.
    ///
    /// An empty `buf` is not sent at all: a zero-length chunk would be indistinguishable
    /// from end-of-stream for a reader that reports the chunk length as its byte count.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` if the receiving half has been dropped.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // The borrowed slice cannot cross the channel, so the payload has to be an owned copy.
        self.send(Message::Chunk(buf.to_vec()))?;
        Ok(buf.len())
    }

    /// Sends a `Message::Flush` marker over the channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::BrokenPipe` if the receiving half has been dropped.
    fn flush(&mut self) -> io::Result<()> {
        self.send(Message::Flush)
    }
}
/// `Read` adapter that pulls bytes out of the channel fed by the writer side.
struct TxRead {
    /// Receiving half of the channel.
    rx: Receiver<Message>,
    /// Unread tail of the most recently received chunk, kept when the caller's buffer
    /// was too small to take the whole chunk in one `read`.
    partial: Option<Vec<u8>>,
}

impl Read for TxRead {
    /// Fills `buf` with bytes from the leftover of the previous chunk, or from the next
    /// chunk received on the channel, and returns how many bytes were copied.
    ///
    /// Returns `Ok(0)` (end of stream) when a `Message::Flush` is received or when every
    /// sender has been dropped. A zero-length `buf` returns `Ok(0)` immediately without
    /// touching the channel, so no message is consumed for nothing.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        let leftover = self.partial.take().filter(|bytes| !bytes.is_empty());
        let mut bytes = match leftover {
            Some(bytes) => bytes,
            None => loop {
                match self.rx.recv() {
                    // An empty chunk carries no data; reporting it as `Ok(0)` would look
                    // like end-of-stream, so wait for the next message instead.
                    Ok(Message::Chunk(bytes)) if bytes.is_empty() => continue,
                    Ok(Message::Chunk(bytes)) => break bytes,
                    // A flush marker or a hung-up sender both end the stream.
                    Ok(Message::Flush) | Err(_) => return Ok(0),
                }
            },
        };

        let len = buf.len().min(bytes.len());
        buf[..len].copy_from_slice(&bytes[..len]);

        if len < bytes.len() {
            // Keep the unread tail in the same allocation for the next call.
            bytes.drain(..len);
            self.partial = Some(bytes);
        }
        Ok(len)
    }
}
/// Demo: a buffered writer on the main thread streams bytes through the channel to a
/// reader thread, which collects them with `read_to_end` and prints the result.
fn main() {
    // Capacity 0 makes this a rendezvous channel: each send blocks until it is received.
    let (tx, rx) = sync_channel::<Message>(0);
    let mut tx_reader = TxRead { rx, partial: None };
    let mut writer = BufWriter::new(TxWrite { tx });

    let handle = thread::spawn(move || {
        let mut buf = Vec::new();
        tx_reader
            .read_to_end(&mut buf)
            .expect("reading from the channel failed");
        println!("{:?}", buf);
    });

    // calling grammar: write* flush
    // `write_all` is used because a bare `write` is allowed to accept only part of the data.
    writer
        .write_all(&[1, 2, 3, 4, 5])
        .expect("buffering the first chunk failed");
    writer
        .write_all(&[1, 2, 3, 4, 5])
        .expect("buffering the second chunk failed");
    writer.flush().expect("flushing to the channel failed");

    handle.join().expect("reader thread panicked");
    println!("joined!");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment