Created December 22, 2023 11:03
package org.forgerock.openig.launcher;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.rxjava3.impl.ReadStreamSubscriber;
public class StreamIssue {
public static void main(String[] args) throws InterruptedException {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new AbstractVerticle() {
public void start() throws Exception {
for (int i = 0; i < 10; i++) {
CountDownLatch latch = new CountDownLatch(1);
Flowable<String> flowable = Flowable.<String>generate(emitter -> {
}).subscribeOn(; // subscription happens on a NON event-loop thread
ReadStream<String> rs = new ReadStreamSubscriber<>(Function.identity(), flowable);
WriteInQueueStream ws = new WriteInQueueStream((ContextInternal) vertx.getOrCreateContext());
ReadStream<String> rs2 = new DelegateReadStream(rs) {
public ReadStream<String> resume() {
// the RS is "open", accepting items from the flowable
// an initial request() is done, that triggers a subscription
// on a different thread:
// items are placed in the pending queue and consumed immediately on the
// non-Vert.x thread
try {
// We block here, waiting for the flowable to complete
// So that, in the pipe, when
// `result.future().onComplete(...)`
// is executed, the RS has completed, and the completion handler
// is executed right away
} catch (InterruptedException e) {
throw new RuntimeException(e);
return this;
rs2.pipeTo(ws, result -> {
if (result.succeeded()) {
// => [RxCachedThreadScheduler-1 hello, vert.x-eventloop-thread-0 end]
} else {
System.out.println("KO " + result.cause());
private static class WriteInQueueStream implements WriteStream<String> {
private final ContextInternal context;
Deque<String> list = new ConcurrentLinkedDeque<>();
public WriteInQueueStream(final ContextInternal context) {
this.context = context;
public WriteStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) {
return this;
public Future<Void> write(final String data) {
Promise<Void> p = Promise.promise();
write(data, p);
return p.future();
public void write(final String data, final Handler<AsyncResult<Void>> handler) {
String receiver = " | Received on " + Thread.currentThread().getName();
context.execute(() -> {
String executed = " | Executed by " + Thread.currentThread().getName();
list.add(data + receiver + executed);
String executed = " | Executed by " + Thread.currentThread().getName();
list.add(data + receiver + executed);
public Future<Void> end() {
Promise<Void> p = Promise.promise();
return p.future();
public void end(final Handler<AsyncResult<Void>> handler) {
write("end", handler);
public WriteStream<String> setWriteQueueMaxSize(final int maxSize) {
return this;
public boolean writeQueueFull() {
return false;
public WriteStream<String> drainHandler(@Nullable final Handler<Void> handler) {
return this;
private static abstract class DelegateReadStream implements ReadStream<String> {
ReadStream<String> delegate;
public DelegateReadStream(final ReadStream<String> delegate) {
this.delegate = delegate;
public ReadStream<String> exceptionHandler(@Nullable final Handler<Throwable> handler) {
return this;
public ReadStream<String> handler(@Nullable final Handler<String> handler) {
return this;
public ReadStream<String> pause() {
return this;
public ReadStream<String> resume() {
return this;
public ReadStream<String> fetch(final long amount) {
return this;
public ReadStream<String> endHandler(@Nullable final Handler<Void> endHandler) {
return this;
I use ContextInternal.execute() in WriteInQueueStream.write() to mimic the behaviour of HttpClientRequest (backed by Http1xClientConnection.StreamImpl), which enforces that write operations on the Netty channel are performed on the event-loop thread.

If you switch the comment block, you'll see that the output differs depending on whether the write stream performs its operations synchronously (order is respected, all good) or queues some of them (only "end" makes it into the list).

