Skip to content

Instantly share code, notes, and snippets.

@baconpat
Created August 1, 2015 13:23
Show Gist options
  • Save baconpat/6140e886fa30926b0310 to your computer and use it in GitHub Desktop.
Save baconpat/6140e886fa30926b0310 to your computer and use it in GitHub Desktop.
/**
 * Demonstrates cutting the upstream subscription in the middle of a stream so that a
 * resource tied to that subscription is released before later stages run.
 *
 * <p>The resource is modelled as a never-completing observable whose
 * {@code doOnUnsubscribe} hook calls {@code MyResource.release(...)}.
 */
public class UnsubscribingMidStreamTest {

    /**
     * Returns a transformer that emits only the first item of the upstream observable.
     *
     * <p>Each upstream item is pushed into a {@link BehaviorSubject}; that same subject is
     * used as the {@code takeUntil} signal for the upstream branch and is merged back in as
     * a second source, and the merged result is limited to one item.
     *
     * <p>The subject is created inside {@code Observable.defer} so that every subscription
     * gets its own instance. Without the defer, the subject would be created once when the
     * transformer is applied and shared by all subscribers; since a BehaviorSubject replays
     * its most recent item, a later subscriber would receive the value cached by an earlier
     * one.
     *
     * @param <T> the item type of the observable being transformed
     * @return a transformer yielding at most one item from its upstream
     */
    public static <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
        return observable -> Observable.<T>defer(() -> {
            // Per-subscription state: must not be shared between subscribers.
            BehaviorSubject<T> subject = BehaviorSubject.create();
            Observable<T> source = observable.doOnNext(value -> subject.onNext(value));
            // NOTE(review): a BehaviorSubject (rather than a PublishSubject) presumably lets
            // the merge's subject branch still see the item when it subscribes after the
            // item was pushed - confirm against RxJava's merge subscription order.
            return Observable
                    .merge(source.takeUntil(subject), subject)
                    .take(1);
        });
    }

    /**
     * Builds a deferred observable that logs the input and emits {@code input + 50}.
     *
     * @param input the value produced by the previous stage
     * @return an observable emitting {@code input + 50} on subscription
     */
    private Observable<Integer> doMoreWork(Integer input) {
        return Observable.defer(() -> {
            System.out.println("Doing more work, input: " + input);
            return Observable.just(input + 50);
        });
    }

    /**
     * Runs "work" while a resource is held, then "more work" while a freshly acquired
     * resource is held, and expects the single result 150 followed by completion.
     */
    @Test
    public void test() throws Exception {
        // Acquires the resource on subscription and releases it on unsubscription.
        // It never emits, so it only contributes its subscription lifecycle.
        Observable<Void> acquireResource = Observable.defer(() -> {
            MyResource resource = MyResource.acquire();
            System.out.println("Acquired the resource");
            return Observable
                    .<Void>never()
                    .doOnUnsubscribe(() -> {
                        MyResource.release(resource);
                        System.out.println("Released the resource");
                    });
        });

        Observable<Integer> doWork = Observable.defer(() -> {
            System.out.println("Doing work");
            return Observable.just(100);
        });

        // The cast only aligns the element type for merge; acquireResource never emits.
        Observable<Integer> doWorkWhileAcquired =
                Observable.merge(acquireResource.cast(Integer.class), doWork);

        Observable<Integer> combinedOperations = doWorkWhileAcquired
                .compose(takeNextAndUnsubscribe())
                // Second stage: acquire the resource again alongside the follow-up work.
                .flatMap(value -> acquireResource
                        .cast(Integer.class)
                        .mergeWith(doMoreWork(value)))
                // Needed to terminate: the merged-in resource observable never completes.
                .take(1);

        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        combinedOperations.subscribe(subscriber);

        // 100 from doWork, plus 50 from doMoreWork.
        subscriber.assertValue(150);
        subscriber.assertCompleted();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment