@WonderBeat
Last active December 21, 2022 12:41
TrackingFsDataInputStream

TrackingFsDataInputStream batch tracking issue

org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream wraps the underlying InputStream and counts the bytes consumed from it. org.apache.flink.connector.file.src.impl.StreamFormatAdapter.Reader relies on this count to split the records it reads into batches:

            while (stream.hasRemainingInBatch() && (next = reader.read()) != null) {
                result.add(next);
            }
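A minimal, hypothetical model of this byte-budget batching (not Flink code): each record is represented only by the number of bytes the reader consumes while decoding it, and `ByteBudgetBatching`/`batch` are illustrative names. As in the loop above, the budget is checked only before the next record is read, so a batch can exceed the budget by up to one record.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ByteBudgetBatching {

    /** Splits records (given as the bytes consumed to read each one) into batches of roughly batchSize bytes. */
    static List<List<Integer>> batch(List<Integer> recordSizes, int batchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        Iterator<Integer> it = recordSizes.iterator();
        while (it.hasNext()) {
            int remainingInBatch = batchSize; // reset the byte budget for a new batch
            List<Integer> result = new ArrayList<>();
            // Same shape as: while (stream.hasRemainingInBatch() && (next = reader.read()) != null)
            while (remainingInBatch > 0 && it.hasNext()) {
                int size = it.next();
                remainingInBatch -= size; // bytes consumed while reading this record
                result.add(size);
            }
            batches.add(result);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Ten 4-byte records with a 10-byte budget: each full batch takes 3 records (12 bytes).
        System.out.println(batch(List.of(4, 4, 4, 4, 4, 4, 4, 4, 4, 4), 10));
        // [[4, 4, 4], [4, 4, 4], [4, 4, 4], [4]]
    }
}
```

The correctness of the batch boundaries depends entirely on the counter reflecting the bytes that were really consumed.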

org.apache.flink.connector.file.src.impl.StreamFormatAdapter.TrackingFsDataInputStream#read(byte[], int, int) contains a bug that can produce batches of arbitrary size, because its counter (remainingInBatch) can underflow and wrap around:

        public int read(byte[] b, int off, int len) throws IOException {
            remainingInBatch -= len; // subtracts the requested length, not the bytes actually read
            return stream.read(b, off, len);
        }

According to the InputStream javadoc, each call to stream.read(b, off, len) may return fewer than len bytes:

Parameters:
b - the buffer into which the data is read.
off - the start offset in array b at which the data is written.
len - the maximum number of bytes to read.
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data because the end of the stream has been reached.
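This contract is easy to observe with the JDK's own `java.io.ByteArrayInputStream`, which returns only the bytes it still holds. The helper `readResults` is an illustrative name, not part of any library.

```java
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;

class ShortReadDemo {

    /**
     * Calls read(buf, 0, len) `calls` times on a stream holding `available` bytes and
     * returns each call's result: the number of bytes read, or -1 at end of stream.
     */
    static List<Integer> readResults(int available, int len, int calls) {
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[available]);
        byte[] buf = new byte[len];
        List<Integer> results = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            results.add(in.read(buf, 0, len)); // may deliver fewer than len bytes
        }
        return results;
    }

    public static void main(String[] args) {
        // 10 bytes available, 16 requested per call.
        System.out.println(readResults(10, 16, 2)); // [10, -1]
    }
}
```

A counter that subtracts `len` would be charged 32 bytes for these two calls, although only 10 bytes were delivered.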

However, the current implementation subtracts the number of bytes requested (len) rather than the number actually read. For example, the S3 Hadoop file system can return fewer than len bytes from stream.read(b, off, len). This is expected, and readers handle it by calling read() in a loop until the buffer is full; see org.apache.parquet.io.DelegatingSeekableInputStream#readFully(java.io.InputStream, byte[], int, int).
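To see how a `readFully`-style loop interacts with short reads, the sketch below copies the loop shape of Parquet's `readFully` and wraps a `ByteArrayInputStream` so that it returns at most `chunk` bytes per call (an assumption standing in for a remote file system such as S3). It records the `len` of every call, which is what `TrackingFsDataInputStream#read` subtracts from its counter. `ReadFullyRequests` and `requestedLengths` are illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ReadFullyRequests {

    /** Same loop shape as Parquet's readFully: keep requesting the remaining bytes until done. */
    static void readFully(InputStream f, byte[] bytes, int start, int len) throws IOException {
        int offset = start;
        int remaining = len;
        while (remaining > 0) {
            int bytesRead = f.read(bytes, offset, remaining);
            if (bytesRead < 0) {
                throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read");
            }
            remaining -= bytesRead;
            offset += bytesRead;
        }
    }

    /** Reads len bytes through a stream returning at most chunk bytes per call; returns each requested length. */
    static List<Integer> requestedLengths(int len, int chunk) {
        List<Integer> requested = new ArrayList<>();
        InputStream shortReads = new FilterInputStream(new ByteArrayInputStream(new byte[len])) {
            @Override
            public int read(byte[] b, int off, int n) throws IOException {
                requested.add(n);                              // what a len-based counter subtracts
                return super.read(b, off, Math.min(n, chunk)); // simulate a short read
            }
        };
        try {
            readFully(shortReads, new byte[len], 0, len);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return requested;
    }

    public static void main(String[] args) {
        List<Integer> requested = requestedLengths(10, 4);
        System.out.println(requested); // [10, 6, 2]
        // A len-based counter is charged 18 bytes, although only 10 bytes were read.
        System.out.println(requested.stream().mapToInt(Integer::intValue).sum()); // 18
    }
}
```

Each retry asks for the whole remainder again, so the over-count grows roughly with the square of the number of retries.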

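As a result, reading a Parquet file can underflow the counter in TrackingFsDataInputStream#read(byte[], int, int). The Parquet reader tries to read a whole (large) row group and may call read() many times; each call subtracts the full remaining length rather than the bytes it returned, so the total subtracted can far exceed the size of the row group. Once the counter wraps around, it can become a large positive number, hasRemainingInBatch() keeps returning true, and the batch size becomes effectively unlimited, which can lead to an OOM.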
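The underflow can be reproduced with plain Java `int` arithmetic. The sketch below is a simplified, hypothetical harness, not Flink or Parquet code: `ChunkedZeroStream` stands in for a remote stream, and the sizes (1 MiB batch budget, a single 4 MiB read, at most 4 KiB per underlying call) are assumptions chosen so the numbers are easy to check. It keeps two counters side by side: one charged by the requested `len`, as in `TrackingFsDataInputStream#read`, and one charged by the bytes each call actually returned, which is one possible way to fix the accounting (not necessarily how Flink resolved it).

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

class BatchCounterWraparound {

    /** Hypothetical remote-stream stand-in: endless zero bytes, at most `chunk` bytes per call. */
    static final class ChunkedZeroStream extends InputStream {
        private final int chunk;
        ChunkedZeroStream(int chunk) { this.chunk = chunk; }

        @Override public int read() { return 0; }

        @Override public int read(byte[] b, int off, int len) {
            int n = Math.min(len, chunk);
            Arrays.fill(b, off, off + n, (byte) 0);
            return n;
        }
    }

    /** Keeps two int budgets: one charged by the requested len, one by the bytes returned. */
    static final class DualCountingStream extends InputStream {
        private final InputStream stream;
        int remainingByLen;    // accounting as in TrackingFsDataInputStream#read (the bug)
        int remainingByResult; // assumed fix: charge only bytes actually delivered

        DualCountingStream(InputStream stream, int batchSize) {
            this.stream = stream;
            this.remainingByLen = batchSize;
            this.remainingByResult = batchSize;
        }

        @Override public int read() throws IOException {
            remainingByLen--;
            int b = stream.read();
            if (b >= 0) remainingByResult--;
            return b;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            remainingByLen -= len; // Java int arithmetic silently wraps on overflow
            int n = stream.read(b, off, len);
            if (n > 0) remainingByResult -= n;
            return n;
        }
    }

    /** Same loop shape as Parquet's readFully: keep requesting the remaining bytes. */
    static void readFully(InputStream f, byte[] bytes, int len) throws IOException {
        int offset = 0;
        int remaining = len;
        while (remaining > 0) {
            int bytesRead = f.read(bytes, offset, remaining);
            if (bytesRead < 0) throw new EOFException(remaining + " bytes left to read");
            remaining -= bytesRead;
            offset += bytesRead;
        }
    }

    /** Reads one block of blockBytes through short reads; returns {remainingByLen, remainingByResult}. */
    static int[] simulate(int batchSize, int blockBytes, int chunk) {
        DualCountingStream s = new DualCountingStream(new ChunkedZeroStream(chunk), batchSize);
        try {
            readFully(s, new byte[blockBytes], blockBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new int[] {s.remainingByLen, s.remainingByResult};
    }

    public static void main(String[] args) {
        int[] r = simulate(1 << 20, 4 << 20, 4 << 10);
        // 1024 calls request 4 MiB + (4 MiB - 4 KiB) + ... + 4 KiB = 2,149,580,800 bytes in total,
        // so the len-based counter passes Integer.MIN_VALUE and wraps to a large positive value.
        System.out.println(r[0]); // 2146435072: hasRemainingInBatch() would keep returning true
        System.out.println(r[1]); // -3145728: the batch would end after this record
    }
}
```

Where the len-based counter ends up depends on the read sizes: it can wrap to a large positive value (an effectively unlimited batch) or land on a negative one, which is why the resulting batch sizes are arbitrary. The result-based counter can only go negative by at most the bytes consumed for the last record.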
