Skip to content

Instantly share code, notes, and snippets.

@kogupta
Created June 1, 2018 09:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kogupta/575d77abde18fd9a38b373522de438ff to your computer and use it in GitHub Desktop.
Save kogupta/575d77abde18fd9a38b373522de438ff to your computer and use it in GitHub Desktop.
A template for "async" decompression of HTTP response bodies; see http://www.cowtowncoder.com/blog/archives/2012/05/entry_475.html for more details.
import com.ning.compress.DataHandler;
import com.ning.compress.UncompressorOutputStream;
import com.ning.compress.gzip.GZIPUncompressor;
import com.ning.compress.lzf.LZFUncompressor;
import io.netty.handler.codec.http.HttpHeaders;
import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Optional;
import static org.asynchttpclient.AsyncHandler.State.ABORT;
import static org.asynchttpclient.AsyncHandler.State.CONTINUE;
/**
 * {@link AsyncHandler} that writes an HTTP response body into a file, passing it through an
 * LZF or GZIP uncompressor when the response's {@code Content-Encoding} header names one.
 *
 * <p>Reference: http://www.cowtowncoder.com/blog/archives/2012/05/entry_475.html
 *
 * <ol>
 *   <li>Created as a template for "async" decompressing.</li>
 *   <li>Modify it to support async file download [again, sample in above link].</li>
 *   <li>Possible enhancement: use a {mmap raf, buffer, /dev/shm file} pool
 *       if bounded upstreams [disruptor?] or bounded number of connections?</li>
 * </ol>
 *
 * <p>The handler completes with the target file on success. On a non-200 status it completes
 * with an empty {@link Optional}; in that case, and on any error reported through
 * {@link #onThrowable(Throwable)}, the output stream is closed and the partial file is deleted.
 *
 * <p>Instances are single-use and keep mutable state without synchronization.
 */
// dependencies:
//   "org.asynchttpclient:async-http-client:2.4.4",
//   "com.ning:compress-lzf:1.0.4"
public class UncompressingFileHandler implements AsyncHandler<Optional<File>>, DataHandler {
    /** Destination file; created (or truncated) by the constructor. */
    private final File file;
    /** Sink for the (uncompressed) body bytes. */
    private final OutputStream out;
    /** Set once the response is known to be unusable (bad status or error). */
    private boolean failed = false;
    /** Guards against closing the streams more than once. */
    private boolean closed = false;
    /** Non-null only when a supported Content-Encoding was announced. */
    private UncompressorOutputStream uncompressingStream;

    /**
     * Opens {@code f} for writing immediately.
     *
     * @param f file the response body is written to
     * @throws IOException if the file cannot be opened for writing
     */
    public UncompressingFileHandler(File f) throws IOException {
        file = f;
        out = new FileOutputStream(f);
    }

    /**
     * Writes one chunk of the body, through the uncompressor when one is installed.
     *
     * @param part the received body chunk
     * @return {@code ABORT} if the response has already been marked failed, else {@code CONTINUE}
     * @throws IOException if decompression or the file write fails
     */
    @Override
    public State onBodyPartReceived(HttpResponseBodyPart part) throws IOException {
        if (failed) {
            // Nothing useful can be done with further data; stop the transfer.
            return ABORT;
        }
        byte[] bytes = part.getBodyPartBytes();
        if (uncompressingStream != null) {
            // Compressed: uncompressed output arrives back through handleData().
            uncompressingStream.write(bytes);
        } else {
            out.write(bytes);
        }
        return CONTINUE;
    }

    /**
     * Finishes the download and releases the streams.
     *
     * @return the written file, or empty if the response was marked failed
     * @throws IOException if completing the uncompressor or closing the file fails; the
     *     partial file is deleted before the exception propagates
     */
    @Override
    public Optional<File> onCompleted() throws IOException {
        if (failed) {
            discardQuietly();
            return Optional.empty();
        }
        try {
            closeStreams();
        } catch (IOException e) {
            // A failed close means the file content cannot be trusted.
            failed = true;
            file.delete();
            throw e;
        }
        return Optional.of(file);
    }

    /**
     * Installs an uncompressor matching the {@code Content-Encoding} header, if supported.
     * Unsupported or absent encodings leave the body to be written exactly as received.
     *
     * @param h the response headers
     * @return always {@code CONTINUE}
     */
    @Override
    public State onHeadersReceived(HttpHeaders h) throws Exception {
        String compression = h.get("Content-Encoding");
        if (compression != null) {
            // Content-coding names are case-insensitive; "x-gzip" is an alias of "gzip".
            String coding = compression.trim();
            if ("lzf".equalsIgnoreCase(coding)) {
                uncompressingStream = new UncompressorOutputStream(new LZFUncompressor(this));
            } else if ("gzip".equalsIgnoreCase(coding) || "x-gzip".equalsIgnoreCase(coding)) {
                uncompressingStream = new UncompressorOutputStream(new GZIPUncompressor(this));
            }
            // Any other value: unsupported scheme ==> do nothing, bytes are stored as-is.
        }
        return CONTINUE;
    }

    /**
     * Accepts only status 200; anything else marks the response failed and aborts.
     *
     * @param status the response status line
     * @return {@code CONTINUE} for 200, otherwise {@code ABORT}
     */
    @Override
    public State onStatusReceived(HttpResponseStatus status) {
        failed = (status.getStatusCode() != 200);
        return failed ? ABORT : CONTINUE;
    }

    /**
     * Marks the download failed, closes the streams and deletes the partial file so that an
     * error does not leak the file handle or leave truncated output behind.
     *
     * @param t the error reported by the client
     */
    @Override
    public void onThrowable(Throwable t) {
        failed = true;
        discardQuietly();
    }

    /**
     * {@link DataHandler} callback: receives uncompressed content and appends it to the file.
     *
     * @return always {@code true}
     * @throws IOException if the file write fails
     */
    @Override
    public boolean handleData(byte[] buffer, int offset, int len) throws IOException {
        out.write(buffer, offset, len);
        return true;
    }

    /** {@link DataHandler} callback; nothing to do here, streams are closed in onCompleted(). */
    @Override
    public void allDataHandled() throws IOException {
    }

    /**
     * Closes the uncompressor (if any) and then the file, at most once. The uncompressor goes
     * first so the file is still open should completing it call back into this handler; the
     * file is closed even if that step throws.
     */
    private void closeStreams() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        try {
            if (uncompressingStream != null) {
                uncompressingStream.close();
            }
        } finally {
            out.close();
        }
    }

    /** Best-effort cleanup for the failure path: close everything, then delete the file. */
    private void discardQuietly() {
        if (!closed) {
            closed = true;
            if (uncompressingStream != null) {
                try {
                    uncompressingStream.close();
                } catch (IOException ignored) {
                    // Already failing; an incomplete compressed stream is expected here.
                }
            }
            try {
                out.close();
            } catch (IOException ignored) {
                // Already failing; the file is deleted below regardless.
            }
        }
        file.delete();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment