Skip to content

Instantly share code, notes, and snippets.

@ivan
Last active April 7, 2023 12:22
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ivan/3c8a7dd325e7ffd3fb7bc3e46975b58a to your computer and use it in GitHub Desktop.
Save ivan/3c8a7dd325e7ffd3fb7bc3e46975b58a to your computer and use it in GitHub Desktop.
tokio::io::AsyncRead poll_read implementation footgun

The tokio::io::AsyncRead documentation confusingly states:

Poll::Ready(Ok(())) means that data was immediately read and placed into the output buffer. The amount of data read can be determined by the increase in the length of the slice returned by ReadBuf::filled. If the difference is 0, EOF has been reached.

https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html

The footgun is that when poll_read is called, buf may already be partially filled. In some cases, this happens just a small fraction of the time in production, causing e.g. the last chunk of a file to be poll_read twice.

Setup code:

use std::pin::Pin;
use std::sync::Arc;
use parking_lot::Mutex;
use pin_project::pin_project;
use futures::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

#[pin_project]
pub(crate) struct Blake3HashingReader<A: AsyncRead> {
    #[pin]
    inner: A,
    b3sum: Arc<Mutex<blake3::Hasher>>,
}

impl<A: AsyncRead> Blake3HashingReader<A> {
    pub fn new(inner: A) -> Blake3HashingReader<A> {
        let b3sum = Arc::new(Mutex::new(blake3::Hasher::new()));
        Blake3HashingReader { inner, b3sum }
    }

    /// Returns an `Arc` which can be derefenced to get the blake3 Hasher
    #[inline]
    pub fn b3sum(&self) -> Arc<Mutex<blake3::Hasher>> {
        self.b3sum.clone()
    }
}

Buggy - the last read is sometimes hashed twice:

impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        b3sum.lock().update(buf.filled());
        inner_poll
    }
}

Still buggy - checking for Poll::Ready doesn't solve this:

impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        if let Poll::Ready(Ok(_)) = inner_poll {
            b3sum.lock().update(buf.filled());
        }
        inner_poll
    }
}

Correct - always checking what was already filled:

impl<R> AsyncRead for Blake3HashingReader<R>
where
    R: AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let b3sum = self.b3sum();
        let already_filled = buf.filled().len();
        let inner_poll = self.project().inner.poll_read(cx, buf);
        if let Poll::Ready(Ok(_)) = inner_poll {
            b3sum.lock().update(&buf.filled()[already_filled..]);
        }
        inner_poll
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment