Skip to content

Instantly share code, notes, and snippets.

@anuchandy
Created March 31, 2020 00:48
Show Gist options
  • Save anuchandy/ab602157c36f493ae74f9b1a0da76ed4 to your computer and use it in GitHub Desktop.
Save anuchandy/ab602157c36f493ae74f9b1a0da76ed4 to your computer and use it in GitHub Desktop.
package com.azure.core.http.okhttp;
import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpMethod;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.util.logging.ClientLogger;
import okhttp3.Call;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.*;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.*;
import reactor.util.context.Context;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* HttpClient implementation for OkHttp.
*/
class OkHttpAsyncHttpClient implements HttpClient {
/** Logger shared by this client and its nested response type. */
private static final ClientLogger logger = new ClientLogger(OkHttpAsyncHttpClient.class);
/** Reusable Mono that emits the empty ByteString. */
private static final Mono<ByteString> EMPTY_BYTE_STRING_MONO = Mono.just(ByteString.EMPTY);
/** The OkHttp client that every call is created from. */
private final OkHttpClient httpClient;

/**
 * Creates an HttpClient backed by the given OkHttp client.
 *
 * @param httpClient the OkHttp client used to create calls
 */
OkHttpAsyncHttpClient(OkHttpClient httpClient) {
    this.httpClient = httpClient;
}
/**
 * Sends the request through the wrapped OkHttp client.
 *
 * The azure-core request is first converted to an okhttp3.Request; the resulting call is
 * enqueued and its outcome is relayed to the returned Mono by {@link OkHttpCallback}.
 * Cancelling the subscription cancels the OkHttp call.
 *
 * @param request the azure-core request to send
 * @return a Mono emitting the response, or the error raised while converting or sending
 */
@Override
public Mono<HttpResponse> send(HttpRequest request) {
    return Mono.create(sink -> sink.onRequest(value -> {
        // Using MonoSink::onRequest for back pressure support.
        // The blocking behavior of the toOkHttpRequest(r).subscribe call:
        //
        // The okhttp3.Request emitted by toOkHttpRequest(r) is chained from the body of request Flux<ByteBuffer>:
        // 1. If Flux<ByteBuffer> is synchronous and the send(r) caller does not apply subscribeOn then
        //    subscribe blocks on the caller thread.
        // 2. If Flux<ByteBuffer> is synchronous and the send(r) caller applies subscribeOn then
        //    it does not block the caller thread but blocks on the scheduler thread.
        // 3. If Flux<ByteBuffer> is asynchronous then subscribe does not block the caller thread
        //    but blocks on the thread backing the flux. This ignores any subscribeOn applied to send(r).
        //
        // The original code called 'toOkHttpRequestCollect', which is not defined in this class;
        // 'toOkHttpRequest' is the only request converter available.
        toOkHttpRequest(request).subscribe(okHttpRequest -> {
            Call call = httpClient.newCall(okHttpRequest);
            call.enqueue(new OkHttpCallback(call, sink, request));
            sink.onCancel(call::cancel);
        }, sink::error);
    }));
}
/**
 * Converts the given azure-core request to okhttp request.
 *
 * Headers with a null value are skipped. GET and HEAD requests are built without a body;
 * every other method gets a body aggregated by {@code toOkHttpRequestBodyCollect}.
 *
 * @param request the azure-core request
 * @return the Mono emitting okhttp request
 */
private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) {
    return Mono.just(new okhttp3.Request.Builder())
        .map(rb -> {
            rb.url(request.getUrl());
            // Stays empty when the request carries no header collection at all.
            Map<String, String> headers = new HashMap<>();
            if (request.getHeaders() != null) {
                for (HttpHeader hdr : request.getHeaders()) {
                    if (hdr.getValue() != null) {
                        headers.put(hdr.getName(), hdr.getValue());
                    }
                }
            }
            return rb.headers(okhttp3.Headers.of(headers));
        })
        .flatMap((Function<Request.Builder, Mono<Request.Builder>>) rb -> {
            if (request.getHttpMethod() == HttpMethod.GET) {
                return Mono.just(rb.get());
            } else if (request.getHttpMethod() == HttpMethod.HEAD) {
                return Mono.just(rb.head());
            } else {
                // A null header collection is accepted above, so never hand null to the body helper.
                HttpHeaders bodyHeaders = request.getHeaders() != null
                    ? request.getHeaders()
                    : new HttpHeaders();
                // The original called 'toOkHttpRequestBody2', which does not exist in this class.
                // NOTE(review): 'toOkHttpRequestBodyByteString' has the same signature and is the
                // other candidate - confirm which aggregation strategy is intended.
                return toOkHttpRequestBodyCollect(request.getBody(), bodyHeaders)
                    .map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody));
            }
        })
        .map(Request.Builder::build);
}
/**
 * Create a Mono of okhttp3.RequestBody from the given java.nio.ByteBuffer Flux.
 *
 * The content is aggregated into a single okio.ByteString; a null Flux yields an empty body.
 * The media type is taken from the "Content-Type" header when present, otherwise the body is
 * created without a media type.
 *
 * @param bbFlux stream of java.nio.ByteBuffer representing request content, may be null
 * @param headers the headers associated with the original request, may be null
 * @return the Mono emitting okhttp3.RequestBody
 */
private static Mono<RequestBody> toOkHttpRequestBodyByteString(Flux<ByteBuffer> bbFlux, HttpHeaders headers) {
    Mono<okio.ByteString> bsMono = bbFlux == null
        ? EMPTY_BYTE_STRING_MONO
        : toByteString(bbFlux);
    return bsMono.map(bs -> {
        // toOkHttpRequest treats a null header collection as valid, so it must not fail here.
        String contentType = headers == null ? null : headers.getValue("Content-Type");
        MediaType mediaType = contentType == null ? null : MediaType.parse(contentType);
        return RequestBody.create(bs, mediaType);
    });
}
/**
 * Create a Mono of okhttp3.RequestBody by collecting the whole ByteBuffer Flux into one byte array.
 *
 * A null Flux yields an empty body. The media type is taken from the "Content-Type" header
 * when present, otherwise the body is created without a media type.
 *
 * @param bbFlux stream of java.nio.ByteBuffer representing request content, may be null
 * @param headers the headers associated with the original request, may be null
 * @return the Mono emitting okhttp3.RequestBody
 */
private static Mono<RequestBody> toOkHttpRequestBodyCollect(Flux<ByteBuffer> bbFlux, HttpHeaders headers) {
    if (bbFlux == null) {
        return Mono.just(RequestBody.create(ByteString.EMPTY, requestMediaType(headers)));
    }
    // The media type is resolved when the bytes arrive, as before; the stdout size trace is gone.
    return collectBytesInByteBufferStream(bbFlux)
        .map(bytes -> RequestBody.create(bytes, requestMediaType(headers)));
}

/**
 * Resolves the okhttp3.MediaType from the "Content-Type" header.
 *
 * @param headers the request headers, may be null
 * @return the parsed media type, or null when there are no headers or no "Content-Type" value
 */
private static MediaType requestMediaType(HttpHeaders headers) {
    String contentType = headers == null ? null : headers.getValue("Content-Type");
    return contentType == null ? null : MediaType.parse(contentType);
}
/**
 * Collects every ByteBuffer emitted by the stream into a single byte array.
 *
 * @param stream the stream of ByteBuffer to collect
 * @return a Mono emitting the collected bytes
 */
private static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream) {
    Mono<ByteArrayOutputStream> aggregated =
        stream.collect(ByteArrayOutputStream::new, (target, chunk) -> accept(target, chunk));
    return aggregated.map(ByteArrayOutputStream::toByteArray);
}
/**
 * Appends the remaining bytes of the given ByteBuffer to the output stream.
 *
 * The buffer is consumed: its position is advanced to its limit.
 *
 * @param byteOutputStream the stream receiving the bytes
 * @param byteBuffer the buffer whose remaining bytes are appended
 */
private static void accept(ByteArrayOutputStream byteOutputStream, ByteBuffer byteBuffer) {
    byte[] arr = byteBufferToArray(byteBuffer);
    // ByteArrayOutputStream.write(byte[], int, int) declares no checked exception, so the
    // IOException handler the one-argument overload forced on this code was dead and is dropped.
    byteOutputStream.write(arr, 0, arr.length);
}
/**
 * Copies the remaining bytes of the buffer into a new array.
 *
 * The relative bulk get advances the buffer's position past the copied bytes.
 *
 * @param byteBuffer the buffer to read from
 * @return a new array holding the bytes that were remaining in the buffer
 */
