Created
June 3, 2013 15:37
-
-
Save Treora/5699048 to your computer and use it in GitHub Desktop.
These examples show that when subscribing a new observer to a PublishSubject while receiving a value from it, the PublishSubject unpredictably does or does not pass the current value to the new observer.
Its behaviour presumably depends on the hashes of the subscriptions in the ConcurrentHashMap it uses. Changing the number of (unrelated) subscr…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[0] (normal subscription) | |
[0] spawn new listener: #0 | |
[0] seen by #0 (received its own ID!) | |
[1] (normal subscription) | |
[1] spawn new listener: #1 | |
[1] seen by #0 | |
[2] seen by #1 | |
[2] (normal subscription) | |
[2] spawn new listener: #2 | |
[2] seen by #0 | |
[3] seen by #1 | |
[3] (normal subscription) | |
[3] seen by #2 | |
[3] spawn new listener: #3 | |
[3] seen by #0 | |
[4] seen by #1 | |
[4] seen by #3 | |
[4] (normal subscription) | |
[4] seen by #2 | |
[4] spawn new listener: #4 | |
[4] seen by #0 | |
[4] seen by #4 (received its own ID!) | |
[5] seen by #1 | |
[5] seen by #3 | |
[5] (normal subscription) | |
[5] seen by #2 | |
[5] spawn new listener: #5 | |
[5] seen by #5 (received its own ID!) | |
[5] seen by #0 | |
[5] seen by #4 | |
[6] seen by #1 | |
[6] seen by #3 | |
[6] (normal subscription) | |
[6] seen by #2 | |
[6] spawn new listener: #6 | |
[6] seen by #5 | |
[6] seen by #0 | |
[6] seen by #4 | |
[6] seen by #6 (received its own ID!) | |
[7] seen by #1 | |
[7] seen by #3 | |
[7] (normal subscription) | |
[7] seen by #2 | |
[7] spawn new listener: #7 | |
[7] seen by #5 | |
[7] seen by #0 | |
[7] seen by #4 | |
[7] seen by #7 (received its own ID!) | |
[7] seen by #6 | |
[8] seen by #1 | |
[8] seen by #3 | |
[8] (normal subscription) | |
[8] seen by #2 | |
[8] spawn new listener: #8 | |
[8] seen by #5 | |
[8] seen by #0 | |
[8] seen by #4 | |
[8] seen by #7 | |
[8] seen by #8 (received its own ID!) | |
[8] seen by #6 | |
[9] seen by #1 | |
[9] seen by #3 | |
[9] (normal subscription) | |
[9] seen by #2 | |
[9] spawn new listener: #9 | |
[9] seen by #5 | |
[9] seen by #4 | |
[9] seen by #0 | |
[9] seen by #7 | |
[9] seen by #8 | |
[9] seen by #6 | |
[9] seen by #9 (received its own ID!) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package testpublishsubject; | |
import rx.Observable; | |
import rx.observables.ConnectableObservable; | |
public class TestPublishSubjectMapMany { | |
private static int subscriptionCounter = 0; | |
public static void main(String[] args) { | |
Observable<Integer> range = Observable.range(0, 10); | |
ConnectableObservable<Integer> pub = range.publish(); | |
pub.subscribe(x -> print("[" + x + "] (normal subscription)")); | |
// Do some extra unrelated subscriptions to change the hash of subscriptions spawned hereafter | |
ConnectableObservable unrelated = Observable.range(3, 5).publish(); | |
unrelated.subscribe(x -> nop()); | |
pub.mapMany(x -> { | |
int subscriptionID = subscriptionCounter; | |
print("[" + x + "] spawn new listener: #" + subscriptionCounter); | |
subscriptionCounter++; | |
Observable<ItemAndIDPair> newObservable = pub.map(xx -> { | |
return new ItemAndIDPair(xx, subscriptionID); | |
}); | |
return newObservable; | |
}).subscribe(pair -> { | |
if (pair.id == pair.item) { | |
print("[" + pair.item + "] seen by #" + pair.id + " (received its own ID!)"); | |
} else { | |
print("[" + pair.item + "] seen by #" + pair.id); | |
} | |
}); | |
pub.connect(); | |
} | |
public static void nop() { | |
} | |
public static void print(Object o) { | |
System.out.println(o); | |
} | |
private static class ItemAndIDPair { | |
public int item; | |
public int id; | |
public ItemAndIDPair(int item, int id) { | |
this.item = item; | |
this.id = id; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package testpublishsubject; | |
import rx.Observable; | |
import rx.observables.ConnectableObservable; | |
public class TestPublishSubjectNested { | |
private static int subscriptionCounter = 0; | |
public static void main(String[] args) { | |
Observable range = Observable.range(0, 10); | |
ConnectableObservable pub = range.publish(); | |
pub.subscribe(x -> print("[" + x + "] (normal subscription)")); | |
// Do some extra unrelated subscriptions to change the hash of subscriptions spawned hereafter | |
ConnectableObservable unrelated = Observable.range(3, 5).publish(); | |
unrelated.subscribe(x -> nop()); | |
unrelated.subscribe(x -> nop()); | |
// The Observer-spawning Observer | |
pub.subscribe(x -> { | |
int subscriptionID = subscriptionCounter; | |
print("[" + x + "] spawn new listener: #" + subscriptionCounter); | |
pub.subscribe(xx -> { | |
if (xx == subscriptionID) { | |
print("[" + xx + "] seen by #" + subscriptionID + " (received its own ID!)"); | |
} else { | |
print("[" + xx + "] seen by #" + subscriptionID); | |
} | |
}); | |
subscriptionCounter++; | |
}); | |
pub.connect(); | |
} | |
public static void nop() { | |
} | |
public static void print(Object o) { | |
System.out.println(o); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See fix here: ReactiveX/RxJava#288