Skip to content

Instantly share code, notes, and snippets.

@ahmed3elshaer
Last active December 14, 2018 22:55
Show Gist options
  • Save ahmed3elshaer/ac3fee8547dae2fa8fc31a79355de4e5 to your computer and use it in GitHub Desktop.
Save ahmed3elshaer/ac3fee8547dae2fa8fc31a79355de4e5 to your computer and use it in GitHub Desktop.
ReplaySubject
"Default ReplaySubject"
unsubscribe();
createSubscription();
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.create();
stockReplaySubject.onNext(new Stock(GOOG, 715.09));
stockReplaySubject.onNext(new Stock(GOOG, 716.00));
stockReplaySubject.onNext(new Stock(GOOG, 714.00));
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(defaultSub); // All three values will be delivered.
stockReplaySubject.onNext(new Stock(GOOG, 720));
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.
// Subscribe again, this time the subscriber will get all events and the terminal event
// right away. All items are "replayed" even though the stream has completed.
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:03:27.408 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 715.09
2018-12-14 19:03:27.410 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
2018-12-14 19:03:27.412 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
2018-12-14 19:03:27.414 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
2018-12-14 19:03:27.415 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
2018-12-14 19:03:27.417 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 715.09
2018-12-14 19:03:27.419 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 716.0
2018-12-14 19:03:27.420 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0
2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
________________________________________________________________________________
"Sized Replay Subject"
unsubscribe();
createSubscription();
// A replay subject that will only replay the last two items.
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithSize(2);
stockReplaySubject.onNext(new Stock(GOOG, 715.09));
stockReplaySubject.onNext(new Stock(GOOG, 716.00));
stockReplaySubject.onNext(new Stock(GOOG, 714.00));
// Only the last two items will be replayed to this subscriber (716 and 714)
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(defaultSub); // All three values will be delivered.
// This will also be emitted to the defaultSub above.
stockReplaySubject.onNext(new Stock(GOOG, 720));
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.
// Subscribe again, this time the subscriber will get the last two events and the terminal
// event right away. The last two items are "replayed" even though the stream has completed.
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:12:19.959 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
2018-12-14 19:12:19.960 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
2018-12-14 19:12:19.962 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
2018-12-14 19:12:19.963 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
2018-12-14 19:12:19.964 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0
2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
________________________________________________________________________________
"Timed Replay Subject"
unsubscribe();
createSubscription();
// A replay subject that will only replay the last two items.
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTime(250, TimeUnit.MILLISECONDS, Schedulers.immediate());
stockReplaySubject.onNext(new Stock(GOOG, 715.09));
Thread.sleep(100);
stockReplaySubject.onNext(new Stock(GOOG, 716.00));
Thread.sleep(100);
stockReplaySubject.onNext(new Stock(GOOG, 714.00));
Thread.sleep(100);
// Only the last two items will be replayed to this subscriber (716 and 714)
// because the first one has already expired.
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(defaultSub); // All three values will be delivered.
// This will also be emitted to the defaultSub above.
stockReplaySubject.onNext(new Stock(GOOG, 720));
Thread.sleep(100);
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.
// Lets sleep for another 100 millis to simulate some time passing.
Thread.sleep(100);
// Subscribe again with a new subscriber. This time the only item that is valid is
// the last item: '720' as all the others have expired.
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:26:17.449 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
2018-12-14 19:26:17.453 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
2018-12-14 19:26:17.458 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
2018-12-14 19:26:17.560 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
2018-12-14 19:26:17.665 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
2018-12-14 19:26:17.667 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
_________________________________________
Time and Size bound ReplaySubject
unsubscribe();
createSubscription();
// A replay subject that will only replay the last two items.
ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTimeAndSize(250, TimeUnit.MILLISECONDS,2, Schedulers.immediate());
stockReplaySubject.onNext(new Stock(GOOG, 715.09));
Thread.sleep(100);
stockReplaySubject.onNext(new Stock(GOOG, 716.00));
Thread.sleep(100);
stockReplaySubject.onNext(new Stock(GOOG, 714.00));
Thread.sleep(100);
// Only the last two items will be replayed to this subscriber (716 and 714) and limited to size 2
// because the first one has already expired.
Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(defaultSub); // All three values will be delivered.
// This will also be emitted to the defaultSub above.
stockReplaySubject.onNext(new Stock(GOOG, 720));
Thread.sleep(100);
stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.
// Lets sleep for another 100 millis to simulate some time passing.
Thread.sleep(100);
// Subscribe again with a new subscriber. This time the only item that is valid is
// the last item: '720' as all the others have expired and it more than the size limit (2).
Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:31:56.589 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
2018-12-14 19:31:56.593 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
2018-12-14 19:31:56.598 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
2018-12-14 19:31:56.700 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
2018-12-14 19:31:56.806 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
2018-12-14 19:31:56.807 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
*Async Subject*
unsubscribe();
createSubscription();
AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create();
stockAsyncSubject.onNext(new Stock(GOOG, 722));
// Will get the last value (GOOG, 723) and all future items and terminal events
Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(subscription);
stockAsyncSubject.onNext(new Stock(GOOG, 723));
stockAsyncSubject.onNext(new Stock(GOOG, 100));
stockAsyncSubject.onNext(new Stock(GOOG, 699));
stockAsyncSubject.onCompleted();
Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:45:40.991 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on default subscriber: GOOG - 699.0
2018-12-14 19:45:40.992 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Default subscriber completed called.
2018-12-14 19:45:40.995 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on tardy subscriber: GOOG - 699.0
2018-12-14 19:45:40.996 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Tardy subscriber completed called.
________________________________________________________________________________
"with error"
unsubscribe();
createSubscription();
AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create();
stockAsyncSubject.onNext(new Stock(GOOG, 722));
// Will get the last value (GOOG, 723) and all future items and terminal events
Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(subscription);
stockAsyncSubject.onNext(new Stock(GOOG, 723));
stockAsyncSubject.onNext(new Stock(GOOG, 100));
stockAsyncSubject.onNext(new Stock(GOOG, 699));
stockAsyncSubject.onError(new Exception("Boom!")); // current and future subscribers will only receive this, with NO items emitted.
Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-14 19:47:02.844 6482-6482/io.caster.rxexamples E/AsyncSubjectFragment: Error called on default subscriber.
java.lang.Exception: Boom!
*Publish Subject*
unsubscribe();
createSubscription();
PublishSubject<Stock> publishSubject = PublishSubject.create();
publishSubject.onNext(new Stock(GOOG, 722));
Subscription subscription = publishSubject.subscribe(getDefaultSubscriber());
compositeSubscription.add(subscription);
publishSubject.onNext(new Stock(GOOG, 723));
publishSubject.onNext(new Stock(GOOG, 100));
publishSubject.onNext(new Stock(GOOG, 699));
publishSubject.onCompleted();
Subscription tardySubscription = publishSubject.subscribe(getTardySubscriber());
compositeSubscription.add(tardySubscription);
==>OUTPUT<==
2018-12-15 00:36:41.482 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 723.0
2018-12-15 00:36:41.484 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 100.0
2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 699.0
2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Default subscriber completed called.
2018-12-15 00:36:41.486 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Tardy subscriber completed called.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment