Skip to content

Instantly share code, notes, and snippets.

@iconara
Last active November 5, 2020 16:51
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iconara/5de0699b55818ac1d9b51424848b65a6 to your computer and use it in GitHub Desktop.
Save iconara/5de0699b55818ac1d9b51424848b65a6 to your computer and use it in GitHub Desktop.
S3 GetObject InputStreamResponseTransformer using AWS SDK for Java v2
// this is an attempt to create a synchronous InputStream from a call to
// S3AsyncClient#getObject using a blocking queue.
//
// the purpose is to be able to make many S3 operations asynchronously, but
// at the same time be able to pass off some results to threads and into
// code that expects InputStream or Reader, like a Commons CSV.
public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> {
private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);
private final CompletableFuture<InputStream> future;
private final BlockingQueue<ByteBuffer> chunks;
private Subscription subscription;
private ByteBuffer readChunk;
private Throwable error;
private AtomicBoolean complete;
private AtomicInteger approximateBufferSize;
public InputStreamResponseTransformer() {
this.future = new CompletableFuture<>();
this.chunks = new LinkedBlockingQueue<>();
this.complete = new AtomicBoolean(false);
this.approximateBufferSize = new AtomicInteger(0);
}
@Override
public CompletableFuture<InputStream> prepare() {
return future;
}
@Override
public void onResponse(GetObjectResponse response) {
future.complete(this);
}
@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
publisher.subscribe(this);
}
@Override
public void exceptionOccurred(Throwable t) {
error = t;
future.completeExceptionally(t);
}
@Override
public void onSubscribe(Subscription s) {
subscription = s;
// how much to request initially? what's the unit here?
subscription.request(10L);
}
@Override
public void onNext(ByteBuffer byteBuffer) {
chunks.offer(byteBuffer);
int size = approximateBufferSize.addAndGet(byteBuffer.remaining());
maybeRequestMore(size);
}
private void maybeRequestMore(int currentSize) {
// this is an attempt to keep track of roughly how much is buffered
// and request more only if it's not too much (2^25 =~ 30 MB)
if (currentSize < (1 << 25)) {
// but what's the unit here? is it bytes, or chunks? how large
// should I expect the chunks to be?
subscription.request(1L);
}
}
@Override
public void onError(Throwable t) {
exceptionOccurred(t);
}
@Override
public void onComplete() {
chunks.offer(END_MARKER);
complete.set(true);
}
@Override
public int available() throws IOException {
if (error != null) {
throw new IOException(error);
}
if (readChunk != null) {
return readChunk.remaining();
} else {
return 0;
}
}
private boolean ensureChunk() throws IOException {
if (error != null) {
throw new IOException(error);
}
if (readChunk == END_MARKER) {
return false;
} else if (readChunk == null || !readChunk.hasRemaining()) {
try {
readChunk = chunks.take();
if (readChunk == END_MARKER) {
return false;
} else {
int size = approximateBufferSize.addAndGet(-readChunk.remaining());
maybeRequestMore(size);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return true;
}
@Override
public int read(byte[] destination, int offset, int length) throws IOException {
if (ensureChunk()) {
int actualLength = Math.min(length, readChunk.remaining());
readChunk.get(destination, offset, actualLength);
return actualLength;
} else {
return -1;
}
}
@Override
public int read() throws IOException {
if (ensureChunk()) {
return Byte.toUnsignedInt(readChunk.get());
} else {
return -1;
}
}
@Override
public void close() throws IOException {
if (!complete.get()) {
chunks.clear();
chunks.offer(END_MARKER);
subscription.cancel();
future.cancel(true);
}
super.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment