/*
 * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.helidon.media.common;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;

/**
 * A {@link Flow.Subscriber subscriber} that can subscribe to a {@link Flow.Publisher publisher} of {@link DataChunk data chunks}
 * and make the data available for consumption via the standard blocking {@link InputStream} API.
 * <p>
 * This {@link InputStream} is not thread-safe; concurrent access is not supported, and invocations of {@code read()} must be
 * synchronized by the consumer for any state updates to be visible across threads.
 * <p>
 * The following assumptions are made about the publisher:
 * <ul>
 * <li>{@code request} is invoked only after one chunk has been consumed</li>
 * <li>The number of chunks requested is always 1</li>
 * <li>The source {@link Flow.Publisher} fully conforms to the reactive-streams specification with respect to:
 * <ul>
 * <li>Total order of {@code onNext}, {@code onComplete} and {@code onError} calls</li>
 * <li>Back pressure is honored: {@code onNext} is not called until more chunks are requested</li>
 * <li>Relaxed ordering of calls to {@code request}: requesting is allowed even after {@code onComplete}/{@code onError}</li>
 * </ul>
 * </li>
 * </ul>
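 * <p>
 * A minimal usage sketch (illustrative only; {@code publisher} stands for an existing
 * {@link Flow.Publisher} of {@link DataChunk}, e.g. a request content publisher):
 * <pre>{@code
 * try (InputStream is = new PublisherInputStream(publisher)) {
 *     byte[] buf = new byte[1024];
 *     int n;
 *     while ((n = is.read(buf)) != -1) {
 *         // consume buf[0..n)
 *     }
 * }
 * }</pre>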
 */
public class PublisherInputStream extends InputStream implements Flow.Publisher<DataChunk> {
    private static final Logger LOGGER = Logger.getLogger(PublisherInputStream.class.getName());

    private final Flow.Publisher<DataChunk> originalPublisher;

    /*
     * Given the assumptions that the number of chunks requested is at most 1, the requests are totally
     * ordered with request / onNext by construction. This affords the following safety guarantees:
     *
     * 1. The only place where this.next is assigned is in onNext, before the next chunk is published
     * 2. Initially this.next and this.current are identical; one request(1) is called on subscription
     * 3. All subsequent calls to request(1) happen after the publishing of the chunk is observed by read(...)
     *
     * 4. It follows from 3 and 1 that one and only one assignment to this.next happens before observing the
     *    chunk by read(...). (provided the Publisher observes backpressure)
     *
     * 5. Such this.next is never lost, because it is copied into this.current before request(1), therefore
     *    a new assignment of this.next in onNext never loses the reference to a Future with an unobserved chunk
     *    (provided the Publisher observes backpressure)
     *
     * 6. The publishing of the chunk by onNext synchronizes-with the observation of the chunk by a read(...):
     *    (1) and (5) ensure this.current observed by read(...) is the same as this.next at the time onNext
     *    is invoked, so onNext completes the same Future as accessed by read(...). Moreover, the store to
     *    this.next by onNext and the load of this.next by read(...) are in a happens-before relationship due to this
     *    synchronizes-with edge, the program order in onNext, and the program order in read(...) (and out-of-band
     *    synchronization between multiple reads)
     *
     * A conforming Publisher establishes a total order of onNext, therefore a total order of assignments to
     * this.next and Future.complete:
     *
     * # onSubscribe: assert: this.current == this.next
     * - request(1)
     *
     * # onNext: assert: this.current == this.next
     * - prev = next
     * - next = new Future (A)
     * - prev.complete(chunk) (B): assert: prev == this.current
     *
     * # read(...)
     * - current.get(): (C): (C) synchronizes-with (B): any read is blocked until (B)
     * ...
     * # read(...) (same or subsequent read)
     * - current.get(): synchronizes-with (B)
     * - chunk is seen to be consumed entirely: release the chunk, and request next:
     *   - current = next: (D): (A) happens-before (D), no further onNext intervenes
     *     invariant: this.current never references a released chunk as seen by close(...),
     *     assuming read(...) and close(...) are totally ordered - either by
     *     program order, or through out-of-band synchronization
     *   - request(1): assert: a conforming Publisher does not invoke onNext before this
     *
     * # onNext: assert: this.current == this.next: a conforming Publisher does not invoke onNext before request(1)
     * - prev = next
     * - next = new Future (E)
     * - prev.complete(chunk) (F): assert: prev == this.current
     *
     * # read(...)
     * - current.get(): (G): (G) synchronizes-with (F): any read after (D) is blocked until (F)
     * ...
     *
     * # onComplete / onError: assert: this.next has not been completed: the stream is either empty (no onNext will ever
     *   be called), or an onNext assigned a new uncompleted Future to this.next
     * - next.complete(...): (H): assert: a conforming Publisher ensures this.next assignments by onNext are visible here
     *   by totally ordering onNext / onComplete / onError
     *
     * # read(...): assert: eventually this.current == this.next: either initially, or after some read that consumed
     *   the chunk in its entirety and requested the new chunk
     * - current.get(): (I): (I) synchronizes-with (H)
     * - signal EOF
     *
     * # close(...): assert: this.current never references a released chunk; it either eventually references a chunk
     *   that has been produced by onNext and has not been consumed fully by read(...), or a null
     *   produced by onComplete / onError
     *   assert: if this.next != this.current, this.next will never produce a new chunk: this is the case
     *   if and only if onNext has occurred, but read(...) has not consumed the chunk in its entirety,
     *   hence has not requested any new chunks
     * - current.whenComplete(release)
     */
    private CompletableFuture<DataChunk> current = new CompletableFuture<>();
    private CompletableFuture<DataChunk> next = current;
    private volatile Flow.Subscription subscription;
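    // single-byte buffer lazily allocated by read() and reused across calls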
    private byte[] oneByte;

    /**
     * Wraps the supplied publisher and exposes its data through the blocking {@link InputStream} API.
     * It is illegal to subscribe to the returned publisher.
     *
     * @param originalPublisher the original publisher to wrap
     */
    public PublisherInputStream(Flow.Publisher<DataChunk> originalPublisher) {
        this.originalPublisher = originalPublisher;
    }

    // This really doesn't need to be an AtomicBoolean - accesses are not thread-safe anyway and
    // are meant to be single-threaded. It remains an AtomicBoolean just in case there is still some
    // use-case where the full memory fence on compareAndSet provides the "out-of-band
    // synchronization" necessary for the total ordering of read(...) and close(...)
    private final AtomicBoolean subscribed = new AtomicBoolean(false);

    @Override
    public void close() {
        // assert: if current != next, next cannot ever be resolved with a chunk that needs releasing
        current.whenComplete(PublisherInputStream::releaseChunk);
        current = null; // any future read() will fail
    }

    @Override
    public int read() throws IOException {
        if (oneByte == null) {
            oneByte = new byte[1];
        }
        // Chunks are always non-empty, so r is either 1 (at least one byte is produced) or
        // negative (EOF)
        int r = read(oneByte, 0, 1);
        if (r < 0) {
            return r;
        }
        return oneByte[0] & 0xFF;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (subscribed.compareAndSet(false, true)) {
            // subscribe just once
            subscribe();
        }
        if (current == null) {
            throw new IOException("Already closed");
        }
        try {
            DataChunk chunk = current.get(); // block until data is available
            if (chunk == null) {
                return -1;
            }
            ByteBuffer currentBuffer = chunk.data();
            if (currentBuffer.position() == 0) {
                LOGGER.finest(() -> "Reading chunk ID: " + chunk.id());
            }
            int rem = currentBuffer.remaining();
            // read as much as possible
            if (len > rem) {
                len = rem;
            }
            currentBuffer.get(buf, off, len);
            // chunk is consumed entirely, release the chunk and prefetch a new chunk
            if (len == rem) {
                releaseChunk(chunk, null);
                current = next;
                subscription.request(1);
            }
            return len;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }

    @Override
    public void subscribe(Flow.Subscriber<? super DataChunk> subscriber) {
        subscriber.onError(new UnsupportedOperationException("Subscribing on this publisher is not allowed!"));
    }
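
    // Subscribes to the wrapped publisher with a subscriber that bridges the reactive signals
    // (onNext / onError / onComplete) into the CompletableFuture chain consumed by read(...)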
    private void subscribe() {
        originalPublisher.subscribe(new Flow.Subscriber<DataChunk>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                PublisherInputStream.this.subscription = subscription;
                subscription.request(1);
            }

            @Override
            public void onNext(DataChunk item) {
                LOGGER.finest(() -> "Processing chunk: " + item.id());
                // set next to the next future before completing it,
                // since completing next will unblock read(), which may set current to next
                // if all the data in current has been consumed
                CompletableFuture<DataChunk> prev = next;
                next = new CompletableFuture<>();
                // unblock read()
                prev.complete(item);
            }

            @Override
            public void onError(Throwable throwable) {
                // unblock read() with an ExecutionException wrapping the throwable
                // read() uses a try/catch and wraps the ExecutionException cause in an IOException
                next.completeExceptionally(throwable);
            }

            @Override
            public void onComplete() {
                // read() returns EOF if the chunk is null
                next.complete(null);
            }
        });
    }
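
    // Releases the given chunk if non-null and not already released; the (chunk, throwable) signature
    // matches BiConsumer so the method reference can be passed to CompletableFuture.whenComplete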
    private static void releaseChunk(DataChunk chunk, Throwable th) {
        if (chunk != null && !chunk.isReleased()) {
            LOGGER.finest(() -> "Releasing chunk: " + chunk.id());
            chunk.release();
        }
    }
}