Last active
December 17, 2015 19:39
private Observable<EsperEvent> sendDisconnectOnUnsubscribe(final Observable<EsperEvent> responseForThisRequest,
                                                           final CreateEventStreamRequest createEventStreamRequest) {
    return Observable.create(new Func1<Observer<EsperEvent>, Subscription>() {
        @Override
        public Subscription call(final Observer<EsperEvent> esperEventObserver) {
            final Subscription wrapped = responseForThisRequest.subscribe(new Action1<EsperEvent>() {
                @Override
                public void call(EsperEvent esperEvent) {
                    esperEventObserver.onNext(esperEvent);
                }
            });
            return Subscriptions.create(wrapped, sendUnsubscribeMessage());
        }

        private Subscription sendUnsubscribeMessage() {
            return new Subscription() {
                @Override
                public void unsubscribe() {
                    final String requestId = createEventStreamRequest.getId();
                    final EndEventStreamRequest disconnectEvent = new EndEventStreamRequest(requestId);
                    endRequestSender.sendBodyAndHeader(disconnectEvent, Exchange.CORRELATION_ID, requestId);
                }
            };
        }
    });
}
Ok thanks, I had found the CompositeSubscription, I've updated the gist accordingly.
I guess this is an infinite sequence, as the termination of the sequence only occurs when the client unsubscribes.
I think maybe I'll call sendUnsubscribeMessage from onError and onCompleted as well then, just to be correct.
Thanks for your help.
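If the disconnect is sent from onError, onCompleted and unsubscribe alike, more than one of those paths may fire for the same request (for example, completion followed by an unsubscribe). A minimal sketch of one way to guard against sending the message twice is below. It is plain Java rather than RxJava code, and OnceOnlyCleanup and demo are hypothetical names introduced only for illustration; the string added to the list stands in for the real endRequestSender call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper (not part of RxJava): runs the wrapped action at most once. */
final class OnceOnlyCleanup implements Runnable {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable action;

    OnceOnlyCleanup(Runnable action) {
        this.action = action;
    }

    @Override
    public void run() {
        // compareAndSet lets only the first caller through, even if two paths race.
        if (done.compareAndSet(false, true)) {
            action.run();
        }
    }

    /** Demo: two termination paths fire, but the message is recorded only once. */
    static List<String> demo() {
        final List<String> sent = new ArrayList<>();
        // Assumption: adding to the list stands in for sending the disconnect message.
        OnceOnlyCleanup cleanup = new OnceOnlyCleanup(() -> sent.add("EndEventStreamRequest"));
        cleanup.run(); // as if called from onCompleted
        cleanup.run(); // as if called from unsubscribe afterwards
        return sent;
    }
}
```

The same cleanup instance would be shared by all three call sites, so whichever path runs first sends the message and the others become no-ops.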
Everything is composable so you can return a Subscription implementation that does whatever you want, as you are doing here.

The only thing to be aware of is that unsubscribe is not guaranteed to occur whereas one of onError or onCompleted is.

If you want this to occur only on unsubscribe then it's the correct solution. If you want it to always occur then you may want to use the finallyDo operator instead, as it acts like a try/finally and will get invoked after completion regardless of how it completes. If this is an infinite sequence then unsubscribe may very well be the right place, though it would not be triggered in the event of an error from the infinite sequence.

As far as an existing helper, for your use case it seems you are probably pursuing the correct approach. You could use a CompositeSubscription if you wanted (http://netflix.github.io/RxJava/javadoc/rx/subscriptions/CompositeSubscription.html) and have 2 Subscriptions combined into one, one of them being the real one, another being your custom logic. The decorator pattern you're pursuing though feels more correct for what you're doing.
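To make the "two Subscriptions combined into one" idea concrete, here is a rough plain-Java sketch. It does not use RxJava and is not how CompositeSubscription is implemented: the Subscription interface below is a hypothetical stand-in that only mirrors the single unsubscribe() method, and combine and demo are names made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombinedUnsubscribe {
    /** Hypothetical stand-in mirroring the single-method shape of a subscription. */
    interface Subscription {
        void unsubscribe();
    }

    /** Returns one Subscription that unsubscribes each part in the order given. */
    static Subscription combine(final Subscription... parts) {
        final List<Subscription> all = new ArrayList<>(Arrays.asList(parts));
        return () -> {
            for (Subscription s : all) {
                s.unsubscribe();
            }
        };
    }

    /** Demo: one unsubscribe call triggers both the real and the custom logic. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Subscription real = () -> log.add("upstream unsubscribed");
        Subscription custom = () -> log.add("disconnect message sent");
        combine(real, custom).unsubscribe();
        return log;
    }
}
```

The caller only ever sees the combined Subscription, so the custom disconnect logic rides along with the real unsubscribe without the caller knowing about it. This sketch has no protection against being unsubscribed twice.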