private static byte[] byteBufferToArray(ByteBuffer byteBuffer) {
    final byte[] copy = new byte[byteBuffer.remaining()];
    byteBuffer.get(copy);
    return copy;
}
/**
 * Aggregate Flux of java.nio.ByteBuffer to single okio.ByteString.
 *
 * An okio.Buffer is used to buffer emitted ByteBuffer instances.
 * Content of each ByteBuffer will be written (i.e. copied) to the okio.Buffer.
 * Once the stream terminates, the buffered content is copied to one single byte array
 * and an okio.ByteString is created from that byte array.
 * Finally the okio.Buffer is cleared and closed.
 *
 * @param bbFlux the Flux of ByteBuffer to aggregate
 * @return a mono emitting aggregated ByteString
 * @throws NullPointerException if {@code bbFlux} is null
 */
private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
    Objects.requireNonNull(bbFlux, "'bbFlux' cannot be null.");
    // The stdout tracing and the byte counter that only fed it have been removed.
    return Mono.using(okio.Buffer::new,
        buffer -> bbFlux.reduce(buffer, (b, byteBuffer) -> {
            try {
                b.write(byteBuffer);
                return b;
            } catch (IOException ioe) {
                throw Exceptions.propagate(ioe);
            }
        })
        .map(b -> ByteString.of(b.readByteArray())),
        buffer -> {
            buffer.clear();
            buffer.close();
        })
        .switchIfEmpty(EMPTY_BYTE_STRING_MONO);
}
/**
 * Bridges OkHttp's callback API to a Reactor MonoSink: a failure becomes an error signal and
 * a response is wrapped in {@link OkHttpResponse} and emitted as the success value.
 */
private static class OkHttpCallback implements okhttp3.Callback {
    private final HttpRequest request;
    private final MonoSink<HttpResponse> sink;
    private final Call call;

    /**
     * @param call the call this callback is enqueued on; handed to the created response
     * @param sink the sink to signal with the outcome of the call
     * @param request the azure-core request that produced the call
     */
    OkHttpCallback(Call call, MonoSink<HttpResponse> sink, HttpRequest request) {
        this.request = request;
        this.sink = sink;
        this.call = call;
    }

    @Override
    public void onFailure(okhttp3.Call call, IOException e) {
        sink.error(e);
    }

    @Override
    public void onResponse(okhttp3.Call call, okhttp3.Response response) {
        final OkHttpResponse wrapped = new OkHttpResponse(this.call, response, request);
        // Close the response body when the sink is disposed.
        // NOTE(review): MonoSink.onDispose presumably also fires after a terminal signal, which
        // would close the body right after success() - confirm this is intended.
        sink.onDispose(() -> wrapped.close());
        sink.success(wrapped);
    }
}
/**
* An implementation of {@link HttpResponse} for OkHttp.
*/
private static class OkHttpResponse extends HttpResponse {
// using 4K as default buffer size: https://stackoverflow.com/a/237495/1473510
private static final int BYTE_BUFFER_CHUNK_SIZE = 4096;
/** The call that produced this response; the body readers cancel it to unblock a pending read. */
private final okhttp3.Call call;
private final int statusCode;
private final HttpHeaders headers;
/** May be null, see the constructor. */
private final ResponseBody responseBody;

/**
 * Captures status code, headers and body of the OkHttp response.
 *
 * @param call the call that produced {@code innerResponse}
 * @param innerResponse the OkHttp response to wrap
 * @param request the azure-core request that resulted in this response
 */
OkHttpResponse(okhttp3.Call call, Response innerResponse, HttpRequest request) {
    super(request);
    // innerResponse.body() getter will not return null for server returned responses.
    // It can be null:
    // [a]. if response is built manually with null body (e.g for mocking)
    // [b]. for the cases described here
    // [ref](https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response/body/).
    //
    this.responseBody = innerResponse.body();
    this.headers = fromOkHttpHeaders(innerResponse.headers());
    this.statusCode = innerResponse.code();
    this.call = call;
}
/**
 * @return the status code captured from the OkHttp response at construction time
 */
@Override
public int getStatusCode() {
    return statusCode;
}
/**
 * Looks up a response header value by name.
 *
 * @param name the header name
 * @return the value returned by the response headers for {@code name}
 */
@Override
public String getHeaderValue(String name) {
    final HttpHeaders responseHeaders = this.headers;
    return responseHeaders.getValue(name);
}
/**
 * @return the response headers converted from the OkHttp headers at construction time
 */
@Override
public HttpHeaders getHeaders() {
    return headers;
}
/**
 * Logs the throwable as a warning, tagged as ignorable after Call.cancel.
 * If logging itself throws, the logging failure is printed to stdout instead of propagating.
 *
 * @param s context text appended to the message
 * @param t the throwable to report
 */
private static void logThowableAsWarning(String s, Throwable t) {
    try {
        final String detail = t.getClass().getSimpleName() + " " + t.getMessage() + " " + s;
        logger.warning("[Ignorable after Call.cancel]: " + detail);
    } catch (Throwable loggingFailure) {
        System.out.println("Failed to LOG:" + loggingFailure.getMessage());
    }
}
/**
 * Streams the response content; delegates to the Flux.usingWhen based implementation.
 *
 * @return a Flux emitting the response content in ByteBuffer chunks
 */
@Override
public Flux<ByteBuffer> getBody() {
    final Flux<ByteBuffer> content = getBodyUsingWhenVersion();
    return content;
}
/**
 * Streams the response content as ByteBuffer chunks of at most BYTE_BUFFER_CHUNK_SIZE bytes,
 * using Flux.usingWhen to close the content stream on completion, error and cancellation.
 *
 * The Reactor Context entry "marker" (or -1 when absent) is only used to tag diagnostic output.
 *
 * @return an empty Flux when there is no response body, otherwise a Flux reading the body stream
 */
private Flux<ByteBuffer> getBodyUsingWhenVersion() {
if (this.responseBody == null) {
return Flux.empty();
}
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() {
@Override
public Flux<ByteBuffer> apply(Context reactorContext) {
// Diagnostic id for this subscription, taken from the subscriber Context.
Optional<Long> marker = reactorContext.getOrEmpty("marker");
Long id = marker.isPresent() ? marker.get() : -1L;
InputStream inputStream = responseBody.byteStream();
return Flux.usingWhen(Mono.just(inputStream),
// resource closure: turn the stream into a Flux of chunks
new Function<InputStream, Flux<ByteBuffer>>() {
@Override
public Flux<ByteBuffer> apply(InputStream inputStream) {
// A single Pair instance is reused for every read; it carries the chunk and the read count
// from the map step to the takeUntil/filter steps below.
Pair pair = new Pair();
// An endless source of ticks; each tick triggers one read. The sequence ends via takeUntil.
return Flux.just(true)
.repeat()
.map(ignore -> {
byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE];
try {
int numBytes;
numBytes = inputStream.read(buffer);
if (numBytes > 0) {
return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes);
} else {
return pair.buffer(null).readBytes(numBytes);
}
} catch (Throwable ioe) {
System.out.println("read_"
+ ioe.getClass().getName()
+ " isCancelled: " + call.isCanceled()
+ " (" + Thread.currentThread().getName() + ") id:"
+ id);
// InputStream::read threw.
//
if (call.isCanceled()) {
//
// If the 'okhttp3.Call' is in cancelled state then it means the cancel cleanup executed as
// a result of downstream cancelling the Subscription. The cancelled subscription is
// the handle to this thread which reads response content (aka response-read-thread).
// This cancellation will happen from a different thread than the
// response-read-thread.
//
// In such a cancellation case, an interrupt signal is sent to this response-read-thread
// and the downstream is detached from the response-read-thread backed upstream; at this
// point Reactor is designed to ignore any data or error propagated from the abandoned
// response-read-thread.
//
// While Reactor adheres to this contract, we noticed that in some edge cases Reactor
// still delivers this IOException even after Reactor notifies via 'cancel'
// that the response-read-thread backing upstream is cancelled.
//
// To handle this edge case, we log and explicitly ignore the exception here that
// Reactor is supposed to discard.
logThowableAsWarning("content::cancel_read id:" + id, ioe);
//
// Close response stream for OkHttp to release any resource.
closeContentStream(inputStream, "content::cancel_read::close id:" + id, call);
// Report -1 so that takeUntil below ends the sequence without an error.
return pair.buffer(null).readBytes(-1);
} else {
closeContentStream(inputStream, "content::read::close id:" + id, call);
throw Exceptions.propagate(ioe);
}
}
})
// Stop once a read reported -1 (end of stream, or a read that failed after cancel).
.takeUntil(p -> {
boolean predicate = p.readBytes() == -1;
return predicate;
})
// Drop the terminating element (and any zero-length read) so only real chunks are emitted.
.filter(p -> p.readBytes() > 0)
.map(Pair::buffer);
}
},
// onComplete: close the content stream.
new Function<InputStream, Mono<Void>>() {
@Override
public Mono<Void> apply(InputStream inputStream) {
return Mono.fromCallable(() -> {
closeContentStream(inputStream, "cleanup::content_complete::id:" + id, call);
return true;
})
.timeout(Duration.ofSeconds(60))
.then();
}
},
// onError: close the content stream.
new BiFunction<InputStream, Throwable, Mono<Void>>() {
@Override
public Mono<Void> apply(InputStream inputStream, Throwable t) {
return Mono.fromCallable(() -> {
closeContentStream(inputStream, "cleanup::content_error::id:" + id, call);
return true;
})
.timeout(Duration.ofSeconds(60))
.then();
}
},
// onCancelAsyncCleanup: cancel the call first, then close the content stream.
new Function<InputStream, Mono<Void>>() {
@Override
public Mono<Void> apply(InputStream inputStream) {
return Mono.fromCallable(() -> {
//
// Upon cancel from downstream, this 'onCancelAsyncCleanup' can be executed from a different thread
// than the response-read-thread. Though the response-read-thread backing upstream is
// marked as cancelled and detached from the downstream at this point, Reactor won't wait
// for such an abandoned response-read-thread to complete before running this 'onCancelAsyncCleanup'.
// This means there is a potential edge case where, while 'onCancelAsyncCleanup' is running,
// the response-read-thread could be executing InputStream::read().
//
// If InputStream::read() is being executed, we can't do resource cleanup by
// calling InputStream::close(). The OkHttp InputStream is not thread-safe and has checks
// that throw 'Unbalanced enter/exit' if there is an overlap in invocation of its
// InputStream methods.
//
// In this case we use the thread-safe Call object and unblock read() by cancelling the call.
call.cancel();
closeContentStream(inputStream, "cleanup::content_cancel::id:" + id, call);
return true;
})
.timeout(Duration.ofSeconds(60))
// Pass-through mapping that only traces a timeout of the cleanup to stdout.
.onErrorMap( t -> {
if (t instanceof TimeoutException) {
System.out.println("Timeout----> id:" + id);
}
return t;
})
.then();
}
});
}
});
}
/**
 * Alternative to getBodyUsingWhenVersion that performs the stream cleanup in a doFinally hook.
 *
 * Streams the response content as ByteBuffer chunks of at most BYTE_BUFFER_CHUNK_SIZE bytes.
 * The Reactor Context entry "marker" (or -1 when absent) is only used to tag diagnostic output.
 *
 * @return an empty Flux when there is no response body, otherwise a Flux reading the body stream
 */
