Last active
December 14, 2018 22:55
-
-
Save ahmed3elshaer/ac3fee8547dae2fa8fc31a79355de4e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ReplaySubject | |
"Default ReplaySubject" | |
unsubscribe(); | |
createSubscription(); | |
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.create(); | |
stockReplaySubject.onNext(new Stock(GOOG, 715.09)); | |
stockReplaySubject.onNext(new Stock(GOOG, 716.00)); | |
stockReplaySubject.onNext(new Stock(GOOG, 714.00)); | |
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(defaultSub); // All three values will be delivered. | |
stockReplaySubject.onNext(new Stock(GOOG, 720)); | |
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event. | |
// Subscribe again, this time the subscriber will get all events and the terminal event | |
// right away. All items are "replayed" even though the stream has completed. | |
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:03:27.408 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 715.09 | |
2018-12-14 19:03:27.410 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0 | |
2018-12-14 19:03:27.412 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0 | |
2018-12-14 19:03:27.414 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0 | |
2018-12-14 19:03:27.415 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called. | |
2018-12-14 19:03:27.417 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 715.09 | |
2018-12-14 19:03:27.419 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 716.0 | |
2018-12-14 19:03:27.420 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0 | |
2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0 | |
2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called. | |
________________________________________________________________________________ | |
"Sized Replay Subject" | |
unsubscribe(); | |
createSubscription(); | |
// A replay subject that will only replay the last two items. | |
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithSize(2); | |
stockReplaySubject.onNext(new Stock(GOOG, 715.09)); | |
stockReplaySubject.onNext(new Stock(GOOG, 716.00)); | |
stockReplaySubject.onNext(new Stock(GOOG, 714.00)); | |
// Only the last two items will be replayed to this subscriber (716 and 714) | |
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(defaultSub); // All three values will be delivered. | |
// This will also be emitted to the defaultSub above. | |
stockReplaySubject.onNext(new Stock(GOOG, 720)); | |
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event. | |
// Subscribe again, this time the subscriber will get the last two events and the terminal | |
// event right away. The last two items are "replayed" even though the stream has completed. | |
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:12:19.959 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0 | |
2018-12-14 19:12:19.960 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0 | |
2018-12-14 19:12:19.962 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0 | |
2018-12-14 19:12:19.963 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called. | |
2018-12-14 19:12:19.964 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0 | |
2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0 | |
2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called. | |
________________________________________________________________________________ | |
"Timed Replay Subject" | |
unsubscribe(); | |
createSubscription(); | |
// A replay subject that will only replay the last two items. | |
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTime(250, TimeUnit.MILLISECONDS, Schedulers.immediate()); | |
stockReplaySubject.onNext(new Stock(GOOG, 715.09)); | |
Thread.sleep(100); | |
stockReplaySubject.onNext(new Stock(GOOG, 716.00)); | |
Thread.sleep(100); | |
stockReplaySubject.onNext(new Stock(GOOG, 714.00)); | |
Thread.sleep(100); | |
// Only the last two items will be replayed to this subscriber (716 and 714) | |
// because the first one has already expired. | |
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(defaultSub); // All three values will be delivered. | |
// This will also be emitted to the defaultSub above. | |
stockReplaySubject.onNext(new Stock(GOOG, 720)); | |
Thread.sleep(100); | |
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event. | |
// Lets sleep for another 100 millis to simulate some time passing. | |
Thread.sleep(100); | |
// Subscribe again with a new subscriber. This time the only item that is valid is | |
// the last item: '720' as all the others have expired. | |
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:26:17.449 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0 | |
2018-12-14 19:26:17.453 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0 | |
2018-12-14 19:26:17.458 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0 | |
2018-12-14 19:26:17.560 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called. | |
2018-12-14 19:26:17.665 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0 | |
2018-12-14 19:26:17.667 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called. | |
_________________________________________ | |
Time and Size bound ReplaySubject | |
unsubscribe(); | |
createSubscription(); | |
// A replay subject that will only replay the last two items. | |
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTimeAndSize(250, TimeUnit.MILLISECONDS,2, Schedulers.immediate()); | |
stockReplaySubject.onNext(new Stock(GOOG, 715.09)); | |
Thread.sleep(100); | |
stockReplaySubject.onNext(new Stock(GOOG, 716.00)); | |
Thread.sleep(100); | |
stockReplaySubject.onNext(new Stock(GOOG, 714.00)); | |
Thread.sleep(100); | |
// Only the last two items will be replayed to this subscriber (716 and 714) and limited to size 2 | |
// because the first one has already expired. | |
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(defaultSub); // All three values will be delivered. | |
// This will also be emitted to the defaultSub above. | |
stockReplaySubject.onNext(new Stock(GOOG, 720)); | |
Thread.sleep(100); | |
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event. | |
// Lets sleep for another 100 millis to simulate some time passing. | |
Thread.sleep(100); | |
// Subscribe again with a new subscriber. This time the only item that is valid is | |
// the last item: '720' as all the others have expired and it more than the size limit (2). | |
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:31:56.589 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0 | |
2018-12-14 19:31:56.593 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0 | |
2018-12-14 19:31:56.598 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0 | |
2018-12-14 19:31:56.700 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called. | |
2018-12-14 19:31:56.806 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0 | |
2018-12-14 19:31:56.807 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called. | |
*Async Subject* | |
unsubscribe(); | |
createSubscription(); | |
AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create(); | |
stockAsyncSubject.onNext(new Stock(GOOG, 722)); | |
// Will get the last value (GOOG, 723) and all future items and terminal events | |
Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(subscription); | |
stockAsyncSubject.onNext(new Stock(GOOG, 723)); | |
stockAsyncSubject.onNext(new Stock(GOOG, 100)); | |
stockAsyncSubject.onNext(new Stock(GOOG, 699)); | |
stockAsyncSubject.onCompleted(); | |
Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:45:40.991 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on default subscriber: GOOG - 699.0 | |
2018-12-14 19:45:40.992 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Default subscriber completed called. | |
2018-12-14 19:45:40.995 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on tardy subscriber: GOOG - 699.0 | |
2018-12-14 19:45:40.996 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Tardy subscriber completed called. | |
________________________________________________________________________________ | |
"with error" | |
unsubscribe(); | |
createSubscription(); | |
AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create(); | |
stockAsyncSubject.onNext(new Stock(GOOG, 722)); | |
// Will get the last value (GOOG, 723) and all future items and terminal events | |
Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(subscription); | |
stockAsyncSubject.onNext(new Stock(GOOG, 723)); | |
stockAsyncSubject.onNext(new Stock(GOOG, 100)); | |
stockAsyncSubject.onNext(new Stock(GOOG, 699)); | |
stockAsyncSubject.onError(new Exception("Boom!")); // current and future subscribers will only receive this, with NO items emitted. | |
Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-14 19:47:02.844 6482-6482/io.caster.rxexamples E/AsyncSubjectFragment: Error called on default subscriber. | |
java.lang.Exception: Boom! | |
*Publish Subject* | |
unsubscribe(); | |
createSubscription(); | |
PublishSubject<Stock> publishSubject = PublishSubject.create(); | |
publishSubject.onNext(new Stock(GOOG, 722)); | |
Subscription subscription = publishSubject.subscribe(getDefaultSubscriber()); | |
compositeSubscription.add(subscription); | |
publishSubject.onNext(new Stock(GOOG, 723)); | |
publishSubject.onNext(new Stock(GOOG, 100)); | |
publishSubject.onNext(new Stock(GOOG, 699)); | |
publishSubject.onCompleted(); | |
Subscription tardySubscription = publishSubject.subscribe(getTardySubscriber()); | |
compositeSubscription.add(tardySubscription); | |
==>OUTPUT<== | |
2018-12-15 00:36:41.482 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 723.0 | |
2018-12-15 00:36:41.484 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 100.0 | |
2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 699.0 | |
2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Default subscriber completed called. | |
2018-12-15 00:36:41.486 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Tardy subscriber completed called. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment