Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A fix for S3 issues with StreamingFileSink
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
index 8f3aff899f..c7a5e03f96 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.function.FunctionWithException;

+import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,6 +41,9 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {

 	public static final int BUFFER_SIZE = 4096;

+	/** Size (10 MiB) of the read buffer used by {@link #getInputStream()}. */
+	public static final int BUFFER_READ_SIZE = 10 * 1024 * 1024;
+
 	private final RefCountedFile currentTmpFile;

 	/** The write buffer. */
@@ -65,7 +69,14 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {

 	@Override
 	public InputStream getInputStream() throws IOException {
-		return Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ);
+		// The stream from Files.newInputStream is not required to support mark/reset.
+		// Buffering it lets a consumer rewind it (presumably an S3 part upload being retried),
+		// but reset() only succeeds while the bytes read since mark() are still buffered.
+		// TODO: hand the file itself to the uploader instead of a stream; that would also
+		// avoid allocating a BUFFER_READ_SIZE byte array on every call.
+		return new BufferedInputStream(
+			Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ),
+			BUFFER_READ_SIZE);
 	}

 	@Override
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.