Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
Save benjchristensen/e19d64654ebbe82c5bb1 to your computer and use it in GitHub Desktop.
ChooseSubjectBasedOnFirstValue
package perf.backend;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
/**
 * Demo: chooses which {@link Subject} implementation to use based on the first value
 * emitted by the source.
 *
 * <p>A lifted operator inspects the first upstream item. If it is a {@link Number} an
 * {@link AsyncSubject} is created, otherwise a {@link ReplaySubject}. The chosen subject
 * is handed downstream as the single item of the lifted stream, and every upstream
 * notification (including that first item) is then forwarded into it.
 */
public class ChooseSubjectBasedOnFirstValue {

    /**
     * Runs the demo against a three-element source and prints what two independent
     * subscribers of the chosen subject receive.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Swap in the line below to exercise the Number / AsyncSubject branch instead.
        // Observable<Object> o = Observable.just(1);
        Observable<Object> o = Observable.from("A", "B", "C");
        o.lift(new Operator<Subject<Object, Object>, Object>() {
            @Override
            public Subscriber<? super Object> call(Subscriber<? super Subject<Object, Object>> child) {
                return new Subscriber<Object>(child) {
                    /** Subject chosen from the first item; stays null until an item arrives. */
                    private Subject<Object, Object> s;

                    @Override
                    public void onCompleted() {
                        if (s == null) {
                            // The source finished without emitting: no subject was ever
                            // created, so complete the downstream directly instead of
                            // dereferencing a null subject.
                            child.onCompleted();
                            return;
                        }
                        s.onCompleted();
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (s == null) {
                            // The source failed before its first item: report the error
                            // downstream rather than throwing a NullPointerException.
                            child.onError(e);
                            return;
                        }
                        s.onError(e);
                    }

                    @Override
                    public void onNext(Object t) {
                        if (s == null) {
                            if (t instanceof Number) {
                                s = AsyncSubject.create();
                            } else {
                                s = ReplaySubject.create();
                            }
                            // The lifted stream carries exactly one item: the subject itself.
                            child.onNext(s);
                            child.onCompleted();
                        }
                        s.onNext(t);
                    }
                };
            }
        }).flatMap((Subject<Object, Object> s) -> {
            System.out.println("Subject: " + s);
            s.map(i -> "1) => " + i).subscribe(System.out::println);
            s.map(i -> "2) => " + i).subscribe(System.out::println);
            return s;
        }).subscribe();
    }
}
@benjchristensen
Copy link
Author

Try this:

package test;

import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public class ChooseSubjectBasedOnFirstValue {

    /**
     * Issues request 1 and then request 2, blocking on each response and printing every
     * item it emits before moving on to the next request.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ChooseSubjectBasedOnFirstValue demo = new ChooseSubjectBasedOnFirstValue();
        for (int requestId = 1; requestId <= 2; requestId++) {
            Observable<Object> response = demo.sendRequest(requestId);
            response.toBlockingObservable().forEach(System.out::println);
        }
    }

    /**
     * Starts an asynchronous request and returns an Observable of its response.
     *
     * <p>The returned object is a {@code ProxySubject} that is also passed to
     * {@link #doActualRequestAsynchronously} as the callback. Its backing subject is
     * chosen from the first value it is given: {@code null} or a {@link Number} selects
     * an {@link AsyncSubject}, anything else selects a {@link ReplaySubject}.
     *
     * @param i request argument forwarded to {@link #doActualRequestAsynchronously}
     * @return the proxy subject, viewed as an Observable of the response values
     */
    public Observable<Object> sendRequest(int i) {
        Func1<Object, Subject<Object, Object>> chooseByFirstValue = first -> {
            boolean needsReplay = first != null && !(first instanceof Number);
            if (needsReplay) {
                System.out.println("Using a ReplaySubject");
                return ReplaySubject.create();
            }
            System.out.println("Using an AsyncSubject");
            return AsyncSubject.create();
        };

        ProxySubject<Object> proxy = ProxySubject.createSubject(chooseByFirstValue);
        doActualRequestAsynchronously(i, proxy);
        return proxy;
    }

    Scheduler.Worker worker = Schedulers.computation().createWorker();

    /**
     * Simulates a backend call by scheduling the response on {@code worker}.
     *
     * <p>An even {@code requestArgs} produces a single item (the argument itself); an odd
     * one produces the three strings "A", "B" and "C". Either way the callback is then
     * completed.
     *
     * @param requestArgs request argument; its parity selects the response shape
     * @param callback    observer that receives the simulated response
     */
    public void doActualRequestAsynchronously(int requestArgs, Observer<Object> callback) {
        worker.schedule(() -> {
            boolean singleValueResponse = requestArgs % 2 == 0;
            if (singleValueResponse) {
                callback.onNext(requestArgs);
            } else {
                for (String letter : new String[] {"A", "B", "C"}) {
                    callback.onNext(letter);
                }
            }
            // Both response shapes end with the same terminal notification.
            callback.onCompleted();
        });
    }

    /**
     * A {@link Subject} that defers choosing its real implementation until the first
     * notification arrives.
     *
     * <p>Until then, subscribers are parked in an immutable {@link State} held by an
     * {@link AtomicReference}. On the first {@code onNext}, {@code onError} or
     * {@code onCompleted} the {@code subjectFactory} is called (with the first value, or
     * {@code null} for a terminal event) to create the delegate subject. The parked
     * subscribers are then subscribed to it, and every notification is forwarded to it.
     * Subscribers that arrive after the delegate exists are subscribed to it directly.
     *
     * @param <T> type of the values received and emitted
     */
    public static class ProxySubject<T> extends Subject<T, T> {
        private final AtomicReference<State<T>> state;
        private final Func1<T, Subject<T, T>> subjectFactory;

        /**
         * Creates a proxy with no delegate subject and no subscribers.
         *
         * @param subjectFactory maps the first value (or {@code null} when the first
         *                       notification is terminal) to the subject to delegate to
         * @return a new proxy subject
         */
        public static <T> ProxySubject<T> createSubject(Func1<T, Subject<T, T>> subjectFactory) {
            return new ProxySubject<T>(new AtomicReference<State<T>>(State.<T> initial()), subjectFactory);
        }

        /**
         * Immutable snapshot: either the delegate subject (once chosen) or the
         * subscribers waiting for it.
         */
        private static class State<T> {
            private static final Subscriber<?>[] EMPTY = new Subscriber<?>[0];

            private final Subject<T, T> subject;
            private final Subscriber<? super T>[] subscribers;

            private State(Subject<T, T> subject, Subscriber<? super T>[] subscribers) {
                this.subject = subject;
                this.subscribers = subscribers;
            }

            /** Shared empty array; safe to share because it has no elements to mistype. */
            @SuppressWarnings("unchecked")
            private static <T> Subscriber<? super T>[] noSubscribers() {
                return (Subscriber<? super T>[]) EMPTY;
            }

            /** State before any subscriber or notification has been seen. */
            private static <T> State<T> initial() {
                return new State<T>(null, State.<T> noSubscribers());
            }

            /** Copy of this (subject-less) state with {@code child} appended. */
            private State<T> withSubscriber(Subscriber<? super T> child) {
                @SuppressWarnings("unchecked") // generic arrays cannot be created directly
                Subscriber<? super T>[] grown = new Subscriber[subscribers.length + 1];
                System.arraycopy(subscribers, 0, grown, 0, subscribers.length);
                grown[subscribers.length] = child;
                return new State<T>(null, grown);
            }

            /**
             * State after the delegate was chosen. The pending list is dropped because it
             * is never read again once a subject is present.
             */
            private State<T> withSubject(Subject<T, T> chosen) {
                return new State<T>(chosen, State.<T> noSubscribers());
            }
        }

        private ProxySubject(final AtomicReference<State<T>> state, Func1<T, Subject<T, T>> subjectFactory) {
            super(new OnSubscribe<T>() {

                @Override
                public void call(Subscriber<? super T> child) {
                    State<T> currentState;
                    State<T> newState;
                    do {
                        currentState = state.get();
                        if (currentState.subject != null) {
                            // Delegate already chosen: subscribe directly, state is unchanged.
                            currentState.subject.subscribe(child);
                            return;
                        }
                        newState = currentState.withSubscriber(child);
                        // Retry if another subscriber, or setSubject, changed the state meanwhile.
                    } while (!state.compareAndSet(currentState, newState));
                }

            });
            this.state = state;
            this.subjectFactory = subjectFactory;
        }

        @Override
        public void onCompleted() {
            delegateFor(null).onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            delegateFor(null).onError(e);
        }

        @Override
        public void onNext(T t) {
            delegateFor(t).onNext(t);
        }

        /**
         * Returns the delegate subject, creating and installing it from
         * {@code firstValue} if none has been chosen yet.
         */
        private Subject<T, T> delegateFor(T firstValue) {
            Subject<T, T> current = state.get().subject;
            if (current == null) {
                current = subjectFactory.call(firstValue);
                setSubject(current);
            }
            return current;
        }

        /**
         * Installs {@code s} as the delegate and subscribes every parked subscriber to it.
         *
         * @throws NullPointerException  if {@code s} is null
         * @throws IllegalStateException if a delegate has already been installed
         */
        private void setSubject(Subject<T, T> s) {
            if (s == null) {
                throw new NullPointerException("subjectFactory returned null");
            }
            State<T> currentState;
            State<T> newState;
            do {
                currentState = state.get();
                if (currentState.subject != null) {
                    throw new IllegalStateException("only 1 subject can be defined");
                }
                newState = currentState.withSubject(s);
            } while (!state.compareAndSet(currentState, newState));
            /* we subscribe any subscribers that were already there */
            for (Subscriber<? super T> subscriber : currentState.subscribers) {
                System.out.println("Subscribing to subject with Subscriber that existed before Subject => " + subscriber);
                s.subscribe(subscriber);
            }
        }
    }

}

@daschl
Copy link

daschl commented May 7, 2014

Wow, thanks so much for your efforts!

Will try it tomorrow morning and report success or failure :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment