Created
June 1, 2018 09:45
-
-
Save kogupta/575d77abde18fd9a38b373522de438ff to your computer and use it in GitHub Desktop.
Created this as a template for "async" decompressing - check http://www.cowtowncoder.com/blog/archives/2012/05/entry_475.html for more details
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.ning.compress.DataHandler; | |
import com.ning.compress.UncompressorOutputStream; | |
import com.ning.compress.gzip.GZIPUncompressor; | |
import com.ning.compress.lzf.LZFUncompressor; | |
import io.netty.handler.codec.http.HttpHeaders; | |
import org.asynchttpclient.AsyncHandler; | |
import org.asynchttpclient.HttpResponseBodyPart; | |
import org.asynchttpclient.HttpResponseStatus; | |
import java.io.File; | |
import java.io.FileOutputStream; | |
import java.io.IOException; | |
import java.io.OutputStream; | |
import java.util.Optional; | |
import static org.asynchttpclient.AsyncHandler.State.ABORT; | |
import static org.asynchttpclient.AsyncHandler.State.CONTINUE; | |
/** | |
* reference: http://www.cowtowncoder.com/blog/archives/2012/05/entry_475.html | |
* | |
* 1. Created this as a template for "async" decompressing | |
* 2. modify it to support async file download [again sample in above link] | |
* 3. use {mmap raf, buffer, /dev/shm file} pool as possible enhancement | |
* if bounded upstreams [disruptor?] or bounded number of connections? | |
*/ | |
// dependencies: | |
// "org.asynchttpclient:async-http-client:2.4.4", | |
// "com.ning:compress-lzf:1.0.4 | |
public class UncompressingFileHandler implements AsyncHandler<Optional<File>>, DataHandler { | |
private File file; | |
private final OutputStream out; | |
private boolean failed = false; | |
private UncompressorOutputStream uncompressingStream; | |
public UncompressingFileHandler(File f) throws IOException { | |
file = f; | |
out = new FileOutputStream(f); | |
} | |
public State onBodyPartReceived(HttpResponseBodyPart part) throws IOException { | |
if (!failed) { | |
// if compressed, pass through uncompressing stream | |
if (uncompressingStream != null) { | |
uncompressingStream.write(part.getBodyPartBytes()); | |
} else { // otherwise write directly | |
out.write(part.getBodyPartBytes()); | |
} | |
} | |
return CONTINUE; | |
} | |
public Optional<File> onCompleted() throws IOException { | |
out.close(); | |
if (uncompressingStream != null) { uncompressingStream.close();} | |
if (failed) { | |
file.delete(); | |
return Optional.empty(); | |
} | |
return Optional.ofNullable(file); | |
} | |
@Override | |
public State onHeadersReceived(HttpHeaders h) throws Exception { | |
// must verify that we are getting compressed stuff here: | |
String compression = h.get("Content-Encoding"); | |
if (compression != null) { | |
switch (compression) { | |
case "lzf": | |
uncompressingStream = new UncompressorOutputStream(new LZFUncompressor(this)); | |
break; | |
case "gzip": | |
uncompressingStream = new UncompressorOutputStream(new GZIPUncompressor(this)); | |
break; | |
default: | |
// unsupported compression scheme or, no compression ==> do nothing | |
break; | |
} | |
} | |
// nothing to check here as of yet | |
return CONTINUE; | |
} | |
@Override | |
public State onStatusReceived(HttpResponseStatus status) { | |
failed = (status.getStatusCode() != 200); | |
return failed ? ABORT : CONTINUE; | |
} | |
@Override | |
public void onThrowable(Throwable t) { | |
failed = true; | |
} | |
@Override | |
public boolean handleData(byte[] buffer, int offset, int len) throws IOException { | |
// DataHandler implementation for Uncompressor; called with uncompressed content: | |
out.write(buffer, offset, len); | |
return true; | |
} | |
@Override | |
public void allDataHandled() throws IOException { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment