Skip to content

Instantly share code, notes, and snippets.

@sauthieg
Created December 22, 2023 11:03
Show Gist options
  • Save sauthieg/7a9730c35da43042e7fbaf4d2a1ed1b0 to your computer and use it in GitHub Desktop.
Save sauthieg/7a9730c35da43042e7fbaf4d2a1ed1b0 to your computer and use it in GitHub Desktop.
package org.forgerock.openig.launcher;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.rxjava3.impl.ReadStreamSubscriber;
public class StreamIssue {
/**
 * Reproducer entry point.
 *
 * <p>Deploys an anonymous verticle whose {@code start()} runs ten iterations. Each iteration:
 * <ol>
 *   <li>builds a one-item {@link Flowable} (emits {@code "hello"}, completes, then releases a
 *       latch) that is subscribed on {@code Schedulers.io()};</li>
 *   <li>adapts it to a {@link ReadStream} and wraps it so that {@code resume()} blocks until the
 *       flowable has completed;</li>
 *   <li>pipes it into a {@link WriteInQueueStream} and prints what that stream recorded.</li>
 * </ol>
 *
 * @param args ignored
 * @throws InterruptedException kept on the signature; the only blocking call
 *         ({@code latch.await()}) lives in the overridden {@code resume()} and handles the
 *         exception itself
 */
public static void main(String[] args) throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws Exception {
            for (int i = 0; i < 10; i++) {
                // Released by the generator once it has emitted its item and completed.
                CountDownLatch latch = new CountDownLatch(1);
                Flowable<String> flowable = Flowable.<String>generate(emitter -> {
                    emitter.onNext("hello");
                    emitter.onComplete();
                    latch.countDown();
                }).subscribeOn(Schedulers.io()); // subscription happens on a NON event-loop thread
                ReadStream<String> rs = new ReadStreamSubscriber<>(Function.identity(), flowable);
                WriteInQueueStream ws = new WriteInQueueStream((ContextInternal) vertx.getOrCreateContext());
                ReadStream<String> rs2 = new DelegateReadStream(rs) {
                    @Override
                    public ReadStream<String> resume() {
                        // the RS is "open", accepting items from the flowable
                        // an initial request() is done, that triggers a subscription
                        // on a different thread:
                        // items are placed in the pending queue and consumed immediately on the
                        // non-Vert.x thread
                        super.resume();
                        try {
                            // We block here, waiting for the flowable to complete
                            // So that, in the pipe, when
                            // `result.future().onComplete(...)`
                            // is executed, the RS has completed, and the completion handler
                            // is executed right away
                            latch.await();
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag before propagating, so that code further
                            // up the stack can still observe the interruption.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                        return this;
                    }
                };
                rs2.pipeTo(ws, result -> {
                    if (result.succeeded()) {
                        System.out.println(ws.list);
                        // => [RxCachedThreadScheduler-1 hello, vert.x-eventloop-thread-0 end]
                        // NOTE(review): the sample above does not show the " | Received on ..." and
                        // " | Executed by ..." suffixes that WriteInQueueStream.write() appends to
                        // each entry; it looks like a simplified/older rendering - confirm.
                    } else {
                        System.out.println("KO " + result.cause());
                    }
                });
            }
        }
    });
}
/**
 * Minimal {@link WriteStream} used by the reproducer: every written item is appended to
 * {@link #list}, tagged with the name of the thread that called {@code write} and the name of
 * the thread that actually performed the append.
 *
 * <p>The append is not done inline; it is handed to {@link ContextInternal#execute} on the
 * context supplied at construction time.
 */
private static class WriteInQueueStream implements WriteStream<String> {
    private final ContextInternal context;
    /** Recorded items, in the order the scheduled tasks ran. Read directly by the caller. */
    Deque<String> list = new ConcurrentLinkedDeque<>();

    /**
     * @param context context on which the recording tasks are executed
     */
    public WriteInQueueStream(final ContextInternal context) {
        this.context = context;
    }

    /** Ignores the handler; this stream never reports exceptions. */
    @Override
    public WriteStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) {
        return this;
    }

    /**
     * Future-returning variant of {@link #write(String, Handler)}.
     *
     * @param data item to record
     * @return a future completed once the item has been recorded
     */
    @Override
    public Future<Void> write(final String data) {
        final Promise<Void> promise = Promise.promise();
        this.write(data, promise);
        return promise.future();
    }

    /**
     * Captures the calling thread's name, then schedules a task on the context that appends the
     * tagged item to {@link #list} and signals success to {@code handler}.
     *
     * @param data    item to record
     * @param handler notified with a succeeded result after the item has been appended
     */
    @Override
    public void write(final String data, final Handler<AsyncResult<Void>> handler) {
        // Evaluated on the caller's thread, before the task is handed over to the context.
        final String receivedOn = " | Received on " + Thread.currentThread().getName();
        this.context.execute(() -> {
            // Evaluated on whichever thread the context runs the task on.
            final String executedBy = " | Executed by " + Thread.currentThread().getName();
            this.list.addLast(data + receivedOn + executedBy);
            handler.handle(Future.succeededFuture());
        });
        // Synchronous alternative: swap it with the context.execute(...) call above to record
        // the item directly on the calling thread.
        /*
        final String executedBy = " | Executed by " + Thread.currentThread().getName();
        this.list.addLast(data + receivedOn + executedBy);
        handler.handle(Future.succeededFuture());
        */
    }

    /**
     * Future-returning variant of {@link #end(Handler)}.
     *
     * @return a future completed once the end marker has been recorded
     */
    @Override
    public Future<Void> end() {
        final Promise<Void> promise = Promise.promise();
        this.end(promise);
        return promise.future();
    }

    /**
     * Records the end of the stream as a regular item whose payload is {@code "end"}.
     *
     * @param handler notified once the marker has been appended
     */
    @Override
    public void end(final Handler<AsyncResult<Void>> handler) {
        this.write("end", handler);
    }

    /** Ignores {@code maxSize}; this stream applies no limit. */
    @Override
    public WriteStream<String> setWriteQueueMaxSize(final int maxSize) {
        return this;
    }

    /** @return always {@code false} */
    @Override
    public boolean writeQueueFull() {
        return false;
    }

    /** Ignores the handler; since {@link #writeQueueFull()} is never true, no drain is signalled. */
    @Override
    public WriteStream<String> drainHandler(@Nullable final Handler<Void> handler) {
        return this;
    }
}
/**
 * {@link ReadStream} decorator that forwards every operation to a wrapped stream and returns
 * itself (not the wrapped stream) from the fluent methods, so that subclasses can override
 * individual operations and still stay in the call chain.
 *
 * <p>Declared {@code abstract} although it has no abstract methods: it is meant to be
 * subclassed with selective overrides.
 */
private abstract static class DelegateReadStream implements ReadStream<String> {
    /** The wrapped stream; assigned once in the constructor and never exposed. */
    private final ReadStream<String> delegate;

    /**
     * @param delegate stream that receives every forwarded call
     */
    public DelegateReadStream(final ReadStream<String> delegate) {
        this.delegate = delegate;
    }

    /** Forwards the exception handler to the delegate. */
    @Override
    public ReadStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) {
        delegate.exceptionHandler(handler);
        return this;
    }

    /** Forwards the item handler to the delegate. */
    @Override
    @Fluent
    public ReadStream<String> handler(@Nullable final Handler<String> handler) {
        delegate.handler(handler);
        return this;
    }

    /** Forwards {@code pause()} to the delegate. */
    @Override
    @Fluent
    public ReadStream<String> pause() {
        delegate.pause();
        return this;
    }

    /** Forwards {@code resume()} to the delegate. */
    @Override
    @Fluent
    public ReadStream<String> resume() {
        delegate.resume();
        return this;
    }

    /**
     * Forwards {@code fetch(amount)} to the delegate.
     *
     * @param amount value passed through unchanged
     */
    @Override
    @Fluent
    public ReadStream<String> fetch(final long amount) {
        delegate.fetch(amount);
        return this;
    }

    /** Forwards the end handler to the delegate. */
    @Override
    @Fluent
    public ReadStream<String> endHandler(@Nullable final Handler<Void> endHandler) {
        delegate.endHandler(endHandler);
        return this;
    }
}
}
@sauthieg
Copy link
Author

I use ContextInternal.execute() in WriteInQueueStream.write() to mimic the behaviour of HttpClientRequest (which is backed by Http1xClientConnection.StreamImpl): it enforces that write operations on the Netty channel are performed on the event-loop thread.

https://github.com/eclipse-vertx/vert.x/blob/4.x/src/main/java/io/vertx/core/http/impl/Http1xClientConnection.java#L574-L581

If you switch the comment block, you'll see that the output differs depending on whether the write stream performs its operations synchronously (order is respected, all good) or queues them (only "end" makes it into the list).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment