Created April 4, 2016 19:03
Save jkff/d8d984a33a41ec607328cee8e418c174 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java | |
index 4e3e9ca..15e6e29 100644 | |
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java | |
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java | |
@@ -122,7 +122,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { | |
byte zero = 0x00; | |
int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); | |
if (header == GZIPInputStream.GZIP_MAGIC) { | |
- return Channels.newChannel(new GzipCompressorInputStream(stream)); | |
+ return Channels.newChannel(new GzipCompressorInputStream(stream, true)); | |
} | |
} | |
return Channels.newChannel(stream); | |
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java | |
index 2dcddb4..a7dfc58 100644 | |
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java | |
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java | |
@@ -47,16 +47,19 @@ import org.junit.rules.TemporaryFolder; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
+import java.io.ByteArrayOutputStream; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.ReadableByteChannel; | |
+import java.nio.charset.StandardCharsets; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.NoSuchElementException; | |
import java.util.Random; | |
+import java.util.zip.GZIPOutputStream; | |
import javax.annotation.Nullable; | |
@@ -80,6 +83,43 @@ public class CompressedSourceTest { | |
runReadTest(input, CompressionMode.GZIP); | |
} | |
+ /** | |
+ * Gzips {@code input} into a single, self-contained gzip member. | |
+ * Closing the GZIPOutputStream (via try-with-resources) finishes the | |
+ * deflater and writes the trailer before the bytes are read back. | |
+ */ | |
+ private static byte[] compressGzip(byte[] input) throws IOException { | |
+ ByteArrayOutputStream compressed = new ByteArrayOutputStream(); | |
+ try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) { | |
+ gzip.write(input, 0, input.length); | |
+ } | |
+ return compressed.toByteArray(); | |
+ } | |
+ | |
+ /** | |
+ * Returns a new array holding the bytes of {@code first} followed by {@code second}. | |
+ * Delegates to Guava's Bytes, which this test class already uses for Bytes.asList. | |
+ */ | |
+ private static byte[] concat(byte[] first, byte[] second) { | |
+ return Bytes.concat(first, second); | |
+ } | |
+ | |
+ @Test | |
+ public void testReadConcatenatedGzip() throws IOException { | |
+ // Write two independently gzipped members back to back. A reader that stopped | |
+ // after the first member would only see the "a,b,c\n" part, so this checks that | |
+ // decompression continues through every member of a concatenated gzip file. | |
+ byte[] firstPart = "a,b,c\n".getBytes(StandardCharsets.UTF_8); | |
+ byte[] secondPart = "1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); | |
+ byte[] twoMembers = concat(compressGzip(firstPart), compressGzip(secondPart)); | |
+ File gzFile = tmpFolder.newFile(); | |
+ try (FileOutputStream out = new FileOutputStream(gzFile)) { | |
+ out.write(twoMembers); | |
+ } | |
+ | |
+ Pipeline pipeline = TestPipeline.create(); | |
+ | |
+ CompressedSource<Byte> source = CompressedSource | |
+ .from(new ByteSource(gzFile.getAbsolutePath(), 1)) | |
+ .withDecompression(CompressionMode.GZIP); | |
+ PCollection<Byte> output = pipeline.apply(Read.from(source)); | |
+ | |
+ DataflowAssert.that(output) | |
+ .containsInAnyOrder(Bytes.asList(concat(firstPart, secondPart))); | |
+ pipeline.run(); | |
+ } | |
+ |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.