private Flux<ByteBuffer> getBodyDoFinallyVersion() {
if (this.responseBody == null) {
return Flux.empty();
}
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() {
@Override
public Flux<ByteBuffer> apply(Context reactorContext) {
// Diagnostic id for this subscription, taken from the subscriber Context.
Optional<Long> marker = reactorContext.getOrEmpty("marker");
Long id = marker.isPresent() ? marker.get() : -1L;
InputStream inputStream = responseBody.byteStream();
// A single Pair instance is reused for every read; it carries the chunk and the read count
// from the map step to the takeUntil/filter steps below.
Pair pair = new Pair();
// An endless source of ticks; each tick triggers one read. The sequence ends via takeUntil.
return Flux.just(true)
.repeat()
.map(ignore -> {
byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE];
try {
int numBytes;
numBytes = inputStream.read(buffer);
if (numBytes > 0) {
return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes);
} else {
return pair.buffer(null).readBytes(numBytes);
}
} catch (Throwable ioe) {
System.out.println("read_"
+ ioe.getClass().getName()
+ " isCancelled: " + call.isCanceled()
+ " (" + Thread.currentThread().getName() + ") id:"
+ id);
// InputStream::read threw.
//
if (call.isCanceled()) {
//
// If the 'okhttp3.Call' is in cancelled state then it means 'doFinally' executed as
// a result of downstream cancelling the Subscription. The cancelled subscription is
// the handle to this thread which reads response content (aka response-read-thread).
// This cancellation from 'doFinally' will happen from a different thread than the
// response-read-thread.
//
// In such a cancellation case, an interrupt signal is sent to this response-read-thread
// and the downstream is detached from the response-read-thread backed upstream; at this
// point Reactor is designed to ignore any data or error propagated from the abandoned
// response-read-thread.
//
// While Reactor adheres to this contract, we noticed that in some edge cases Reactor
// still delivers this IOException even after Reactor notifies via 'doFinally'
// that the response-read-thread backing upstream is cancelled.
//
// To handle this edge case, we log and explicitly ignore the exception here that
// Reactor is supposed to discard.
logThowableAsWarning("content::cancel_read id:" + id, ioe);
//
// Close response stream for OkHttp to release any resource.
closeContentStream(inputStream, "content::cancel_read::close id:" + id, call);
// Report -1 so that takeUntil below ends the sequence without an error.
return pair.buffer(null).readBytes(-1);
} else {
closeContentStream(inputStream, "content::read::close id:" + id, call);
throw Exceptions.propagate(ioe);
}
}
})
// Stop once a read reported -1 (end of stream, or a read that failed after cancel).
.takeUntil(p -> {
boolean predicate = p.readBytes() == -1;
return predicate;
})
// Drop the terminating element (and any zero-length read) so only real chunks are emitted.
.filter(p -> p.readBytes() > 0)
.map(Pair::buffer)
.doFinally(signalType -> {
System.out.println("doFinally " + signalType + "(" + Thread.currentThread().getName() + ") id:" + id);
if (signalType == SignalType.CANCEL) {
//
// Upon cancel from downstream, this 'doFinally' can be executed from a different thread
// than the response-read-thread. Though the response-read-thread backing upstream is
// marked as cancelled and detached from the downstream at this point, Reactor won't wait
// for such an abandoned response-read-thread to complete before running this 'doFinally' block.
// This means there is a potential edge case where, while 'doFinally' is running,
// the response-read-thread could be executing InputStream::read().
//
// If InputStream::read() is being executed, we can't do resource cleanup by
// calling InputStream::close(). The OkHttp InputStream is not thread-safe and has checks
// that throw 'Unbalanced enter/exit' if there is an overlap in invocation of its
// InputStream methods.
//
// In this case we use the thread-safe Call object and unblock read() by cancelling the call.
call.cancel();
System.out.println(
"call.cancel()::done id:" + id
);
//
}
// Close response stream for OkHttp to release any resource; runs for every terminal signal.
closeContentStream(inputStream, "doFinally::close id:" + id, call);
});
}
});
}
/**
 * Closes the content stream while holding the monitor of the given call.
 *
 * A failure to close is logged as a warning when the call is cancelled; otherwise it is
 * logged as an error and rethrown.
 *
 * @param inputStream the content stream to close
 * @param s context text included in the diagnostic output
 * @param call the call the stream belongs to; used as the lock and for the cancel check
 */
