Created
March 31, 2020 00:48
-
-
Save anuchandy/ab602157c36f493ae74f9b1a0da76ed4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.azure.core.http.okhttp; | |
import com.azure.core.http.HttpClient; | |
import com.azure.core.http.HttpHeader; | |
import com.azure.core.http.HttpHeaders; | |
import com.azure.core.http.HttpMethod; | |
import com.azure.core.http.HttpRequest; | |
import com.azure.core.http.HttpResponse; | |
import com.azure.core.util.logging.ClientLogger; | |
import okhttp3.Call; | |
import okhttp3.Headers; | |
import okhttp3.MediaType; | |
import okhttp3.OkHttpClient; | |
import okhttp3.Request; | |
import okhttp3.RequestBody; | |
import okhttp3.Response; | |
import okhttp3.ResponseBody; | |
import okio.*; | |
import org.reactivestreams.Publisher; | |
import reactor.core.Disposable; | |
import reactor.core.Exceptions; | |
import reactor.core.publisher.*; | |
import reactor.util.context.Context; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.nio.ByteBuffer; | |
import java.nio.charset.Charset; | |
import java.time.Duration; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Objects; | |
import java.util.Optional; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.function.BiFunction; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
/** | |
* HttpClient implementation for OkHttp. | |
*/ | |
class OkHttpAsyncHttpClient implements HttpClient { | |
private static final ClientLogger logger = new ClientLogger(OkHttpAsyncHttpClient.class); | |
private final OkHttpClient httpClient; | |
// | |
private static final Mono<okio.ByteString> EMPTY_BYTE_STRING_MONO = Mono.just(okio.ByteString.EMPTY); | |
OkHttpAsyncHttpClient(OkHttpClient httpClient) { | |
this.httpClient = httpClient; | |
} | |
@Override | |
public Mono<HttpResponse> send(HttpRequest request) { | |
return Mono.create(sink -> sink.onRequest(value -> { | |
// Using MonoSink::onRequest for back pressure support. | |
// The blocking behavior toOkHttpRequest(r).subscribe call: | |
// | |
// The okhttp3.Request emitted by toOkHttpRequest(r) is chained from the body of request Flux<ByteBuffer>: | |
// 1. If Flux<ByteBuffer> synchronous and send(r) caller does not apply subscribeOn then | |
// subscribe block on caller thread. | |
// 2. If Flux<ByteBuffer> synchronous and send(r) caller apply subscribeOn then | |
// does not block caller thread but block on scheduler thread. | |
// 3. If Flux<ByteBuffer> asynchronous then subscribe does not block caller thread | |
// but block on the thread backing flux. This ignore any subscribeOn applied to send(r) | |
// | |
toOkHttpRequestCollect(request).subscribe(okHttpRequest -> { | |
Call call = httpClient.newCall(okHttpRequest); | |
call.enqueue(new OkHttpCallback(call, sink, request)); | |
sink.onCancel(call::cancel); | |
}, sink::error); | |
})); | |
} | |
/** | |
* Converts the given azure-core request to okhttp request. | |
* | |
* @param request the azure-core request | |
* @return the Mono emitting okhttp request | |
*/ | |
private static Mono<okhttp3.Request> toOkHttpRequest(HttpRequest request) { | |
return Mono.just(new okhttp3.Request.Builder()) | |
.map(rb -> { | |
rb.url(request.getUrl()); | |
if (request.getHeaders() != null) { | |
Map<String, String> headers = new HashMap<>(); | |
for (HttpHeader hdr : request.getHeaders()) { | |
if (hdr.getValue() != null) { | |
headers.put(hdr.getName(), hdr.getValue()); | |
} | |
} | |
return rb.headers(okhttp3.Headers.of(headers)); | |
} else { | |
return rb.headers(okhttp3.Headers.of(new HashMap<>())); | |
} | |
}) | |
.flatMap((Function<Request.Builder, Mono<Request.Builder>>) rb -> { | |
if (request.getHttpMethod() == HttpMethod.GET) { | |
return Mono.just(rb.get()); | |
} else if (request.getHttpMethod() == HttpMethod.HEAD) { | |
return Mono.just(rb.head()); | |
} else { | |
return toOkHttpRequestBody2(request.getBody(), request.getHeaders()) | |
.map(requestBody -> rb.method(request.getHttpMethod().toString(), requestBody)); | |
} | |
}) | |
.map(Request.Builder::build); | |
} | |
/** | |
* Create a Mono of okhttp3.RequestBody from the given java.nio.ByteBuffer Flux. | |
* | |
* @param bbFlux stream of java.nio.ByteBuffer representing request content | |
* @param headers the headers associated with the original request | |
* @return the Mono emitting okhttp3.RequestBody | |
*/ | |
private static Mono<RequestBody> toOkHttpRequestBodyByteString(Flux<ByteBuffer> bbFlux, HttpHeaders headers) { | |
Mono<okio.ByteString> bsMono = bbFlux == null | |
? EMPTY_BYTE_STRING_MONO | |
: toByteString(bbFlux); | |
return bsMono.map(bs -> { | |
String contentType = headers.getValue("Content-Type"); | |
if (contentType == null) { | |
return RequestBody.create(bs, null); | |
} else { | |
return RequestBody.create(bs, MediaType.parse(contentType)); | |
} | |
}); | |
} | |
private static Mono<RequestBody> toOkHttpRequestBodyCollect(Flux<ByteBuffer> bbFlux, HttpHeaders headers) { | |
if (bbFlux == null) { | |
String contentType = headers.getValue("Content-Type"); | |
if (contentType == null) { | |
return Mono.just(RequestBody.create(ByteString.EMPTY, null)); | |
} else { | |
return Mono.just(RequestBody.create(ByteString.EMPTY, MediaType.parse(contentType))); | |
} | |
} | |
return collectBytesInByteBufferStream(bbFlux) | |
.map(new Function<byte[], RequestBody>() { | |
@Override | |
public RequestBody apply(byte[] bytes) { | |
String contentType = headers.getValue("Content-Type"); | |
System.out.println("Uploading size:" + bytes.length); | |
if (contentType == null) { | |
return RequestBody.create(bytes, null); | |
} else { | |
return RequestBody.create(bytes, MediaType.parse(contentType)); | |
} | |
} | |
}); | |
} | |
private static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream) { | |
return stream | |
.collect(ByteArrayOutputStream::new, OkHttpAsyncHttpClient::accept) | |
.map(ByteArrayOutputStream::toByteArray); | |
} | |
private static void accept(ByteArrayOutputStream byteOutputStream, ByteBuffer byteBuffer) { | |
try { | |
byte[] arr = byteBufferToArray(byteBuffer); | |
// System.out.println("writing -> " + arr.length); | |
byteOutputStream.write(arr); | |
// System.out.println("written -> " + byteOutputStream.size()); | |
} catch (IOException e) { | |
throw new RuntimeException("Error occurred writing ByteBuffer to ByteArrayOutputStream.", e); | |
} | |
} | |
private static byte[] byteBufferToArray(ByteBuffer byteBuffer) { | |
int length = byteBuffer.remaining(); | |
byte[] byteArray = new byte[length]; | |
byteBuffer.get(byteArray); | |
return byteArray; | |
} | |
/** | |
* Aggregate Flux of java.nio.ByteBuffer to single okio.ByteString. | |
* | |
* Pooled okio.Buffer type is used to buffer emitted ByteBuffer instances. | |
* Content of each ByteBuffer will be written (i.e copied) to the internal okio.Buffer slots. | |
* Once the stream terminates, the contents of all slots get copied to one single byte array | |
* and okio.ByteString will be created referring this byte array. | |
* Finally the initial okio.Buffer will be returned to the pool. | |
* | |
* @param bbFlux the Flux of ByteBuffer to aggregate | |
* @return a mono emitting aggregated ByteString | |
*/ | |
private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) { | |
Objects.requireNonNull(bbFlux, "'bbFlux' cannot be null."); | |
int [] size = new int[1]; | |
size[0] = 0; | |
return Mono.using(new Callable<Buffer>() { | |
public Buffer call() { | |
System.out.println("creating okio Buffer"); | |
return new okio.Buffer(); | |
} | |
}, | |
buffer -> bbFlux.reduce(buffer, (b, byteBuffer) -> { | |
try { | |
System.out.println("Writing to okio Buffer"); | |
size[0] += b.write(byteBuffer); | |
System.out.println("Reducing request stream:" + size[0]); | |
return b; | |
} catch (IOException ioe) { | |
throw Exceptions.propagate(ioe); | |
} | |
}) | |
.map(b -> { | |
System.out.println("Creating request::ByteString"); | |
ByteString bs = ByteString.of(b.readByteArray()); | |
System.out.println("Created request::ByteString"); | |
return bs; | |
}), | |
new Consumer<Buffer>() { | |
@Override | |
public void accept(Buffer buffer) { | |
System.out.println("done request, releasing okio buffer."); | |
buffer.clear(); | |
buffer.close(); | |
} | |
}) | |
.switchIfEmpty(EMPTY_BYTE_STRING_MONO); | |
} | |
private static class OkHttpCallback implements okhttp3.Callback { | |
private final Call call; | |
private final MonoSink<HttpResponse> sink; | |
private final HttpRequest request; | |
OkHttpCallback(Call call, MonoSink<HttpResponse> sink, HttpRequest request) { | |
this.call = call; | |
this.sink = sink; | |
this.request = request; | |
} | |
@Override | |
public void onFailure(okhttp3.Call call, IOException e) { | |
sink.error(e); | |
} | |
@Override | |
public void onResponse(okhttp3.Call call, okhttp3.Response response) { | |
OkHttpResponse okHttpResponse = new OkHttpResponse(this.call, response, request); | |
sink.onDispose(new Disposable() { | |
@Override | |
public void dispose() { | |
// System.out.println("MonoSink::disposing - Closing response stream."); | |
okHttpResponse.close(); | |
} | |
}); | |
sink.success(okHttpResponse); | |
} | |
} | |
/** | |
* An implementation of {@link HttpResponse} for OkHttp. | |
*/ | |
private static class OkHttpResponse extends HttpResponse { | |
private final okhttp3.Call call; | |
private final int statusCode; | |
private final HttpHeaders headers; | |
private final ResponseBody responseBody; | |
// using 4K as default buffer size: https://stackoverflow.com/a/237495/1473510 | |
private static final int BYTE_BUFFER_CHUNK_SIZE = 4096; | |
OkHttpResponse(okhttp3.Call call, Response innerResponse, HttpRequest request) { | |
super(request); | |
this.call = call; | |
this.statusCode = innerResponse.code(); | |
this.headers = fromOkHttpHeaders(innerResponse.headers()); | |
// innerResponse.body() getter will not return null for server returned responses. | |
// It can be null: | |
// [a]. if response is built manually with null body (e.g for mocking) | |
// [b]. for the cases described here | |
// [ref](https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response/body/). | |
// | |
this.responseBody = innerResponse.body(); | |
} | |
@Override | |
public int getStatusCode() { | |
return this.statusCode; | |
} | |
@Override | |
public String getHeaderValue(String name) { | |
return this.headers.getValue(name); | |
} | |
@Override | |
public HttpHeaders getHeaders() { | |
return this.headers; | |
} | |
private static void logThowableAsWarning(String s, Throwable t) { | |
try { | |
logger.warning("[Ignorable after Call.cancel]: " + t.getClass().getSimpleName() + " " + t.getMessage() + " " + s); | |
} catch (Throwable i) { | |
System.out.println("Failed to LOG:" + i.getMessage()); | |
} | |
} | |
@Override | |
public Flux<ByteBuffer> getBody() { | |
return getBodyUsingWhenVersion(); | |
} | |
private Flux<ByteBuffer> getBodyUsingWhenVersion() { | |
if (this.responseBody == null) { | |
return Flux.empty(); | |
} | |
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() { | |
@Override | |
public Flux<ByteBuffer> apply(Context reactorContext) { | |
Optional<Long> marker = reactorContext.getOrEmpty("marker"); | |
Long id = marker.isPresent() ? marker.get() : -1L; | |
InputStream inputStream = responseBody.byteStream(); | |
return Flux.usingWhen(Mono.just(inputStream), | |
new Function<InputStream, Flux<ByteBuffer>>() { | |
@Override | |
public Flux<ByteBuffer> apply(InputStream inputStream) { | |
Pair pair = new Pair(); | |
return Flux.just(true) | |
.repeat() | |
.map(ignore -> { | |
byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE]; | |
try { | |
int numBytes; | |
numBytes = inputStream.read(buffer); | |
if (numBytes > 0) { | |
return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes); | |
} else { | |
return pair.buffer(null).readBytes(numBytes); | |
} | |
} catch (Throwable ioe) { | |
System.out.println("read_" | |
+ ioe.getClass().getName() | |
+ " isCancelled: " + call.isCanceled() | |
+ " (" + Thread.currentThread().getName() + ") id:" | |
+ id); | |
// InputStream::read throw-ed. | |
// | |
if (call.isCanceled()) { | |
// | |
// if the 'okhttp3.Call' is in cancelled state then it means 'cancel' executed as | |
// a result of downstream cancelling the Subscription. The cancelled subscription is | |
// the Handle to this thread which read response content (aka response-read-thread). | |
// This cancellation from 'doFinally' will happen from a different thread than | |
// response-read-thread. | |
// | |
// In such a Cancellation case, an interrupt signal sent to this response-read-thread | |
// then detach the downstream from the response-read-thread backed upstream, at this | |
// point Reactor is designed to ignore any data or propagate error from the abandoned | |
// response-read-thread. | |
// | |
// While Reactor adhere to this contract, we noticed that in some edge cases, reactor | |
// still deliver this IOException even after Reactor notifies via 'cancel' | |
// that the response-read-thread backing upstream is cancelled. | |
// | |
// To handle this edge case, we log and explicitly ignore the Exception here that | |
// Reactor is suppose to discard. | |
logThowableAsWarning("content::cancel_read id:" + id, ioe); | |
// | |
// Close response stream for OkHttp to release any resource. | |
closeContentStream(inputStream, "content::cancel_read::close id:" + id, call); | |
return pair.buffer(null).readBytes(-1); | |
} else { | |
closeContentStream(inputStream, "content::read::close id:" + id, call); | |
throw Exceptions.propagate(ioe); | |
} | |
} | |
}) | |
.takeUntil(p -> { | |
boolean predicate = p.readBytes() == -1; | |
return predicate; | |
}) | |
.filter(p -> p.readBytes() > 0) | |
.map(Pair::buffer); | |
} | |
}, | |
// onComplete | |
new Function<InputStream, Mono<Void>>() { | |
@Override | |
public Mono<Void> apply(InputStream inputStream) { | |
return Mono.fromCallable(() -> { | |
closeContentStream(inputStream, "cleanup::content_complete::id:" + id, call); | |
return true; | |
}) | |
.timeout(Duration.ofSeconds(60)) | |
.then(); | |
} | |
}, | |
// onError | |
new BiFunction<InputStream, Throwable, Mono<Void>>() { | |
@Override | |
public Mono<Void> apply(InputStream inputStream, Throwable t) { | |
return Mono.fromCallable(() -> { | |
closeContentStream(inputStream, "cleanup::content_error::id:" + id, call); | |
return true; | |
}) | |
.timeout(Duration.ofSeconds(60)) | |
.then(); | |
} | |
}, | |
// onCancelAsyncCleanup | |
new Function<InputStream, Mono<Void>>() { | |
@Override | |
public Mono<Void> apply(InputStream inputStream) { | |
return Mono.fromCallable(() -> { | |
// | |
// Upon cancel from downStream, this 'onCancelAsyncCleanup' can be executed from a different thread | |
// than the response-read-thread. Though the response-read-thread backing upstream is | |
// marked as cancelled and detached from the downstream at this point, Reactor won't wait | |
// for such an abandoned response-read-thread to complete to run this 'onCancelAsyncCleanup'. | |
// This means there is a potential edge case where when 'onCancelAsyncCleanup' is running, | |
// the response-read-thread could be executing InputStream::read(). | |
// | |
// If InputStream::read() is being executed, we can't do resource cleanup by | |
// calling InputStream::close(). OkHttp InputStream is not thread-safe and has checks | |
// to throw 'Unbalanced enter/exit' if an there is an overlap in invocation of it's | |
// InputStream methods. | |
// | |
// In this case we use thread-safe Call object and unblock read() by cancelling call. | |
call.cancel(); | |
closeContentStream(inputStream, "cleanup::content_cancel::id:" + id, call); | |
return true; | |
}) | |
.timeout(Duration.ofSeconds(60)) | |
.onErrorMap( t -> { | |
if (t instanceof TimeoutException) { | |
System.out.println("Timeout----> id:" + id); | |
} | |
return t; | |
}) | |
.then(); | |
} | |
}); | |
} | |
}); | |
} | |
private Flux<ByteBuffer> getBodyDoFinallyVersion() { | |
if (this.responseBody == null) { | |
return Flux.empty(); | |
} | |
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() { | |
@Override | |
public Flux<ByteBuffer> apply(Context reactorContext) { | |
Optional<Long> marker = reactorContext.getOrEmpty("marker"); | |
Long id = marker.isPresent() ? marker.get() : -1L; | |
InputStream inputStream = responseBody.byteStream(); | |
Pair pair = new Pair(); | |
return Flux.just(true) | |
.repeat() | |
.map(ignore -> { | |
byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE]; | |
try { | |
int numBytes; | |
numBytes = inputStream.read(buffer); | |
if (numBytes > 0) { | |
return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes); | |
} else { | |
return pair.buffer(null).readBytes(numBytes); | |
} | |
} catch (Throwable ioe) { | |
System.out.println("read_" | |
+ ioe.getClass().getName() | |
+ " isCancelled: " + call.isCanceled() | |
+ " (" + Thread.currentThread().getName() + ") id:" | |
+ id); | |
// InputStream::read throw-ed. | |
// | |
if (call.isCanceled()) { | |
// | |
// if the 'okhttp3.Call' is in cancelled state then it means 'doFinally' executed as | |
// a result of downstream cancelling the Subscription. The cancelled subscription is | |
// the Handle to this thread which read response content (aka response-read-thread). | |
// This cancellation from 'doFinally' will happen from a different thread than | |
// response-read-thread. | |
// | |
// In such a Cancellation case, an interrupt signal sent to this response-read-thread | |
// then detach the downstream from the response-read-thread backed upstream, at this | |
// point Reactor is designed to ignore any data or propagate error from the abandoned | |
// response-read-thread. | |
// | |
// While Reactor adhere to this contract, we noticed that in some edge cases, reactor | |
// still deliver this IOException even after Reactor notifies via 'doFinally' | |
// that the response-read-thread backing upstream is cancelled. | |
// | |
// To handle this edge case, we log and explicitly ignore the Exception here that | |
// Reactor is suppose to discard. | |
logThowableAsWarning("content::cancel_read id:" + id, ioe); | |
// | |
// Close response stream for OkHttp to release any resource. | |
closeContentStream(inputStream, "content::cancel_read::close id:" + id, call); | |
return pair.buffer(null).readBytes(-1); | |
} else { | |
closeContentStream(inputStream, "content::read::close id:" + id, call); | |
throw Exceptions.propagate(ioe); | |
} | |
} | |
}) | |
.takeUntil(p -> { | |
boolean predicate = p.readBytes() == -1; | |
return predicate; | |
}) | |
.filter(p -> p.readBytes() > 0) | |
.map(Pair::buffer) | |
.doFinally(signalType -> { | |
System.out.println("doFinally " + signalType + "(" + Thread.currentThread().getName() + ") id:" + id); | |
if (signalType == SignalType.CANCEL) { | |
// | |
// Upon cancel from downStream, this 'doFinally' can be executed from a different thread | |
// than the response-read-thread. Though the response-read-thread backing upstream is | |
// marked as cancelled and detached from the downstream at this point, Reactor won't wait | |
// for such an abandoned response-read-thread to complete to run this 'doFinally' block. | |
// This means there is a potential edge case where when 'doFinally' is running, | |
// the response-read-thread could be executing InputStream::read(). | |
// | |
// If InputStream::read() is being executed, we can't do resource cleanup by | |
// calling InputStream::close(). OkHttp InputStream is not thread-safe and has checks | |
// to throw 'Unbalanced enter/exit' if an there is an overlap in invocation of it's | |
// InputStream methods. | |
// | |
// In this case we use thread-safe Call object and unblock read() by cancelling call. | |
call.cancel(); | |
System.out.println( | |
"call.cancel()::done id:" + id | |
); | |
// | |
} | |
// Close response stream for OkHttp to release any resource. | |
closeContentStream(inputStream, "doFinally::close id:" + id, call); | |
}); | |
} | |
}); | |
} | |
private static void closeContentStream(InputStream inputStream, String s, Call call) { | |
synchronized (call) { | |
try { | |
System.out.println(threadId() + " " + s + " Closing"); | |
inputStream.close(); | |
System.out.println(threadId() + " " + s + " Closed"); | |
} catch (Throwable t) { | |
if (call.isCanceled()) { | |
logThowableAsWarning(threadId() + " " + s, t); | |
} else { | |
System.out.println("closeContentStream error:" + t.getMessage()); | |
throw logger.logExceptionAsError(Exceptions.propagate(t)); | |
} | |
} | |
} | |
} | |
private static String threadId() { | |
return "(" + Thread.currentThread().getName() + "[" + Thread.currentThread().getId() + "]" + ")"; | |
} | |
@Override | |
public Mono<byte[]> getBodyAsByteArray() { | |
return Mono.fromCallable(() -> { | |
// Reactor: The fromCallable operator treats a null from the Callable | |
// as completion signal. | |
if (responseBody == null) { | |
return null; | |
} | |
byte[] content = responseBody.bytes(); | |
// Consistent with GAed behaviour. | |
if (content.length == 0) { | |
return null; | |
} | |
// OkHttp: When calling ResponseBody::bytes() the underlying stream automatically closed. | |
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed | |
return content; | |
}); | |
} | |
@Override | |
public Mono<String> getBodyAsString() { | |
return Mono.fromCallable(() -> { | |
// Reactor: The fromCallable operator treats a null from the Callable | |
// as completion signal. | |
if (responseBody == null) { | |
return null; | |
} | |
String content = responseBody.string(); | |
// Consistent with GAed behaviour. | |
if (content.length() == 0) { | |
return null; | |
} | |
// OkHttp: When calling ResponseBody::string() the underlying stream automatically closed. | |
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed | |
return content; | |
}); | |
} | |
@Override | |
public Mono<String> getBodyAsString(Charset charset) { | |
return getBodyAsByteArray() | |
.map(bytes -> new String(bytes, charset)); | |
} | |
@Override | |
public void close() { | |
if (this.responseBody != null) { | |
try { | |
// System.out.println("responsebodyClose start " + threadId()); | |
// It's safe to invoke close() multiple times, additional calls will be ignored. | |
this.responseBody.close(); | |
// System.out.println("responsebodyClose end::success " + threadId()); | |
} catch (Throwable t) { | |
System.out.println("responsebodyClose end::failed " + t.getMessage() + threadId()); | |
} | |
} | |
} | |
/** | |
* Creates azure-core HttpHeaders from okhttp headers. | |
* | |
* @param headers okhttp headers | |
* @return azure-core HttpHeaders | |
*/ | |
private static HttpHeaders fromOkHttpHeaders(Headers headers) { | |
HttpHeaders httpHeaders = new HttpHeaders(); | |
for (String headerName : headers.names()) { | |
httpHeaders.put(headerName, headers.get(headerName)); | |
} | |
return httpHeaders; | |
} | |
private Flux<ByteBuffer> getBodyUsingErrorProneVersion() { | |
if (this.responseBody == null) { | |
return Flux.empty(); | |
} | |
return Flux.deferWithContext(new Function<Context, Flux<ByteBuffer>>() { | |
@Override | |
public Flux<ByteBuffer> apply(Context reactorContext) { | |
Optional<Long> marker = reactorContext.getOrEmpty("marker"); | |
Long id = marker.isPresent() ? marker.get() : -1L; | |
AtomicBoolean isStreamClosed = new AtomicBoolean(false); | |
// Use Flux.using to close the stream after complete emission | |
return Flux.using(() -> responseBody.byteStream(), | |
bodyStream -> toFluxByteBuffer(bodyStream, id), | |
bodyStream -> { | |
// System.out.println("callCancel start " + threadId() + " id:" + id); | |
try { | |
// OkHttp: The stream from ResponseBody::byteStream() has to be explicitly closed. | |
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed | |
// bodyStream.close(); | |
call.cancel(); | |
// System.out.println("callCancel end::success " + threadId() + " id:" + id); | |
} catch (Throwable t) { | |
// System.out.println("callCancel end::failed " + t.getMessage() + threadId() + " id:" + id); | |
throw Exceptions.propagate(t); | |
} | |
}, false) | |
.doFinally(signal -> { | |
// System.out.println("getBody()::doFinally(" + signal + ") " + threadId() + " id:" + id); | |
}) | |
.doOnSubscribe(subscription -> { | |
// System.out.println("DownloadContent.subscribe():" + id); | |
}); | |
} | |
}); | |
} | |
/** | |
* Creates a Flux of ByteBuffer, with each ByteBuffer wrapping bytes read from the given | |
* InputStream. | |
* | |
* @param inputStream InputStream to back the Flux | |
* @return Flux of ByteBuffer backed by the InputStream | |
*/ | |
private Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, Long id) { | |
Pair pair = new Pair(); | |
return Flux.just(true) | |
.repeat() | |
.map(ignore -> { | |
byte[] buffer = new byte[BYTE_BUFFER_CHUNK_SIZE]; | |
try { | |
if (call.isCanceled()) { | |
inputStream.close(); | |
// System.out.println("Discarding read() after callCancel (" + Thread.currentThread().getName() + ") id:" + id); | |
return pair.buffer(null).readBytes(-1); | |
} else { | |
int numBytes = inputStream.read(buffer); | |
if (numBytes > 0) { | |
return pair.buffer(ByteBuffer.wrap(buffer, 0, numBytes)).readBytes(numBytes); | |
} else { | |
return pair.buffer(null).readBytes(numBytes); | |
} | |
} | |
} catch (IOException ioe) { | |
System.out.println("read_IOException isCallCancelled: " + call.isCanceled() + " (" + Thread.currentThread().getName() + ") id:" + id); | |
if (call.isCanceled()) { | |
try { | |
// System.out.println("read_IOException (" + ioe.getMessage() + ") stream_close::start id: " + id); | |
inputStream.close(); | |
// System.out.println("read_IOException (" + ioe.getMessage() + ") stream_close::end::success id:" + id); | |
} catch (IOException e) { | |
System.out.println("read_IOException stream_close::end::failed " + e.getMessage() + " id:" + id); | |
throw Exceptions.propagate(ioe); | |
} catch (Throwable t) { | |
System.out.println("read_IOException stream_close::end::failed " + t.getMessage()+ " id:" + id); | |
throw Exceptions.propagate(ioe); | |
} | |
// System.out.println("Discarding read_IOException after callCancel:" + " (" + Thread.currentThread().getName() + ") id:" + id); | |
return pair.buffer(null).readBytes(-1); | |
} | |
throw Exceptions.propagate(ioe); | |
} catch (Throwable t) { | |
System.out.println("read_Throwable isCallCancelled: " + call.isCanceled() + " (" + Thread.currentThread().getName() + ") id:" + id); | |
if (call.isCanceled()) { | |
try { | |
// System.out.println("read_Throwable stream_close::start"); | |
inputStream.close(); | |
// System.out.println("read_Throwable stream_close::end::success"); | |
} catch (IOException ioe) { | |
System.out.println("read_IOException stream_close::end::failed " +ioe.getMessage()); | |
throw Exceptions.propagate(ioe); | |
} catch (Throwable t1) { | |
System.out.println("read_Throwable stream_close::end::failed " + t.getMessage()); | |
throw Exceptions.propagate(t1); | |
} | |
// System.out.println("Discarding read_Throwable after callCancel:" + " (" + Thread.currentThread().getName() + ") id:" + id); | |
return pair.buffer(null).readBytes(-1); | |
} | |
throw Exceptions.propagate(t); | |
} | |
}) | |
.takeUntil(p -> { | |
boolean predicate = p.readBytes() == -1; | |
return predicate; | |
}) | |
.filter(p -> p.readBytes() > 0) | |
.map(Pair::buffer); | |
} | |
private static class Pair { | |
private ByteBuffer byteBuffer; | |
private int readBytes; | |
ByteBuffer buffer() { | |
return this.byteBuffer; | |
} | |
int readBytes() { | |
return this.readBytes; | |
} | |
Pair buffer(ByteBuffer byteBuffer) { | |
this.byteBuffer = byteBuffer; | |
return this; | |
} | |
Pair readBytes(int cnt) { | |
this.readBytes = cnt; | |
return this; | |
} | |
} | |
} | |
} | |
// ------------------------------------------------------------ | |
private static final String MARKER_KEY = "marker"; | |
private static final AtomicLong id = new AtomicLong(); | |
private ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<Long, Long>(); | |
public Mono<Void> runAsync() { | |
return Flux.deferWithContext(new Function<Context, Flux<Integer>>() { | |
@Override | |
public Flux<Integer> apply(Context reactorContext) { | |
Optional<Long> marker = reactorContext.getOrEmpty("marker"); | |
Long id = marker.isPresent() ? marker.get() : -1L; | |
return blobAsyncClient.download() | |
.doOnSubscribe(subscription -> { | |
// System.out.println("download.subscribe():" + id); | |
}) | |
.map(b -> { | |
map.merge(id, (long) b.remaining(), new BiFunction<Long, Long, Long>() { | |
@Override | |
public Long apply(Long oldVal, Long newVal) { | |
long result = oldVal + newVal; | |
if (result <= 4096 * 3 || result >= (options.getSize() - 10000)) { | |
System.out.println("next:" + result + " id:" + id); | |
} | |
return result; | |
} | |
}); | |
for (int i = 0; i < b.remaining(); i++) { | |
b.get(); | |
} | |
return 1; | |
}) | |
.doOnComplete(() -> { | |
// System.out.println("download::map::doOnCompletes:" + id); | |
}); | |
} | |
}) | |
.subscriberContext(new Function<Context, Context>() { | |
@Override | |
public Context apply(Context reactorContext) { | |
Optional<String> OptMarker = reactorContext.getOrEmpty(MARKER_KEY); | |
if (OptMarker.isPresent()) { | |
System.out.println("-----------Unexpected Marker ---------"); | |
} | |
return reactorContext.put(MARKER_KEY, id.incrementAndGet()); | |
} | |
}).then(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment