Skip to content

Instantly share code, notes, and snippets.

@oxlade39
Last active December 17, 2015 19:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oxlade39/5661598 to your computer and use it in GitHub Desktop.
Save oxlade39/5661598 to your computer and use it in GitHub Desktop.
private Observable<EsperEvent> sendDisconnectOnUnsubscribe(final Observable<EsperEvent> responseForThisRequest,
final CreateEventStreamRequest createEventStreamRequest) {
return Observable.create(new Func1<Observer<EsperEvent>, Subscription>() {
@Override
public Subscription call(final Observer<EsperEvent> esperEventObserver) {
final Subscription wrapped = responseForThisRequest.subscribe(new Action1<EsperEvent>() {
@Override
public void call(EsperEvent esperEvent) {
esperEventObserver.onNext(esperEvent);
}
});
return Subscriptions.create(wrapped, sendUnsubscribeMessage());
}
private Subscription sendUnsubscribeMessage() {
return new Subscription() {
@Override
public void unsubscribe() {
final String requestId = createEventStreamRequest.getId();
final EndEventStreamRequest disconnectEvent = new EndEventStreamRequest(requestId);
endRequestSender.sendBodyAndHeader(disconnectEvent, Exchange.CORRELATION_ID, requestId);
}
};
}
});
}
@oxlade39
Copy link
Author

I want to perform some custom callback when a client subscribes to my existing observable.
I can't think how to achieve other than the above but I'm sure I'm missing some existing helper.????

Effectively I just want to return an existing Ovservable with a decorated Subscription

@benjchristensen
Copy link

Everything is composable so you can return a Subscription implementation that does whatever you want, as you are doing here.

The only thing to be aware of is that unsubscribe is not guaranteed to occur whereas one of onError or onCompleted is.

If you want this to occur only on unsubscribe then it's the correct solution. If you want it to always occur then you may want to use the finallyDo operator instead as it acts like a try/finally and will get invoked after completion regardless of how it completes. If this is an infinite sequence then unsubscribe may very well be the right place, though it would not be trigged in event of an error from the infinite sequence.

As far as an existing helper, for your use case it seems you are probably pursuing the correct approach. You could use a CompositeSubscription if you wanted (http://netflix.github.io/RxJava/javadoc/rx/subscriptions/CompositeSubscription.html) and have 2 Subscriptions combined into one, one of them being the real one, another being your custom logic. The decorator pattern you're pursuing though feels more correct for what you're doing.

@oxlade39
Copy link
Author

Ok thanks, I had found the CompositeSubscription , I've updated the gist accordingly.

I guess this is an infinite sequence, as the termination of the sequence only occurs when the client un-subscribes.

I think maybe I'll call sendUnsubscribeMessage from onError and onCompleted as well then just to be correct.

Thanks for your help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment