Skip to content

Instantly share code, notes, and snippets.

@fmoraesmeli
Created October 30, 2018 11:55
Show Gist options
  • Save fmoraesmeli/b5f01b951725f54e98c46a99b8ebf849 to your computer and use it in GitHub Desktop.
Save fmoraesmeli/b5f01b951725f54e98c46a99b8ebf849 to your computer and use it in GitHub Desktop.
Utility Class to Stream zip bytes using Reactor Core Publisher
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
/**
 * Turns a stream of in-memory files into a stream of ZIP-archive byte chunks.
 *
 * <p>Each {@link Entry} received from the source publisher is written as one ZIP entry into an
 * in-memory buffer. Whenever the buffer holds at least {@code outputPacketSizeThreshold} bytes
 * after an entry has been written, its content is emitted as one {@code byte[]} chunk and the
 * buffer is cleared. When the source completes, the ZIP stream is closed and whatever is left in
 * the buffer is emitted as the final chunk. Concatenating all emitted chunks in order yields the
 * complete archive.
 *
 * <p>The buffer and the ZIP stream are created per subscription, so the {@link Flux} exposed by
 * {@code getZipStream()} can be subscribed to more than once (for example by a retry) without
 * one subscriber writing into a ZIP stream that another subscriber already used or closed.
 * Whether the source publisher itself can be replayed is up to the caller.
 *
 * <p>Any {@link IOException} raised while writing is rethrown as a {@link ZipIOException} from
 * inside the pipeline.
 */
public final class Zipper {

    /** Default chunk threshold: 512 KiB. */
    private static final int SIZE_512K = 512 * 1024;

    /** Lazily assembled pipeline; nothing is zipped until a subscriber arrives. */
    @Getter
    private final Flux<byte[]> zipStream;

    /**
     * Creates a zipper with an explicit chunk threshold.
     *
     * @param outputPacketSizeThreshold minimum number of buffered bytes that triggers the emission
     *                                  of a chunk; must not be {@code null}
     * @param fileStream                source of the files to add to the archive; must not be
     *                                  {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public Zipper(final Integer outputPacketSizeThreshold, final Publisher<Entry> fileStream) {
        // Fail fast here rather than with an auto-unboxing NPE deep inside the pipeline.
        if (outputPacketSizeThreshold == null) {
            throw new NullPointerException("outputPacketSizeThreshold must not be null");
        }
        if (fileStream == null) {
            throw new NullPointerException("fileStream must not be null");
        }
        final int threshold = outputPacketSizeThreshold;
        // defer() gives every subscriber its own buffer and ZipOutputStream.
        this.zipStream = Flux.defer(() -> newArchiveStream(threshold, fileStream));
    }

    /**
     * Creates a zipper that emits chunks of at least 512 KiB (except possibly the last one).
     *
     * @param fileStream source of the files to add to the archive; must not be {@code null}
     */
    public Zipper(final Publisher<Entry> fileStream) {
        this(SIZE_512K, fileStream);
    }

    /** Builds the pipeline for a single subscription, backed by a fresh {@link ZipSession}. */
    private static Flux<byte[]> newArchiveStream(final int threshold, final Publisher<Entry> fileStream) {
        final ZipSession session = new ZipSession(threshold);
        final Mono<byte[]> trailingBytes = Mono.fromSupplier(session::finish);
        return Flux.from(fileStream)
                .<byte[]>handle(session::processEntry)
                .concatWith(trailingBytes);
    }

    /** Runs an I/O action, converting a checked {@link IOException} into {@link ZipIOException}. */
    private static void uncheckedRun(final CheckedIORunnable fn) {
        try {
            fn.run();
        } catch (IOException io) {
            throw new ZipIOException(io);
        }
    }

    /** A {@code Runnable} that is allowed to throw {@link IOException}. */
    @FunctionalInterface
    private interface CheckedIORunnable {
        void run() throws IOException;
    }

    /**
     * Mutable zipping state owned by exactly one subscription.
     *
     * <p>Not synchronized: it relies on the source publisher delivering its signals serially.
     */
    private static final class ZipSession {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final ZipOutputStream zip = new ZipOutputStream(buffer);
        private final int threshold;

        ZipSession(final int threshold) {
            this.threshold = threshold;
        }

        /** Writes one entry and emits the buffered bytes once the threshold has been reached. */
        void processEntry(final Entry entry, final SynchronousSink<byte[]> sink) {
            writeEntry(entry);
            if (buffer.size() >= threshold) {
                sink.next(drain());
            }
            // Below the threshold nothing is emitted for this entry; its bytes stay buffered.
        }

        /** Closes the ZIP stream and returns the bytes still held in the buffer. */
        byte[] finish() {
            uncheckedRun(zip::close);
            return buffer.toByteArray();
        }

        /** Adds {@code entry} to the archive as a complete ZIP entry. */
        private void writeEntry(final Entry entry) {
            final ZipEntry zipEntry = new ZipEntry(entry.name);
            uncheckedRun(() -> {
                zip.putNextEntry(zipEntry);
                zip.write(entry.content);
                zip.closeEntry();
            });
        }

        /** Returns a copy of the buffered bytes and clears the buffer. */
        private byte[] drain() {
            final byte[] chunk = buffer.toByteArray();
            buffer.reset();
            return chunk;
        }
    }

    /** A file to add to the archive: the entry name and the uncompressed content. */
    @Value
    public static final class Entry {
        private final String name;
        private final byte[] content;
    }

    /**
     * Unchecked wrapper for an {@link IOException} raised while writing the archive.
     *
     * <p>The wrapped exception is available both through {@code getInner()} and as the regular
     * {@link Throwable#getCause() cause}, so it shows up in stack traces.
     */
    @EqualsAndHashCode(callSuper = true)
    public static final class ZipIOException extends RuntimeException {
        @Getter
        private final IOException inner;

        private ZipIOException(final IOException inner) {
            // Pass the cause to RuntimeException so the original failure is not lost.
            super(inner);
            this.inner = inner;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment