Skip to content

Instantly share code, notes, and snippets.

@detro
Last active December 1, 2017 18:54
Show Gist options
  • Save detro/1b52ef899632634bd7ac7ff4c0eb0119 to your computer and use it in GitHub Desktop.
Save detro/1b52ef899632634bd7ac7ff4c0eb0119 to your computer and use it in GitHub Desktop.
XZCompressorInputStreamAdapter: shows how to overcome a limitation of Apache Commons Compress (https://commons.apache.org/proper/commons-compress/index.html), which doesn't provide a "compressing" InputStream (the same approach can be used to create a "decompressing" OutputStream). It avoids the use of any thread and uses just a single buffer to hold data in transit.
import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import javax.validation.constraints.NotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * Wraps a generic {@link InputStream} into an XZ-compressed {@link InputStream}: reading from this
 * stream yields the XZ-compressed form of the bytes provided by the wrapped source.
 *
 * <p>Internally the source bytes are pushed through an {@link XZCompressorOutputStream} that writes into
 * an in-memory {@link ByteArrayOutputStream}; each {@code read} call hands out whatever compressed bytes
 * that buffer currently holds. Everything happens on the calling thread, without any support thread.
 *
 * <p>NOTE: to avoid allocating a second buffer, {@link #read(byte[], int, int)} uses the caller's
 * destination range as scratch space while feeding the compressor. The part of the requested range that
 * is not covered by the returned byte count is therefore zeroed, rather than left untouched.
 *
 * <p>It's all self contained, so a call to {@link #close()} is enough to take care of all the internal
 * cleanup (including closing the wrapped source).
 */
public class XZCompressorInputStreamAdapter extends InputStream {
    private static final int DEFAULT_COMPRESSION_PRESET = 2;

    /** 8K: a common buffer size; the underlying ByteArrayOutputStream grows on demand anyway. */
    private static final int INITIAL_BUFFER_SIZE = 8 * 1_024;

    private final InputStream source;
    private final ExposedByteArrayOutputStream compressedBytesStream;
    private final XZCompressorOutputStream compressorOutputStream;

    /** Set once the source reported end-of-stream and the compressor wrote its trailing data. */
    private boolean compressorFinished = false;

    // NOTE: The compressor buffers internally, so the compressed bytes it emits in one go might be more than
    // the caller asked for in a single "read()" call. This is the index, within the compressed buffer, of the
    // first byte that was not handed out yet. While unread bytes remain, they are returned without furthering
    // the compression; once they are all consumed the buffer is reset and compression resumes.
    private int pendingOffset = 0;

    /**
     * Creates an adapter using the default compression preset.
     *
     * @param source stream providing the uncompressed bytes; must not be {@code null}
     * @throws IOException if the XZ compressor cannot be initialised
     */
    public XZCompressorInputStreamAdapter(InputStream source) throws IOException {
        this(source, DEFAULT_COMPRESSION_PRESET);
    }

    /**
     * Creates an adapter using the given compression preset.
     *
     * @param source stream providing the uncompressed bytes; must not be {@code null}
     * @param xzCompressionPreset preset passed as-is to {@link XZCompressorOutputStream}
     * @throws IOException if the XZ compressor cannot be initialised
     */
    public XZCompressorInputStreamAdapter(InputStream source, int xzCompressionPreset) throws IOException {
        if (source == null) {
            throw new NullPointerException("source");
        }
        this.source = source;
        this.compressedBytesStream = new ExposedByteArrayOutputStream(INITIAL_BUFFER_SIZE);
        this.compressorOutputStream = new XZCompressorOutputStream(compressedBytesStream, xzCompressionPreset);
    }

    /**
     * Reads a single compressed byte.
     *
     * @return the next compressed byte as a value in {@code 0..255}, or {@code -1} at end of stream
     * @throws IOException if reading the source or compressing fails
     */
    @Override
    public int read() throws IOException {
        final byte[] singleByte = new byte[1];
        if (read(singleByte, 0, 1) > 0) {
            // Mask to an unsigned value: a raw (signed) byte could be negative and even collide with -1 (EOF)
            return singleByte[0] & 0xFF;
        }
        return -1;
    }

    /**
     * Reads up to {@code length} compressed bytes into {@code destination}, starting at {@code offset}.
     *
     * <p>Compressed bytes left over from a previous call are returned first. Otherwise the source is read
     * (and compressed) until the compressor emits at least one byte, or until the source ends, in which
     * case the compressor is finished so that its trailing data gets emitted.
     *
     * @param destination buffer receiving the compressed bytes (also used as scratch space, see class doc)
     * @param offset index in {@code destination} of the first byte to write
     * @param length maximum number of bytes to write
     * @return number of compressed bytes written, {@code 0} if {@code length} is 0, or {@code -1} once the
     *         whole compressed stream has been returned
     * @throws NullPointerException if {@code destination} is {@code null}
     * @throws IndexOutOfBoundsException if {@code offset}/{@code length} do not fit {@code destination}
     * @throws IOException if reading the source or compressing fails
     */
    @Override
    public synchronized int read(@NotNull final byte destination[], final int offset, int length) throws IOException {
        // Validate input
        if (destination == null) {
            throw new NullPointerException();
        } else if (offset < 0 || length < 0 || length > destination.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            // Must not touch the source: a zero-length read there returns 0, which says nothing about EOF
            return 0;
        }

        // If there are compressed bytes not handed out yet, return those before compressing anything else
        if (pendingBytes() > 0) {
            return drainPending(destination, offset, length);
        }

        // Everything compressed so far was consumed: recycle the buffer
        compressedBytesStream.reset();
        pendingOffset = 0;
        if (compressorFinished) {
            return -1;
        }

        do {
            // NOTE: the destination range is used as a "support" buffer while we pass data into the
            // compressor. At the end its content is replaced with the compressed data.
            final int amountRead = source.read(destination, offset, length);
            if (amountRead > 0) {
                compressorOutputStream.write(destination, offset, amountRead);
            } else if (amountRead < 0) {
                // End of source: let the compressor flush and append its trailing data
                compressorOutputStream.finish();
                compressorFinished = true;
            }
        } while (compressedBytesStream.length() == 0); //< Keep reading until some compressed data were flushed

        final int copied = drainPending(destination, offset, length);
        // Zero-out the scratch bytes not overwritten by compressed data, so no uncompressed input is left behind
        for (int i = offset + copied; i < offset + length; ++i) {
            destination[i] = 0x0;
        }
        return copied;
    }

    /**
     * Returns how many already-compressed bytes are buffered and can be read without touching the source.
     *
     * @return number of buffered compressed bytes not handed out yet
     */
    @Override
    public synchronized int available() throws IOException {
        return pendingBytes();
    }

    /**
     * Closes the wrapped source and the internal compressor; the compressor is closed even if closing
     * the source fails.
     *
     * @throws IOException if closing the source or the compressor fails
     */
    @Override
    public void close() throws IOException {
        try {
            source.close();
        } finally {
            compressorOutputStream.close();
        }
    }

    /** Number of compressed bytes sitting in the buffer that were not handed out yet. */
    private int pendingBytes() {
        return compressedBytesStream.length() - pendingOffset;
    }

    /** Copies up to {@code length} pending compressed bytes into {@code destination} and advances the cursor. */
    private int drainPending(final byte[] destination, final int offset, final int length) {
        final int howMuchCanBeRead = Math.min(pendingBytes(), length);
        System.arraycopy(compressedBytesStream.byteArray(), pendingOffset, destination, offset, howMuchCanBeRead);
        pendingOffset += howMuchCanBeRead;
        return howMuchCanBeRead;
    }

    /**
     * Allows access to the buffer in ByteArrayOutputStream without unnecessary copying
     */
    private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        ExposedByteArrayOutputStream(int expectedInputSize) {
            super(expectedInputSize);
        }

        /** The live internal array; only the first {@link #length()} bytes are valid. */
        byte[] byteArray() {
            return buf;
        }

        /** Number of valid bytes currently held in {@link #byteArray()}. */
        int length() {
            return count;
        }
    }
}
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import static org.junit.Assert.*;
/**
 * Round-trip test for {@link XZCompressorInputStreamAdapter}: random data is compressed through the
 * adapter, decompressed with {@link XZCompressorInputStream}, and compared with the original.
 */
public class XZCompressorInputStreamAdapterTest {
    private static final Logger LOG = Loggers.build();

    /** Size of the random payload pushed through the adapter. */
    private static final int UNCOMPRESSED_SIZE = 100 * 1024 * 1024;

    /**
     * Compresses a temporary file through the adapter, decompresses the result and verifies that the
     * decompressed bytes match the original file.
     *
     * @throws IOException if any of the temporary files cannot be created, read or written
     */
    @Test
    public void bla() throws IOException {
        final Path temp = Files.createTempFile("uncompressed", null);
        final Path tempCompressed = Files.createTempFile("compressed", null);
        final Path copy = Files.createTempFile("uncompressed-copy", null);
        try {
            // Write the payload BEFORE opening the input stream on it; alphanumeric text is plain ASCII
            Files.write(
                    temp,
                    RandomStringUtils.randomAlphanumeric(UNCOMPRESSED_SIZE).getBytes(StandardCharsets.US_ASCII),
                    StandardOpenOption.TRUNCATE_EXISTING);
            LOG.info("Created uncompressed file of {} bytes: {}", Files.size(temp), temp);

            // try-with-resources: streams get closed even if the copy fails, so the temp files can be deleted
            try (InputStream tempIn = Files.newInputStream(temp);
                 XZCompressorInputStreamAdapter compressedTempIn = new XZCompressorInputStreamAdapter(tempIn)) {
                Files.copy(compressedTempIn, tempCompressed, StandardCopyOption.REPLACE_EXISTING);
            }
            LOG.info("Created compressed file of {} bytes: {}", Files.size(tempCompressed), tempCompressed);

            try (InputStream rawCompressedIn = new BufferedInputStream(Files.newInputStream(tempCompressed));
                 XZCompressorInputStream compressedIn = new XZCompressorInputStream(rawCompressedIn)) {
                Files.copy(compressedIn, copy, StandardCopyOption.REPLACE_EXISTING);
            }
            LOG.info("Created uncompressed file of {} bytes: {}", Files.size(copy), copy);

            assertArrayEquals(Files.readAllBytes(temp), Files.readAllBytes(copy));
        } finally {
            Files.deleteIfExists(temp);
            Files.deleteIfExists(tempCompressed);
            Files.deleteIfExists(copy);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment