Last active
December 17, 2015 19:39
-
-
Save oxlade39/5661598 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private Observable<EsperEvent> sendDisconnectOnUnsubscribe(final Observable<EsperEvent> responseForThisRequest, | |
final CreateEventStreamRequest createEventStreamRequest) { | |
return Observable.create(new Func1<Observer<EsperEvent>, Subscription>() { | |
@Override | |
public Subscription call(final Observer<EsperEvent> esperEventObserver) { | |
final Subscription wrapped = responseForThisRequest.subscribe(new Action1<EsperEvent>() { | |
@Override | |
public void call(EsperEvent esperEvent) { | |
esperEventObserver.onNext(esperEvent); | |
} | |
}); | |
return Subscriptions.create(wrapped, sendUnsubscribeMessage()); | |
} | |
private Subscription sendUnsubscribeMessage() { | |
return new Subscription() { | |
@Override | |
public void unsubscribe() { | |
final String requestId = createEventStreamRequest.getId(); | |
final EndEventStreamRequest disconnectEvent = new EndEventStreamRequest(requestId); | |
endRequestSender.sendBodyAndHeader(disconnectEvent, Exchange.CORRELATION_ID, requestId); | |
} | |
}; | |
} | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Ok thanks, I had found the `CompositeSubscription`, and I've updated the gist accordingly. I guess this is an infinite sequence, as the termination of the sequence only occurs when the client unsubscribes.
I think maybe I'll call `sendUnsubscribeMessage` from `onError` and `onCompleted` as well then, just to be correct. Thanks for your help.