The `tokio::io::AsyncRead` documentation confusingly states:

> `Poll::Ready(Ok(()))` means that data was immediately read and placed into the output buffer. The amount of data read can be determined by the increase in the length of the slice returned by `ReadBuf::filled`. If the difference is 0, EOF has been reached.

https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html
The footgun is that when `poll_read` is called, `buf` may already be partially filled, so after the call `buf.filled()` contains both the bytes from earlier reads and the newly read ones. In some cases this happens only a small fraction of the time in production, causing, e.g., the last chunk of a file to be hashed twice.
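A minimal std-only model of how `buf` can arrive partially filled, assuming a caller that reuses one buffer and keeps appending to it until EOF. `ToyReadBuf`, `ChunkedReader` and `read_all` are hypothetical names: `ToyReadBuf` only imitates the `filled()`/`remaining()`/`put_slice()` bookkeeping of `tokio::io::ReadBuf` and is not tokio's type, and polling (`Context`, `Pending`) is left out entirely.

```rust
/// Hypothetical stand-in for `tokio::io::ReadBuf`: a fixed buffer plus a
/// "filled" cursor. Bytes before the cursor belong to earlier reads.
pub struct ToyReadBuf<'a> {
    buf: &'a mut [u8],
    filled: usize,
}

impl<'a> ToyReadBuf<'a> {
    pub fn new(buf: &'a mut [u8]) -> Self {
        ToyReadBuf { buf, filled: 0 }
    }

    /// All bytes filled so far, including those from previous reads.
    pub fn filled(&self) -> &[u8] {
        &self.buf[..self.filled]
    }

    /// Space left after the filled region.
    pub fn remaining(&self) -> usize {
        self.buf.len() - self.filled
    }

    /// Appends `data` after the filled region (panics if it does not fit).
    pub fn put_slice(&mut self, data: &[u8]) {
        let end = self.filled + data.len();
        self.buf[self.filled..end].copy_from_slice(data);
        self.filled = end;
    }
}

/// Hypothetical inner reader that hands out at most `chunk` bytes per call.
pub struct ChunkedReader {
    data: Vec<u8>,
    pos: usize,
    chunk: usize,
}

impl ChunkedReader {
    pub fn new(data: &[u8], chunk: usize) -> Self {
        ChunkedReader { data: data.to_vec(), pos: 0, chunk }
    }

    /// Synchronous analogue of a `poll_read` that returns `Ready(Ok(()))`:
    /// appends up to `chunk` bytes after whatever is already filled.
    pub fn read(&mut self, buf: &mut ToyReadBuf<'_>) {
        let n = self.chunk.min(buf.remaining()).min(self.data.len() - self.pos);
        buf.put_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
    }
}

/// Caller that reuses one buffer and keeps appending until no new bytes
/// arrive, recording how many bytes were *already* filled on each call.
pub fn read_all(reader: &mut ChunkedReader, storage: &mut [u8]) -> (Vec<u8>, Vec<usize>) {
    let mut buf = ToyReadBuf::new(storage);
    let mut filled_on_entry = Vec::new();
    loop {
        let before = buf.filled().len();
        filled_on_entry.push(before);
        reader.read(&mut buf);
        // No new bytes: EOF (or, in this toy, a full buffer).
        if buf.filled().len() == before {
            break;
        }
    }
    (buf.filled().to_vec(), filled_on_entry)
}
```

Reading `b"hello world!"` in 4-byte chunks into a 32-byte buffer, `read_all` records `[0, 4, 8, 12]` as the filled length on entry: every call after the first starts with earlier data already in `filled()`.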
Setup code:
```rust
use std::pin::Pin;
use std::sync::Arc;
use parking_lot::Mutex;
use pin_project::pin_project;
use futures::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

#[pin_project]
pub(crate) struct Blake3HashingReader<A: AsyncRead> {
    #[pin]
    inner: A,
    b3sum: Arc<Mutex<blake3::Hasher>>,
}

impl<A: AsyncRead> Blake3HashingReader<A> {
    pub fn new(inner: A) -> Blake3HashingReader<A> {
        let b3sum = Arc::new(Mutex::new(blake3::Hasher::new()));
        Blake3HashingReader { inner, b3sum }
    }

    /// Returns an `Arc` which can be dereferenced and locked to get the blake3 Hasher
    #[inline]
    pub fn b3sum(&self) -> Arc<Mutex<blake3::Hasher>> {
        self.b3sum.clone()
    }
}
```
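For comparison, the synchronous `std::io::Read` trait does not have this footgun: `read` is given only the slice to write into and returns the number of bytes written, so `&buf[..n]` is exactly the new data. Below is a sketch of a hypothetical `RecordingReader` mirroring the shared-handle design above; to stay dependency-free it records bytes in an `Arc<Mutex<Vec<u8>>>` where the real type would call `blake3::Hasher::update`.

```rust
use std::io::{self, Read};
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous analogue of `Blake3HashingReader`. It records
/// the bytes it sees instead of hashing them.
pub struct RecordingReader<R: Read> {
    inner: R,
    seen: Arc<Mutex<Vec<u8>>>,
}

impl<R: Read> RecordingReader<R> {
    pub fn new(inner: R) -> Self {
        RecordingReader { inner, seen: Arc::new(Mutex::new(Vec::new())) }
    }

    /// Shared handle to the recorded bytes, like `b3sum()` above.
    pub fn seen(&self) -> Arc<Mutex<Vec<u8>>> {
        self.seen.clone()
    }
}

impl<R: Read> Read for RecordingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // `read` returns the byte count and `buf` holds no earlier data,
        // so `&buf[..n]` is exactly what this call produced.
        let n = self.inner.read(buf)?;
        self.seen.lock().unwrap().extend_from_slice(&buf[..n]);
        Ok(n)
    }
}
```

Driving it with `read_to_end` leaves the recorded bytes equal to the input, however many `read` calls that takes.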
Buggy - the last read is sometimes hashed twice:
```rust
impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        // BUG: `buf.filled()` also contains bytes that were in `buf` before
        // this call, and it is hashed even on `Pending` or an error.
        b3sum.lock().update(buf.filled());
        inner_poll
    }
}
```
Still buggy - checking for `Poll::Ready` doesn't solve this:
```rust
impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        if let Poll::Ready(Ok(_)) = inner_poll {
            // Still hashes any bytes that were already in `buf` on entry.
            b3sum.lock().update(buf.filled());
        }
        inner_poll
    }
}
```
Correct - record how much of `buf` was already filled and hash only the bytes after it:
```rust
impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        // The caller may pass a `buf` that already holds data from earlier
        // reads; remember where that data ends.
        let already_filled = buf.filled().len();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        if let Poll::Ready(Ok(_)) = inner_poll {
            // Hash only the bytes this call appended.
            b3sum.lock().update(&buf.filled()[already_filled..]);
        }
        inner_poll
    }
}
```
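A std-only simulation of why the two buggy versions misbehave and the fix does not, assuming a caller that may reuse one buffer across reads. A `Vec<u8>` stands in for `ReadBuf` (its length plays the role of `filled().len()`), every inner read is treated as `Poll::Ready(Ok(()))`, and the bytes fed to the hasher are collected instead of hashed; `Strategy` and `simulate` are hypothetical names. Because every call is `Ready(Ok(()))` here, the `Poll::Ready` check in the second version changes nothing, so both buggy versions map to `WholeFilled`.

```rust
/// Which bytes the wrapper passes to the hasher after each inner read.
#[derive(Clone, Copy)]
pub enum Strategy {
    /// Hash everything in `filled()` (the two buggy versions).
    WholeFilled,
    /// Hash only `filled()[already_filled..]` (the correct version).
    NewBytesOnly,
}

/// Simulates repeated reads of `data` in `chunk`-sized pieces. If
/// `reuse_buffer` is true the caller keeps appending to the same buffer;
/// otherwise it clears the buffer before each read. Returns the bytes that
/// were fed to the stand-in hasher.
pub fn simulate(data: &[u8], chunk: usize, reuse_buffer: bool, strategy: Strategy) -> Vec<u8> {
    let mut buf: Vec<u8> = Vec::new();
    let mut pos = 0;
    let mut hashed = Vec::new();
    loop {
        if !reuse_buffer {
            buf.clear();
        }
        let already_filled = buf.len();
        // Inner reader: append the next chunk (nothing at EOF).
        let n = chunk.min(data.len() - pos);
        buf.extend_from_slice(&data[pos..pos + n]);
        pos += n;
        // Wrapper: feed bytes to the stand-in hasher.
        match strategy {
            Strategy::WholeFilled => hashed.extend_from_slice(&buf),
            Strategy::NewBytesOnly => hashed.extend_from_slice(&buf[already_filled..]),
        }
        // No new bytes: EOF.
        if buf.len() == already_filled {
            break;
        }
    }
    hashed
}
```

With `data = b"abcdef"` and `chunk = 4`, a reused buffer makes `WholeFilled` feed `abcdabcdefabcdef` to the hasher (the final EOF call re-hashes everything), while `NewBytesOnly` feeds exactly `abcdef`. With `reuse_buffer = false` both strategies produce `abcdef`, which is one plausible reason the bug only shows up with some callers.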