private static void closeContentStream(InputStream inputStream, String s, Call call) {
    synchronized (call) {
        try {
            System.out.println(threadId() + " " + s + " Closing");
            inputStream.close();
            System.out.println(threadId() + " " + s + " Closed");
        } catch (Throwable closeFailure) {
            if (!call.isCanceled()) {
                System.out.println("closeContentStream error:" + closeFailure.getMessage());
                throw logger.logExceptionAsError(Exceptions.propagate(closeFailure));
            }
            logThowableAsWarning(threadId() + " " + s, closeFailure);
        }
    }
}
/**
 * @return the current thread formatted as {@code (name[id])}, used to tag diagnostic output
 */
private static String threadId() {
    final Thread current = Thread.currentThread();
    return "(" + current.getName() + "[" + current.getId() + "]" + ")";
}
/**
 * Reads the whole response content into a byte array.
 *
 * @return a Mono emitting the content, or completing empty when there is no body or the
 *     body has zero length
 */
@Override
public Mono<byte[]> getBodyAsByteArray() {
    return Mono.fromCallable(() -> {
        // Reactor: The fromCallable operator treats a null from the Callable
        // as completion signal.
        if (responseBody == null) {
            return null;
        }
        // OkHttp: When calling ResponseBody::bytes() the underlying stream automatically closed.
        // https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
        final byte[] content = responseBody.bytes();
        // Consistent with GAed behaviour: empty content completes without a value.
        return content.length == 0 ? null : content;
    });
}
/**
 * Reads the whole response content as a String, decoded by ResponseBody::string().
 *
 * @return a Mono emitting the content, or completing empty when there is no body or the
 *     content is the empty string
 */
@Override
public Mono<String> getBodyAsString() {
    return Mono.fromCallable(() -> {
        // Reactor: The fromCallable operator treats a null from the Callable
        // as completion signal.
        if (responseBody == null) {
            return null;
        }
        // OkHttp: When calling ResponseBody::string() the underlying stream automatically closed.
        // https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
        final String content = responseBody.string();
        // Consistent with GAed behaviour: empty content completes without a value.
        return content.isEmpty() ? null : content;
    });
}
/**
 * Reads the whole response content and decodes it with the given charset.
 *
 * @param charset the charset used to decode the content bytes
 * @return a Mono emitting the decoded content, or completing empty when
 *     {@link #getBodyAsByteArray()} completes empty
 */
@Override
public Mono<String> getBodyAsString(Charset charset) {
    final Mono<byte[]> rawContent = getBodyAsByteArray();
    return rawContent.map(bytes -> new String(bytes, charset));
}
/**
 * Closes the response body, if there is one. A failure to close is printed to stdout and
 * not propagated.
 */
@Override
public void close() {
    if (this.responseBody == null) {
        return;
    }
    try {
        // It's safe to invoke close() multiple times, additional calls will be ignored.
        this.responseBody.close();
    } catch (Throwable closeFailure) {
        System.out.println("responsebodyClose end::failed " + closeFailure.getMessage() + threadId());
    }
}
/**
 * Creates azure-core HttpHeaders from okhttp headers.
 *
 * A header that occurs several times in the OkHttp headers is stored as one entry whose value
 * is the comma-joined list of all its values. Headers.get(name) returns only the last value,
 * so using it silently dropped the earlier ones.
 *
 * @param headers okhttp headers
 * @return azure-core HttpHeaders
 */
private static HttpHeaders fromOkHttpHeaders(Headers headers) {
    HttpHeaders httpHeaders = new HttpHeaders();
    for (String headerName : headers.names()) {
        // For a single-valued header the join is the value itself, so existing behaviour is kept.
        httpHeaders.put(headerName, String.join(",", headers.values(headerName)));
    }
    return httpHeaders;
}
/**
 * Earlier variant of the body reader, kept under a name that marks it as error prone.
 *
 * Uses Flux.using (non-eager cleanup) around toFluxByteBuffer and, as cleanup, cancels the
 * OkHttp call instead of closing the body stream. The Reactor Context entry "marker" (or -1
 * when absent) is passed to toFluxByteBuffer for diagnostic output.
 *
 * @return an empty Flux when there is no response body, otherwise a Flux reading the body stream
 */
private Flux<ByteBuffer> getBodyUsingErrorProneVersion() {
if (this.responseBody == null) {
return Flux.empty();
}
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() {
@Override
public Flux<ByteBuffer> apply(Context reactorContext) {
// Diagnostic id for this subscription, taken from the subscriber Context.
Optional<Long> marker = reactorContext.getOrEmpty("marker");
Long id = marker.isPresent() ? marker.get() : -1L;
// NOTE(review): never read in this method.
AtomicBoolean isStreamClosed = new AtomicBoolean(false);
// Use Flux.using to release the stream after complete emission
return Flux.using(() -> responseBody.byteStream(),
bodyStream -> toFluxByteBuffer(bodyStream, id),
bodyStream -> {
// Cleanup: the explicit stream close is disabled; the call is cancelled instead and
// toFluxByteBuffer closes the stream once it observes the cancelled call.
try {
// OkHttp: The stream from ResponseBody::byteStream() has to be explicitly closed.
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
// bodyStream.close();
call.cancel();
} catch (Throwable t) {
throw Exceptions.propagate(t);
}
}, false)
.doFinally(signal -> {
// No-op; only held (now disabled) debug tracing.
})
.doOnSubscribe(subscription -> {
// No-op; only held (now disabled) debug tracing.
});
}
});
}
/**
 * Creates a Flux of ByteBuffer, with each ByteBuffer wrapping bytes read from the given
 * InputStream.
 *
 * Each chunk holds at most BYTE_BUFFER_CHUNK_SIZE bytes. When the OkHttp call is found to be
 * cancelled, the stream is closed and the Flux completes instead of reading further or
 * propagating a read failure.
 *
 * @param inputStream InputStream to back the Flux
 * @param id diagnostic id included in the stdout traces
 * @return Flux of ByteBuffer backed by the InputStream
 */
private Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, Long id) {
    // One Pair instance is reused for every read; it carries the chunk and the read count
    // from the map step to the takeUntil/filter steps.
    Pair pair = new Pair();
    return Flux.just(true)
        .repeat()
        .map(ignore -> {
            byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE];
            try {
                if (call.isCanceled()) {
                    inputStream.close();
                    return pair.buffer(null).readBytes(-1);
                } else {
                    int numBytes = inputStream.read(buffer);
                    if (numBytes > 0) {
                        return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes);
                    } else {
                        return pair.buffer(null).readBytes(numBytes);
                    }
                }
            } catch (IOException ioe) {
                System.out.println("read_IOException isCallCancelled: " + call.isCanceled() + " (" + Thread.currentThread().getName() + ") id:" + id);
                if (call.isCanceled()) {
                    try {
                        inputStream.close();
                    } catch (Throwable closeFailure) {
                        // The former IOException and Throwable handlers were identical, so they are merged.
                        // The original read failure is the one propagated.
                        System.out.println("read_IOException stream_close::end::failed " + closeFailure.getMessage() + " id:" + id);
                        throw Exceptions.propagate(ioe);
                    }
                    // Read failed because the call was cancelled: end the sequence quietly.
                    return pair.buffer(null).readBytes(-1);
                }
                throw Exceptions.propagate(ioe);
            } catch (Throwable t) {
                System.out.println("read_Throwable isCallCancelled: " + call.isCanceled() + " (" + Thread.currentThread().getName() + ") id:" + id);
                if (call.isCanceled()) {
                    try {
                        inputStream.close();
                    } catch (IOException ioe) {
                        System.out.println("read_IOException stream_close::end::failed " + ioe.getMessage());
                        throw Exceptions.propagate(ioe);
                    } catch (Throwable t1) {
                        // Report the close failure itself; this used to print the message of the
                        // outer read failure 't' while propagating 't1'.
                        System.out.println("read_Throwable stream_close::end::failed " + t1.getMessage());
                        throw Exceptions.propagate(t1);
                    }
                    return pair.buffer(null).readBytes(-1);
                }
                throw Exceptions.propagate(t);
            }
        })
        // Stop once a read reported -1 (end of stream or cancelled call).
        .takeUntil(p -> p.readBytes() == -1)
        // Drop the terminating element so only real chunks are emitted.
        .filter(p -> p.readBytes() > 0)
        .map(Pair::buffer);
}
/**
 * Mutable holder pairing the chunk produced by one read with the byte count that read
 * reported. Both setters return the holder itself so that calls can be chained.
 */
private static class Pair {
    private int readBytes;
    private ByteBuffer byteBuffer;

    /** @return the byte count stored by the last {@link #readBytes(int)} call */
    int readBytes() {
        return readBytes;
    }

    /** @return the buffer stored by the last {@link #buffer(ByteBuffer)} call, may be null */
    ByteBuffer buffer() {
        return byteBuffer;
    }

    /**
     * @param cnt the byte count to store
     * @return this holder
     */
    Pair readBytes(int cnt) {
        readBytes = cnt;
        return this;
    }

    /**
     * @param byteBuffer the buffer to store, may be null
     * @return this holder
     */
    Pair buffer(ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
        return this;
    }
}
}
}
// ------------------------------------------------------------
// Reactor Context key under which runAsync() stores the id of a subscription.
private static final String MARKER_KEY = "marker";
// Source of subscription ids; runAsync() increments it once per subscription.
private static final AtomicLong id = new AtomicLong();
// Running total of downloaded bytes per subscription id, updated by runAsync() as buffers arrive.
private ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<Long, Long>();
/**
 * Downloads the blob once and consumes every byte of every emitted buffer.
 *
 * Each subscription is tagged with a fresh id stored in the Reactor Context under MARKER_KEY.
 * The running byte total per id is kept in {@code map}; totals near the start
 * (up to 3 * 4096 bytes) and near the end (within 10000 bytes of options.getSize()) are
 * printed to stdout.
 *
 * @return a Mono completing when the download completes
 */
public Mono<Void> runAsync() {
    return Flux.deferWithContext(reactorContext -> {
        // The marker is stored as a Long by the subscriberContext hook below.
        Optional<Long> marker = reactorContext.getOrEmpty(MARKER_KEY);
        // Named differently from the static 'id' counter, which the original local shadowed.
        Long markerId = marker.orElse(-1L);
        return blobAsyncClient.download()
            .map(b -> {
                map.merge(markerId, (long) b.remaining(), (oldVal, newVal) -> {
                    long result = oldVal + newVal;
                    if (result <= 4096 * 3 || result >= (options.getSize() - 10000)) {
                        System.out.println("next:" + result + " id:" + markerId);
                    }
                    return result;
                });
                // Drain the whole buffer. The previous loop compared a growing index against
                // b.remaining(), which shrinks on every get(), so it stopped after roughly
                // half of the bytes.
                while (b.hasRemaining()) {
                    b.get();
                }
                return 1;
            });
    })
    .subscriberContext(reactorContext -> {
        // A marker must not already be present; one subscription gets exactly one id.
        if (reactorContext.hasKey(MARKER_KEY)) {
            System.out.println("-----------Unexpected Marker ---------");
        }
        return reactorContext.put(MARKER_KEY, id.incrementAndGet());
    }).then();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment