Skip to content

Instantly share code, notes, and snippets.

@Treora
Created June 3, 2013 15:37
Show Gist options
  • Save Treora/5699048 to your computer and use it in GitHub Desktop.
Save Treora/5699048 to your computer and use it in GitHub Desktop.
These examples show that when subscribing a new observer to a PublishSubject while receiving a value from it, the PublishSubject unpredictably does or does not pass the current value to the new observer. Its behaviour presumably depends on the hashes of the subscriptions in the ConcurrentHashMap it uses. Changing the number of (unrelated) subscriptions changes the outcome.
[0] (normal subscription)
[0] spawn new listener: #0
[0] seen by #0 (received its own ID!)
[1] (normal subscription)
[1] spawn new listener: #1
[1] seen by #0
[2] seen by #1
[2] (normal subscription)
[2] spawn new listener: #2
[2] seen by #0
[3] seen by #1
[3] (normal subscription)
[3] seen by #2
[3] spawn new listener: #3
[3] seen by #0
[4] seen by #1
[4] seen by #3
[4] (normal subscription)
[4] seen by #2
[4] spawn new listener: #4
[4] seen by #0
[4] seen by #4 (received its own ID!)
[5] seen by #1
[5] seen by #3
[5] (normal subscription)
[5] seen by #2
[5] spawn new listener: #5
[5] seen by #5 (received its own ID!)
[5] seen by #0
[5] seen by #4
[6] seen by #1
[6] seen by #3
[6] (normal subscription)
[6] seen by #2
[6] spawn new listener: #6
[6] seen by #5
[6] seen by #0
[6] seen by #4
[6] seen by #6 (received its own ID!)
[7] seen by #1
[7] seen by #3
[7] (normal subscription)
[7] seen by #2
[7] spawn new listener: #7
[7] seen by #5
[7] seen by #0
[7] seen by #4
[7] seen by #7 (received its own ID!)
[7] seen by #6
[8] seen by #1
[8] seen by #3
[8] (normal subscription)
[8] seen by #2
[8] spawn new listener: #8
[8] seen by #5
[8] seen by #0
[8] seen by #4
[8] seen by #7
[8] seen by #8 (received its own ID!)
[8] seen by #6
[9] seen by #1
[9] seen by #3
[9] (normal subscription)
[9] seen by #2
[9] spawn new listener: #9
[9] seen by #5
[9] seen by #4
[9] seen by #0
[9] seen by #7
[9] seen by #8
[9] seen by #6
[9] seen by #9 (received its own ID!)
package testpublishsubject;
import rx.Observable;
import rx.observables.ConnectableObservable;
/**
 * Demo: subscribing to a published observable from inside {@code mapMany}
 * while that observable is in the middle of delivering an item.
 *
 * <p>For every item emitted by {@code pub}, the {@code mapMany} function creates
 * a new mapped view of {@code pub} tagged with a fresh listener ID. The printed
 * trace shows, per item, which listeners saw it. A line ending in
 * "(received its own ID!)" means a listener's ID equals the item it is seeing.
 *
 * <p>NOTE(review): the statement order and the number of subscriptions are part
 * of the experiment (see the comment on the unrelated subscription below); do
 * not reorder or add subscriptions when editing.
 */
public class TestPublishSubjectMapMany {
    /** Next listener ID to hand out; bumped once per call of the mapMany function. */
    private static int subscriptionCounter = 0;

    /**
     * Wires up the subscriptions and then connects the published range 0..9.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Observable<Integer> range = Observable.range(0, 10);
        ConnectableObservable<Integer> pub = range.publish();
        pub.subscribe(x -> print("[" + x + "] (normal subscription)"));

        // Do some extra unrelated subscriptions to change the hash of subscriptions spawned hereafter
        ConnectableObservable<Integer> unrelated = Observable.range(3, 5).publish();
        unrelated.subscribe(x -> nop());

        pub.mapMany(x -> {
            // Capture the ID before incrementing so the inner lambda sees a fixed value.
            int subscriptionID = subscriptionCounter;
            print("[" + x + "] spawn new listener: #" + subscriptionCounter);
            subscriptionCounter++;
            // A new view of pub is created while pub is delivering x.
            Observable<ItemAndIDPair> newObservable = pub.map(xx -> {
                return new ItemAndIDPair(xx, subscriptionID);
            });
            return newObservable;
        }).subscribe(pair -> {
            if (pair.id == pair.item) {
                print("[" + pair.item + "] seen by #" + pair.id + " (received its own ID!)");
            } else {
                print("[" + pair.item + "] seen by #" + pair.id);
            }
        });

        // Nothing is emitted until connect() is called, so all wiring above happens first.
        pub.connect();
    }

    /** Deliberately does nothing; used as the body of the unrelated subscription. */
    public static void nop() {
    }

    /**
     * Prints {@code o} on its own line to standard output.
     *
     * @param o value to print
     */
    public static void print(Object o) {
        System.out.println(o);
    }

    /** Immutable pairing of an emitted item with the ID of the listener that saw it. */
    private static class ItemAndIDPair {
        private final int item;
        private final int id;

        ItemAndIDPair(int item, int id) {
            this.item = item;
            this.id = id;
        }
    }
}
package testpublishsubject;
import rx.Observable;
import rx.observables.ConnectableObservable;
/**
 * Demo: subscribing a new observer to a published observable from inside one
 * of its own observers, i.e. while it is in the middle of delivering an item.
 *
 * <p>For every item emitted by {@code pub}, the "spawning" observer subscribes
 * one more observer to {@code pub}, tagged with a fresh listener ID. The
 * printed trace shows, per item, which listeners saw it. A line ending in
 * "(received its own ID!)" means a listener's ID equals the item it is seeing.
 *
 * <p>NOTE(review): the statement order and the number of subscriptions are part
 * of the experiment (see the comment on the unrelated subscriptions below); do
 * not reorder or add subscriptions when editing.
 */
public class TestPublishSubjectNested {
    /** Next listener ID to hand out; bumped once per item seen by the spawning observer. */
    private static int subscriptionCounter = 0;

    /**
     * Wires up the subscriptions and then connects the published range 0..9.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Parameterised as Integer so the lambdas below receive Integer items
        // and the ID check is a plain numeric comparison.
        Observable<Integer> range = Observable.range(0, 10);
        ConnectableObservable<Integer> pub = range.publish();
        pub.subscribe(x -> print("[" + x + "] (normal subscription)"));

        // Do some extra unrelated subscriptions to change the hash of subscriptions spawned hereafter
        ConnectableObservable<Integer> unrelated = Observable.range(3, 5).publish();
        unrelated.subscribe(x -> nop());
        unrelated.subscribe(x -> nop());

        // The Observer-spawning Observer
        pub.subscribe(x -> {
            // Capture the ID before incrementing so the inner lambda sees a fixed value.
            int subscriptionID = subscriptionCounter;
            print("[" + x + "] spawn new listener: #" + subscriptionCounter);
            // A new observer is added to pub while pub is delivering x.
            pub.subscribe(xx -> {
                if (xx == subscriptionID) {
                    print("[" + xx + "] seen by #" + subscriptionID + " (received its own ID!)");
                } else {
                    print("[" + xx + "] seen by #" + subscriptionID);
                }
            });
            subscriptionCounter++;
        });

        // Nothing is emitted until connect() is called, so all wiring above happens first.
        pub.connect();
    }

    /** Deliberately does nothing; used as the body of the unrelated subscriptions. */
    public static void nop() {
    }

    /**
     * Prints {@code o} on its own line to standard output.
     *
     * @param o value to print
     */
    public static void print(Object o) {
        System.out.println(o);
    }
}
@benjchristensen
Copy link

See fix here: ReactiveX/RxJava#288

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment