Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@dferrandizmont
Last active January 8, 2019 14:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dferrandizmont/4587d9864b89a91ef78a65e8bd416e8e to your computer and use it in GitHub Desktop.
Save dferrandizmont/4587d9864b89a91ef78a65e8bd416e8e to your computer and use it in GitHub Desktop.
[PART 1.2 Lifetime management] #rxjava #rxjavaguide

Lifetime management

The idea behind Rx is that it is unknown when a sequence emits values or terminates, but you still have control over when you begin and stop accepting values. Subscriptions may be linked to allocated resources that you will want to release at the end of a sequence. Rx provides control over your subscriptions to enable you to do that.

Subscribing

There are several overloads to Observable.subscribe, which are shorthands for the same thing.

Subscription 	subscribe()
Subscription 	subscribe(Action1<? super T> onNext)
Subscription 	subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
Subscription 	subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onComplete)
Subscription 	subscribe(Observer<? super T> observer)
Subscription 	subscribe(Subscriber<? super T> subscriber)

subscribe() consumes events but performs no actions. The overloads that take one or more Action will construct a Subscriber with the functions that you provide. Where you don't give an action, the event is practically ignored.

In the following example, we handle the error of a sequence that failed.

Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(
	v -> System.out.println(v),
	e -> System.err.println(e));
s.onNext(0);
s.onError(new Exception("Oops"));

Output

0
java.lang.Exception: Oops

If we do not provide a function for error handling, an OnErrorNotImplementedException will be thrown at the point where s.onError is called, which is the producer's side. It happens here that the producer and the consumer are side-by-side, so we could do a traditional try-catch. However, on a compartmentalised system, the producer and the subscriber very often are in different places. Unless the consumer provides a handle for errors to subscribe, they will never know that an error has occured and that the sequence was terminated.

Unsubscribing

You can also stop receiving values before a sequence terminates. Every subscribe overload returns an instance of Subscription, which is an interface with 2 methods:

boolean isUnsubscribed()
void unsubscribe()

Calling unsubscribe will stop events from being pushed to your observer.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
subscription.unsubscribe();
values.onNext(2);

Output

0
1

Unsubscribing one observer does not interfere with other observers on the same observable.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Subscription subscription2 = values.subscribe(
	v -> System.out.println("Second: " + v)
);
values.onNext(0);
values.onNext(1);
subscription1.unsubscribe();
System.out.println("Unsubscribed first");
values.onNext(2);

Output

First: 0
Second: 0
First: 1
Second: 1
Unsubscribed first
Second: 2

onError and onCompleted

onError and onCompleted mean the termination of a sequence. An observable that complies with the Rx contract will not emit anything after either of those events. This is something to note both when consuming in Rx and when implementing your own observables.

Subject<Integer, Integer>  values = ReplaySubject.create();
Subscription subscription1 = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);

Output

First: 0
First: 1
Completed

Freeing resources

A Subscription is tied to the resources it uses. For that reason, you should remember to dispose of subscriptions. You can create the binding between a Subscription and the necessary resources using the Subscriptions factory.

Subscription s = Subscriptions.create(() -> System.out.println("Clean"));
s.unsubscribe();

Output

Clean

Subscriptions.create takes an action that will be executed on unsubscription to release the resources. There also are shorthand for common actions when creating a sequence.

  • Subscriptions.empty() returns a Subscription that does nothing when disposed. This is useful when you are required to return an instance of Subscription, but your implementation doesn't actually need to release any resources.
  • Subscriptions.from(Subscription... subscriptions) returns a Subscription that will dispose of multiple other subscriptions when it is disposed.
  • Subscriptions.unsubscribed() returns a Subscription that is already disposed of.

There are several implementations of Subscription.

  • BooleanSubscription
  • CompositeSubscription
  • MultipleAssignmentSubscription
  • RefCountSubscription
  • SafeSubscriber
  • Scheduler.Worker
  • SerializedSubscriber
  • SerialSubscription
  • Subscriber
  • TestSubscriber

We will see more of them later in this book. It is interesting to note that Subscriber also implements Subscription. This means that we can also use a reference to a Subscriber to terminate a subscription.

Continue reading

Previous Next
Key types Chapter 2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment