The idea behind Rx is that it is unknown when a sequence emits values or terminates, but you still have control over when you begin and stop accepting values. Subscriptions may be linked to allocated resources that you will want to release at the end of a sequence. Rx provides control over your subscriptions to enable you to do that.
There are several overloads to Observable.subscribe, which are shorthands for the same thing.
Subscription subscribe()
Subscription subscribe(Action1<? super T> onNext)
Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription subscribe(Observer<? super T> observer)
Subscription subscribe(Subscriber<? super T> subscriber)
subscribe() consumes events but performs no actions. The overloads that take one or more Action arguments construct a Subscriber from the functions that you provide. Where you don't give an action, the event is practically ignored, with the exception of errors, which are covered below.
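To make the wrapping step concrete, here is a plain-Java sketch of how optional callbacks could be combined into a single observer. It is not RxJava's implementation: `SimpleObserver` and `fromActions` are hypothetical names, `Consumer` and `Runnable` stand in for `Action1` and `Action0`, and `IllegalStateException` stands in for the library's own exception type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ActionOverloadSketch {
    /** Hypothetical stand-in for an Rx observer: three callbacks. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Wraps optional lambdas into one observer; null means "not provided". */
    static <T> SimpleObserver<T> fromActions(
            Consumer<? super T> onNext, Consumer<Throwable> onError, Runnable onCompleted) {
        return new SimpleObserver<T>() {
            @Override public void onNext(T value) {
                if (onNext != null) onNext.accept(value);   // otherwise ignored
            }
            @Override public void onError(Throwable error) {
                if (onError == null) {
                    // Assumption: an unhandled error is thrown back at the caller,
                    // i.e. on the producer's side.
                    throw new IllegalStateException("No error handler provided", error);
                }
                onError.accept(error);
            }
            @Override public void onCompleted() {
                if (onCompleted != null) onCompleted.run(); // otherwise ignored
            }
        };
    }

    /** Sends a value, a completion and an error to an observer that only handles values. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleObserver<Integer> observer = fromActions(v -> log.add("next " + v), null, null);
        observer.onNext(0);
        observer.onCompleted(); // no completion action, so nothing is recorded
        try {
            observer.onError(new Exception("Oops"));
        } catch (IllegalStateException e) {
            log.add("unhandled " + e.getCause().getMessage());
        }
        return log;
    }
}
```

In this sketch, `demo()` records the value and the unhandled error but nothing for the completion, because no completion action was supplied.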
In the following example, we handle the error of a sequence that failed.
Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
v -> System.out.println(v),
e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));
Output
0
java.lang.Exception: Oops
If we do not provide a function for error handling, an OnErrorNotImplementedException will be thrown at the point where s.onError is called, which is the producer's side. It happens here that the producer and the consumer are side by side, so we could do a traditional try-catch. However, in a compartmentalised system, the producer and the subscriber are very often in different places. Unless the consumer passes an error handler to subscribe, they will never know that an error has occurred and that the sequence was terminated.
You can also stop receiving values before a sequence terminates. Every subscribe overload returns an instance of Subscription, which is an interface with two methods:
boolean isUnsubscribed()
void unsubscribe()
Calling unsubscribe will stop events from being pushed to your observer.
Subject<Integer, Integer> values = ReplaySubject.create();
Subscription subscription = values.subscribe(
v -> System.out.println(v),
e -> System.err.println(e),
() -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);
Output
0
1
Unsubscribing one observer does not interfere with other observers on the same observable.
Subject<Integer, Integer> values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);
Output
First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2
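One way to picture why unsubscribing one observer leaves the others alone is a source that keeps a separate entry per observer. The following plain-Java sketch assumes exactly that. `SimpleSource` and `SimpleSubscription` are hypothetical names, not RxJava types, and unlike ReplaySubject this source does not replay past values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class IndependentUnsubscribeSketch {
    /** Hypothetical mirror of the two-method Subscription interface. */
    interface SimpleSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** A tiny source that pushes each value to whoever is currently subscribed. */
    static final class SimpleSource<T> {
        private final List<Consumer<? super T>> observers = new CopyOnWriteArrayList<>();

        SimpleSubscription subscribe(Consumer<? super T> observer) {
            observers.add(observer);
            // Each subscription only knows about, and only removes, its own observer.
            return new SimpleSubscription() {
                @Override public void unsubscribe() { observers.remove(observer); }
                @Override public boolean isUnsubscribed() { return !observers.contains(observer); }
            };
        }

        void onNext(T value) {
            for (Consumer<? super T> o : observers) o.accept(value);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleSource<Integer> values = new SimpleSource<>();
        SimpleSubscription first = values.subscribe(v -> log.add("First: " + v));
        SimpleSubscription second = values.subscribe(v -> log.add("Second: " + v));
        values.onNext(0);
        first.unsubscribe();
        values.onNext(1); // only the second observer is still listening
        log.add("first unsubscribed: " + first.isUnsubscribed());
        log.add("second unsubscribed: " + second.isUnsubscribed());
        return log;
    }
}
```

Because every subscription removes only its own observer, the second observer keeps receiving values after the first one has left.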
onError and onCompleted mean the termination of a sequence. An observable that complies with the Rx contract will not emit anything after either of those events. This is something to note both when consuming in Rx and when implementing your own observables.
Subject<Integer, Integer> values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
v -> System.out.println("First: " + v),
e -> System.out.println("First: " + e),
() -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);
Output
First: 0
First: 1
Completed
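If you implement your own source, one simple way to honour this part of the contract is to put a guard in front of the observer that drops everything after the first terminal event. The sketch below is plain Java under that assumption. `SimpleObserver` and `GuardedObserver` are hypothetical names, and the guard is deliberately single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

final class TerminalGuardSketch {
    /** Hypothetical stand-in for an Rx observer. */
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Forwards events until the first terminal event, then drops everything. */
    static final class GuardedObserver<T> implements SimpleObserver<T> {
        private final SimpleObserver<T> delegate;
        private boolean done; // single-threaded sketch; not thread-safe

        GuardedObserver(SimpleObserver<T> delegate) { this.delegate = delegate; }

        @Override public void onNext(T value) {
            if (!done) delegate.onNext(value);
        }
        @Override public void onError(Throwable error) {
            if (done) return;
            done = true;
            delegate.onError(error);
        }
        @Override public void onCompleted() {
            if (done) return;
            done = true;
            delegate.onCompleted();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleObserver<Integer> guarded = new GuardedObserver<>(new SimpleObserver<Integer>() {
            @Override public void onNext(Integer v) { log.add("First: " + v); }
            @Override public void onError(Throwable e) { log.add("First: " + e); }
            @Override public void onCompleted() { log.add("Completed"); }
        });
        guarded.onNext(0);
        guarded.onNext(1);
        guarded.onCompleted();
        guarded.onNext(2);                      // dropped: the sequence has terminated
        guarded.onError(new Exception("late")); // dropped as well
        return log;
    }
}
```

The guard lets through the two values and the completion, and nothing that arrives afterwards.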
A Subscription is tied to the resources it uses. For that reason, you should remember to dispose of subscriptions. You can create the binding between a Subscription and the necessary resources using the Subscriptions factory.
Subscription s = Subscriptions.create(() -> System.out.println("Clean"));
s.unsubscribe();
Output
Clean
Subscriptions.create takes an action that will be executed on unsubscription to release the resources. There are also shorthands for common actions when creating a sequence.
Subscriptions.empty() returns a Subscription that does nothing when disposed. This is useful when you are required to return an instance of Subscription, but your implementation doesn't actually need to release any resources.
Subscriptions.from(Subscription... subscriptions) returns a Subscription that will dispose of multiple other subscriptions when it is disposed.
Subscriptions.unsubscribed() returns a Subscription that is already disposed of.
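The factory methods above can be modelled in a few lines of plain Java. This is only a sketch of the idea, not the library's code. `SimpleSubscription` is a hypothetical interface, and the sketch assumes that a cleanup action should run at most once, however many times unsubscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class SubscriptionFactorySketch {
    /** Hypothetical mirror of the two-method Subscription interface. */
    interface SimpleSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Runs the cleanup action the first time unsubscribe() is called. */
    static SimpleSubscription create(Runnable cleanup) {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        return new SimpleSubscription() {
            @Override public void unsubscribe() {
                // Assumption: cleanup runs at most once.
                if (unsubscribed.compareAndSet(false, true)) cleanup.run();
            }
            @Override public boolean isUnsubscribed() { return unsubscribed.get(); }
        };
    }

    /** Nothing to release. */
    static SimpleSubscription empty() { return create(() -> { }); }

    /** Disposes of every given subscription when it is itself disposed. */
    static SimpleSubscription from(SimpleSubscription... subscriptions) {
        return create(() -> { for (SimpleSubscription s : subscriptions) s.unsubscribe(); });
    }

    /** Already disposed of when returned. */
    static SimpleSubscription unsubscribed() {
        SimpleSubscription s = empty();
        s.unsubscribe();
        return s;
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SimpleSubscription file = create(() -> log.add("Clean file"));
        SimpleSubscription socket = create(() -> log.add("Clean socket"));
        SimpleSubscription both = from(file, socket);
        both.unsubscribe();
        both.unsubscribe(); // second call is a no-op in this sketch
        log.add("file released: " + file.isUnsubscribed());
        log.add("already disposed: " + unsubscribed().isUnsubscribed());
        return log;
    }
}
```

Disposing of the combined subscription releases both resources exactly once, which is the behaviour you typically want when several resources share one lifetime.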
There are several implementations of Subscription:
BooleanSubscription
CompositeSubscription
MultipleAssignmentSubscription
RefCountSubscription
SafeSubscriber
Scheduler.Worker
SerializedSubscriber
SerialSubscription
Subscriber
TestSubscriber
We will see more of them later in this book. It is interesting to note that Subscriber also implements Subscription. This means that we can also use a reference to a Subscriber to terminate a subscription.
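As an illustration of that last point, the plain-Java sketch below models a subscriber that is also its own subscription and uses that to stop receiving values from inside its value handler. `SimpleSubscriber` and `emit` are hypothetical names, and the producer is assumed to check isUnsubscribed before each push.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SubscriberAsSubscriptionSketch {
    /** Hypothetical mirror of the two-method Subscription interface. */
    interface SimpleSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Receives values and, being a subscription itself, can also end the subscription. */
    static abstract class SimpleSubscriber<T> implements SimpleSubscription {
        private boolean unsubscribed; // single-threaded sketch; not thread-safe
        abstract void onNext(T value);
        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Pushes values in order, stopping as soon as the subscriber has unsubscribed. */
    static <T> void emit(SimpleSubscriber<T> subscriber, List<T> values) {
        for (T v : values) {
            if (subscriber.isUnsubscribed()) return;
            subscriber.onNext(v);
        }
    }

    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        SimpleSubscriber<Integer> subscriber = new SimpleSubscriber<Integer>() {
            @Override void onNext(Integer value) {
                received.add(value);
                if (value == 1) unsubscribe(); // the subscriber ends its own subscription
            }
        };
        emit(subscriber, Arrays.asList(0, 1, 2));
        return received;
    }
}
```

Here the subscriber receives 0 and 1, unsubscribes itself, and never sees 2.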