Skip to content

Instantly share code, notes, and snippets.

@jkff
Created April 4, 2016 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkff/d8d984a33a41ec607328cee8e418c174 to your computer and use it in GitHub Desktop.
Save jkff/d8d984a33a41ec607328cee8e418c174 to your computer and use it in GitHub Desktop.
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
index 4e3e9ca..15e6e29 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java
@@ -122,7 +122,7 @@ public class CompressedSource<T> extends FileBasedSource<T> {
byte zero = 0x00;
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]);
if (header == GZIPInputStream.GZIP_MAGIC) {
- return Channels.newChannel(new GzipCompressorInputStream(stream));
+ return Channels.newChannel(new GzipCompressorInputStream(stream, true));
}
}
return Channels.newChannel(stream);
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 2dcddb4..a7dfc58 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -47,16 +47,19 @@ import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
+import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
@@ -80,6 +83,43 @@ public class CompressedSourceTest {
runReadTest(input, CompressionMode.GZIP);
}
+ /**
+  * Gzips {@code input} entirely in memory and returns the compressed bytes.
+  *
+  * @param input the raw bytes to compress
+  * @return the gzip-encoded form of {@code input}
+  * @throws IOException if writing to the gzip stream fails
+  */
+ private static byte[] compressGzip(byte[] input) throws IOException {
+   ByteArrayOutputStream compressed = new ByteArrayOutputStream();
+   try (OutputStream gz = new GZIPOutputStream(compressed)) {
+     gz.write(input, 0, input.length);
+   }
+   // The gzip stream is closed before the buffer is read back, so nothing is left unflushed.
+   return compressed.toByteArray();
+ }
+
+ /**
+  * Returns a new array holding the bytes of {@code first} followed by the bytes of
+  * {@code second}. Neither input array is modified.
+  */
+ private static byte[] concat(byte[] first, byte[] second) {
+   final int firstLen = first.length;
+   final int secondLen = second.length;
+   byte[] joined = new byte[firstLen + secondLen];
+   // The two copies target disjoint regions of the result, so their order does not matter.
+   System.arraycopy(second, 0, joined, firstLen, secondLen);
+   System.arraycopy(first, 0, joined, 0, firstLen);
+   return joined;
+ }
+
+
+ /**
+  * Writes two independently gzipped payloads back to back into one file, reads that file
+  * through a {@link CompressedSource} in GZIP mode, and asserts the output against the bytes
+  * of both uncompressed payloads (order is not checked).
+  */
+ @Test
+ public void testReadConcatenatedGzip() throws IOException {
+   byte[] headerBytes = "a,b,c\n".getBytes(StandardCharsets.UTF_8);
+   byte[] bodyBytes = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8);
+   byte[] uncompressed = concat(headerBytes, bodyBytes);
+
+   // Each payload is compressed on its own; writing them one after another yields the same
+   // file contents as concatenating the two compressed arrays first.
+   byte[] gzHeader = compressGzip(headerBytes);
+   byte[] gzBody = compressGzip(bodyBytes);
+   File gzFile = tmpFolder.newFile();
+   try (OutputStream fileOut = new FileOutputStream(gzFile)) {
+     fileOut.write(gzHeader);
+     fileOut.write(gzBody);
+   }
+
+   Pipeline pipeline = TestPipeline.create();
+
+   String gzPath = gzFile.getAbsolutePath();
+   CompressedSource<Byte> gzipSource =
+       CompressedSource.from(new ByteSource(gzPath, 1))
+           .withDecompression(CompressionMode.GZIP);
+   PCollection<Byte> decoded = pipeline.apply(Read.from(gzipSource));
+
+   DataflowAssert.that(decoded).containsInAnyOrder(Bytes.asList(uncompressed));
+   pipeline.run();
+ }
+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment