Skip to content

Instantly share code, notes, and snippets.

@oxlade39
Last active December 17, 2015 19:39
Show Gist options
  • Save oxlade39/5661598 to your computer and use it in GitHub Desktop.
Save oxlade39/5661598 to your computer and use it in GitHub Desktop.
private Observable<EsperEvent> sendDisconnectOnUnsubscribe(final Observable<EsperEvent> responseForThisRequest,
final CreateEventStreamRequest createEventStreamRequest) {
return Observable.create(new Func1<Observer<EsperEvent>, Subscription>() {
@Override
public Subscription call(final Observer<EsperEvent> esperEventObserver) {
final Subscription wrapped = responseForThisRequest.subscribe(new Action1<EsperEvent>() {
@Override
public void call(EsperEvent esperEvent) {
esperEventObserver.onNext(esperEvent);
}
});
return Subscriptions.create(wrapped, sendUnsubscribeMessage());
}
private Subscription sendUnsubscribeMessage() {
return new Subscription() {
@Override
public void unsubscribe() {
final String requestId = createEventStreamRequest.getId();
final EndEventStreamRequest disconnectEvent = new EndEventStreamRequest(requestId);
endRequestSender.sendBodyAndHeader(disconnectEvent, Exchange.CORRELATION_ID, requestId);
}
};
}
});
}
@oxlade39
Copy link
Author

Ok thanks, I had found the CompositeSubscription , I've updated the gist accordingly.

I guess this is an infinite sequence, as the termination of the sequence only occurs when the client un-subscribes.

I think maybe I'll call sendUnsubscribeMessage from onError and onCompleted as well then just to be correct.

Thanks for your help.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment