ChooseSubjectBasedOnFirstValue
package perf.backend;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        // Observable<Object> o = Observable.just(1);
        Observable<Object> o = Observable.from("A", "B", "C");
        o.lift(new Operator<Subject<Object, Object>, Object>() {

            @Override
            public Subscriber<? super Object> call(Subscriber<? super Subject<Object, Object>> child) {
                return new Subscriber<Object>(child) {

                    // chosen lazily, once the first value reveals its type
                    private Subject<Object, Object> s;

                    @Override
                    public void onCompleted() {
                        if (s == null) {
                            // the source was empty, so no subject was ever emitted
                            child.onCompleted();
                        } else {
                            s.onCompleted();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (s == null) {
                            // the source failed before emitting its first value
                            child.onError(e);
                        } else {
                            s.onError(e);
                        }
                    }

                    @Override
                    public void onNext(Object t) {
                        if (s == null) {
                            if (t instanceof Number) {
                                s = AsyncSubject.create();
                            } else {
                                s = ReplaySubject.create();
                            }
                            // emit the chosen subject exactly once, then complete the outer stream
                            child.onNext(s);
                            child.onCompleted();
                        }
                        s.onNext(t);
                    }
                };
            }
        }).flatMap((Subject s) -> {
            System.out.println("Subject: " + s);
            s.map(i -> "1) => " + i).subscribe(System.out::println);
            s.map(i -> "2) => " + i).subscribe(System.out::println);
            return s;
        }).subscribe();
    }
}
@benjchristensen
Author

Subject: rx.subjects.ReplaySubject@1f17ae12
1) => A
2) => A
1) => B
2) => B
1) => C
2) => C

or

Subject: rx.subjects.AsyncSubject@37a71e93
1) => 1
2) => 1

@daschl

daschl commented May 7, 2014

Thanks very much for putting this together. I tried to digest it, but one issue remains: I still don't know the first observable ("o") at creation time.

Let me give you more context to my use case - maybe this helps (maybe I just don't get it ;))

So the contract to the user is:

public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request)

The user gives me a request and I promise them an observable containing the response. Here is the current implementation, which always creates a ReplaySubject for lack of better knowledge. The request then gets put into the ring buffer and trickles down through my code and into netty until it hits the network:

    public <R extends CouchbaseResponse> Observable<R> send(CouchbaseRequest request) {
        final Subject<CouchbaseResponse, CouchbaseResponse> observable = ReplaySubject.create();
        request.observable(observable);

        if (request instanceof InternalRequest) {
            handleInternalRequest(request);
        } else if (request instanceof ClusterRequest) {
            handleClusterRequest(request);
        } else {
            boolean published = requestRingBuffer.tryPublishEvent(REQUEST_TRANSLATOR, request);
            if (!published) {
                observable.onError(BACKPRESSURE_EXCEPTION);
            }
        }

        return (Observable<R>) observable;
    }

The observable here gets attached to the response internally.

When a response arrives asynchronously from netty, my uppermost decoder writes the CouchbaseResponse object into a response ring buffer, and the handler at the other end ultimately completes my observable like this:

    @Override
    public void onEvent(final ResponseEvent event, long sequence, boolean endOfBatch) throws Exception {
        CouchbaseResponse response = event.getResponse();
        ResponseStatus status = response.status();
        if (status == ResponseStatus.CHUNKED || status == ResponseStatus.SUCCESS) {
            // This is the same observable as in the snippet above; it was carried
            // along through netty and attached to the event.
            event.getObservable().onNext(response);
            if (status == ResponseStatus.SUCCESS) {
                event.getObservable().onCompleted();
            }
        } else {
           //....
        }
    }

So if a message indicates a CHUNKED response, I just call onNext(). The final message always carries SUCCESS or ERROR by contract, so I know when to call onCompleted() or onError().
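The status handling described above can be sketched without RxJava or netty. In this minimal sketch the `Status` enum, the `Sink` interface, and the `RecordingSink` class are hypothetical stand-ins rather than the real Couchbase or RxJava types, and the choice of `IllegalStateException` for the error case is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

public class ResponseDispatchSketch {

    // Hypothetical stand-in for the three statuses described in the thread.
    enum Status { CHUNKED, SUCCESS, ERROR }

    // Minimal callback type mirroring onNext/onCompleted/onError.
    interface Sink<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable e);
    }

    // Records every signal as a string so the order can be inspected.
    static final class RecordingSink implements Sink<String> {
        final List<String> events = new ArrayList<>();
        @Override public void onNext(String value) { events.add("next:" + value); }
        @Override public void onCompleted() { events.add("completed"); }
        @Override public void onError(Throwable e) { events.add("error:" + e.getMessage()); }
    }

    // CHUNKED forwards a value and keeps the stream open, SUCCESS forwards
    // the final value and completes, ERROR terminates the stream.
    static void dispatch(Status status, String payload, Sink<String> sink) {
        switch (status) {
            case CHUNKED:
                sink.onNext(payload);
                break;
            case SUCCESS:
                sink.onNext(payload);
                sink.onCompleted();
                break;
            case ERROR:
                // Assumption: the failure is surfaced as a generic exception.
                sink.onError(new IllegalStateException(payload));
                break;
        }
    }

    // Simulates a response that arrives as two chunks plus a final message.
    static List<String> demo() {
        RecordingSink sink = new RecordingSink();
        dispatch(Status.CHUNKED, "chunk-1", sink);
        dispatch(Status.CHUNKED, "chunk-2", sink);
        dispatch(Status.SUCCESS, "last", sink);
        return sink.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [next:chunk-1, next:chunk-2, next:last, completed]
    }
}
```

A single-response request would go straight to the SUCCESS branch, which is the case where only one onNext() call is ever made.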

So this works since I always know it's a ReplaySubject, but I found that AsyncSubject has much better performance and GC friendliness if I just need to respond with one onNext call.
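To illustrate why the choice matters, here is a simplified model of what each subject retains. It is not RxJava's implementation: `LastValueBuffer` and `ReplayBuffer` are hypothetical names. They only mimic the documented behavior that an AsyncSubject emits just the last value, and only after completion, while an unbounded ReplaySubject keeps every value for later subscribers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SubjectRetentionSketch {

    // Models AsyncSubject-style retention: only the most recent value is kept,
    // and it becomes visible only once the source has completed.
    static final class LastValueBuffer<T> {
        private T last;
        private boolean hasValue;
        private boolean completed;

        void onNext(T value) { last = value; hasValue = true; }
        void onCompleted() { completed = true; }

        List<T> valuesForNewSubscriber() {
            if (completed && hasValue) {
                return Collections.singletonList(last);
            }
            return Collections.emptyList();
        }
    }

    // Models unbounded ReplaySubject-style retention: every value is kept
    // and replayed to each new subscriber.
    static final class ReplayBuffer<T> {
        private final List<T> all = new ArrayList<>();

        void onNext(T value) { all.add(value); }

        List<T> valuesForNewSubscriber() { return new ArrayList<>(all); }
    }

    public static void main(String[] args) {
        LastValueBuffer<String> last = new LastValueBuffer<>();
        ReplayBuffer<String> replay = new ReplayBuffer<>();
        for (String v : new String[] { "A", "B", "C" }) {
            last.onNext(v);
            replay.onNext(v);
        }
        last.onCompleted();
        System.out.println(last.valuesForNewSubscriber());   // [C]
        System.out.println(replay.valuesForNewSubscriber()); // [A, B, C]
    }
}
```

For a response that only ever produces one value, both models hand the subscriber the same result, but the last-value model never allocates or grows a list.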

@daschl

daschl commented May 7, 2014

Maybe the easiest way for me to handle this is just to inspect the CouchbaseRequest type and then do optimizations on that front? It would be hardcoded, but then I don't have to add a level of indirection. What do you think?
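One way the hardcoded variant might look is sketched below. Everything here is hypothetical: the `ChunkedRequest` marker interface, the `GetRequest` and `ViewQueryRequest` classes, and the `SubjectKind` enum are illustrative names, not part of the Couchbase client. In real code the enum constants would map to `AsyncSubject.create()` and `ReplaySubject.create()`.

```java
public class ChooseByRequestType {

    // Hypothetical request hierarchy, for illustration only.
    interface Request {}

    // Hypothetical marker for requests whose responses arrive in several chunks.
    interface ChunkedRequest extends Request {}

    static final class GetRequest implements Request {}
    static final class ViewQueryRequest implements ChunkedRequest {}

    // Stands in for the concrete subject that would be created.
    enum SubjectKind { ASYNC, REPLAY }

    // The decision is made up front from the request type, before any
    // response has been seen, so no proxy or late binding is needed.
    static SubjectKind subjectKindFor(Request request) {
        return request instanceof ChunkedRequest ? SubjectKind.REPLAY : SubjectKind.ASYNC;
    }

    public static void main(String[] args) {
        System.out.println(subjectKindFor(new GetRequest()));       // ASYNC
        System.out.println(subjectKindFor(new ViewQueryRequest())); // REPLAY
    }
}
```

The trade-off is that every new multi-response request type has to be tagged by hand, whereas choosing on the first value needs no such bookkeeping.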

@benjchristensen
Author

For this code:

request.observable(observable);

Does this observable() method only take Subject, or is it any Observer/Subscriber that it will emit to when it receives data?

@daschl

daschl commented May 7, 2014

Currently it is a Subject, but I can change that at will; these are only internal structures and are not exposed to the user in any way.

@benjchristensen
Author

I don't have time to finish this tonight but this gives an idea ... it works for this example because everything is synchronous. If async it would fail ... the TODO section explains where it's not yet implemented.

I'll probably go with a state machine + CAS and combine the eventualSubject and array of subscribers into a single state.

Then we just need a proper name for this thing :-)

package test;

import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {

        ChooseSubjectBasedOnFirstValue c = new ChooseSubjectBasedOnFirstValue();
        Observable<Object> o = c.sendRequest(1);
        //        Observable<Object> o = c.sendRequest(2);
        o.subscribe(System.out::println);

    }

    public Observable<Object> sendRequest(int i) {
        ObservableObserver observableObserver = ObservableObserver.create();
        doActualRequestAsynchronously(i, observableObserver);
        return observableObserver;
    }

    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        if (requestArgs % 2 == 0) {
            callback.onNext(requestArgs);
            callback.onCompleted();
        } else {
            callback.onNext("A");
            callback.onNext("B");
            callback.onNext("C");
            callback.onCompleted();
        }
    }

    public static class ObservableObserver extends Observable<Object> implements Observer<Object> {

        private final AtomicReference<Subject<Object, Object>> eventualSubject;

        public static ObservableObserver create() {
            AtomicReference<Subject<Object, Object>> eventualSubject = new AtomicReference<Subject<Object, Object>>();
            return new ObservableObserver(eventualSubject);
        }

        private ObservableObserver(AtomicReference<Subject<Object, Object>> eventualSubject) {
            super(new OnSubscribe<Object>() {

                @Override
                public void call(Subscriber<? super Object> child) {
                    if (eventualSubject.get() != null) {
                        eventualSubject.get().subscribe(child);
                    } else {
                        // TODO this is the tricky part ...
                        // If this needs to support multicast, it should .publish()
                        // otherwise it needs to also worry about everything in https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java
                        //
                        // Assuming it's one-to-one this then needs to deal with a Subscriber arriving before
                        // the subject has been created ... and do so in a thread-safe manner
                        // 
                        // so either locking or a state machine
                    }
                }

            });
            this.eventualSubject = eventualSubject;
        }

        @Override
        public void onCompleted() {
            if (eventualSubject.get() == null) {
                eventualSubject.set(AsyncSubject.create());
            }
            eventualSubject.get().onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            if (eventualSubject.get() == null) {
                eventualSubject.set(AsyncSubject.create());
            }
            eventualSubject.get().onError(e);
        }

        @Override
        public void onNext(Object t) {
            if (eventualSubject.get() == null) {
                if (t instanceof Number) {
                    System.out.println("Using an AsyncSubject");
                    eventualSubject.set(AsyncSubject.create());
                } else {
                    System.out.println("Using a ReplaySubject");
                    eventualSubject.set(ReplaySubject.create());
                }
            }
            eventualSubject.get().onNext(t);
        }
    }

}
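
The TODO above, together with the idea of combining the eventual subject and the array of subscribers into a single state, can be illustrated with a library-free sketch. It assumes a hypothetical `Target` interface standing in for the subject, and listeners of an arbitrary type `L` standing in for subscribers. It shows only the compare-and-set handoff, not unsubscription or event delivery.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class LateBindingSketch<L> {

    // Hypothetical stand-in for the subject that is chosen later.
    public interface Target<L> {
        void attach(L listener);
    }

    // Immutable snapshot: either no target yet (plus waiting listeners) or a bound target.
    private static final class State<L> {
        final Target<L> target;
        final List<L> waiting;

        State(Target<L> target, List<L> waiting) {
            this.target = target;
            this.waiting = waiting;
        }
    }

    private final AtomicReference<State<L>> state =
            new AtomicReference<>(new State<L>(null, Collections.<L>emptyList()));

    // Attaches directly if the target exists, otherwise queues the listener via CAS.
    public void subscribe(L listener) {
        while (true) {
            State<L> current = state.get();
            if (current.target != null) {
                current.target.attach(listener);
                return;
            }
            List<L> waiting = new ArrayList<>(current.waiting);
            waiting.add(listener);
            if (state.compareAndSet(current, new State<L>(null, waiting))) {
                return;
            }
            // Another thread changed the state in the meantime, so retry.
        }
    }

    // Binds the target exactly once, then attaches every listener that was waiting.
    public void bind(Target<L> target) {
        State<L> current;
        State<L> next;
        do {
            current = state.get();
            if (current.target != null) {
                throw new IllegalStateException("target already bound");
            }
            next = new State<L>(target, current.waiting);
        } while (!state.compareAndSet(current, next));
        for (L listener : next.waiting) {
            target.attach(listener);
        }
    }

    public static void main(String[] args) {
        List<String> attached = new ArrayList<>();
        LateBindingSketch<String> proxy = new LateBindingSketch<>();
        proxy.subscribe("early");  // queued, no target yet
        proxy.bind(attached::add); // target chosen, "early" is attached now
        proxy.subscribe("late");   // attached directly
        System.out.println(attached); // [early, late]
    }
}
```

Because a listener is either in the snapshot that `bind` wins with or sees the bound target on its next read, no listener is lost or attached twice. Under concurrency the relative attach order of waiting and late listeners is not guaranteed.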

@benjchristensen
Author

Try this:

package test;

import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public class ChooseSubjectBasedOnFirstValue {

    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue c = new ChooseSubjectBasedOnFirstValue();
        Observable<Object> o = c.sendRequest(1);
        o.toBlockingObservable().forEach(System.out::println);

        Observable<Object> o2 = c.sendRequest(2);
        o2.toBlockingObservable().forEach(System.out::println);
    }

    public Observable<Object> sendRequest(int i) {
        ProxySubject<Object> observableObserver = ProxySubject.createSubject((t) -> {
            if (t == null || t instanceof Number) {
                System.out.println("Using an AsyncSubject");
                return AsyncSubject.create();
            } else {
                System.out.println("Using a ReplaySubject");
                return ReplaySubject.create();
            }
        });
        doActualRequestAsynchronously(i, observableObserver);
        return observableObserver;
    }

    Scheduler.Worker worker = Schedulers.computation().createWorker();

    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        worker.schedule(() -> {
            if (requestArgs % 2 == 0) {
                callback.onNext(requestArgs);
                callback.onCompleted();
            } else {
                callback.onNext("A");
                callback.onNext("B");
                callback.onNext("C");
                callback.onCompleted();
            }
        });
    }

    public static class ProxySubject<T> extends Subject<T, T> {
        private final AtomicReference<State<T>> state;
        private final Func1<T, Subject<T, T>> subjectFactory;

        public static <T> ProxySubject<T> createSubject(Func1<T, Subject<T, T>> subjectFactory) {
            return new ProxySubject<T>(new AtomicReference<State<T>>(new State<T>(null, (Subscriber<T>[]) State.EMPTY)), subjectFactory);
        }

        private static class State<T> {
            private final Subject<T, T> subject;
            private final Subscriber<T>[] subscribers;
            private static final Subscriber<?>[] EMPTY = new Subscriber[0];

            private State(Subject<T, T> subject, Subscriber<T>[] subscribers) {
                this.subject = subject;
                this.subscribers = subscribers;
            }
        }

        private ProxySubject(AtomicReference<State<T>> state, Func1<T, Subject<T, T>> subjectFactory) {
            super(new OnSubscribe<T>() {

                @Override
                public void call(Subscriber<? super T> child) {
                    State<T> currentState;
                    State<T> newState;
                    do {
                        currentState = state.get();

                        if (currentState.subject != null) {
                            currentState.subject.subscribe(child);
                            // we don't need to change the state
                            return;
                        } else {
                            Subscriber[] ss = currentState.subscribers;
                            Subscriber[] newss = new Subscriber[ss.length + 1];
                            System.arraycopy(ss, 0, newss, 0, ss.length);
                            newss[newss.length - 1] = child;
                            newState = new State<T>(null, newss);
                        }
                    } while (!state.compareAndSet(currentState, newState));

                }

            });
            this.state = state;
            this.subjectFactory = subjectFactory;
        }

        @Override
        public void onCompleted() {
            if (state.get().subject == null) {
                setSubject(subjectFactory.call(null));
            }
            state.get().subject.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            if (state.get().subject == null) {
                setSubject(subjectFactory.call(null));
            }
            state.get().subject.onError(e);
        }

        @Override
        public void onNext(T t) {
            if (state.get().subject == null) {
                setSubject(subjectFactory.call(t));
            }
            state.get().subject.onNext(t);
        }

        private void setSubject(Subject<T, T> s) {
            State currentState;
            State newState;
            do {
                currentState = state.get();
                if (currentState.subject != null) {
                    throw new IllegalStateException("only 1 subject can be defined");
                }
                newState = new State(s, currentState.subscribers);
            } while (!state.compareAndSet(currentState, newState));
            /* we subscribe any subscribers that were already there */
            for (Subscriber<T> subscriber : newState.subscribers) {
                System.out.println("Subscribing to subject with Subscriber that existed before Subject => " + subscriber);
                newState.subject.subscribe(subscriber);
            }
        }
    }

}

@daschl

daschl commented May 7, 2014

Wow thanks much for your efforts!

Will try it tomorrow morning and report success or failure :)
