Skip to content

Instantly share code, notes, and snippets.

@rfermontero
Created March 4, 2020 13:37
Show Gist options
  • Save rfermontero/0381e4d3d655be5d935fcde2a5b80b94 to your computer and use it in GitHub Desktop.
Save rfermontero/0381e4d3d655be5d935fcde2a5b80b94 to your computer and use it in GitHub Desktop.
val assistantResponses: Observable<Either<AssistantError, AssistantResponse>>
get() {
Logger.i(TAG, "getAssistantResponses")
val activityObservable = readyObservable
.observeOn(Schedulers.io())
.flatMap<Either<Throwable, Conversation>> { startConversation() }
.flatMap<Either<SocketEvent, ActivitySet>> { connectConversation(it) }
.filter { socketNotOpened(it) }
.map<Either<AssistantError, ActivitySet>> { mapSocketEventToDomainModel(it) }
.flatMapIterable<Either<AssistantError, Pair<Activity, String>>> { flatMapIterable(it) }
.map<Either<AssistantError, Pair<Activity, String>>> { updateLastRequestId(it) }
.filter { theirActivities(it) }
.filter { notCanceledRequests(it) }
.map<Either<AssistantError, Activity>> { updateWatermark(it) }
.filter { onlyMessages(it) }
.map<Either<AssistantError, Activity>> { updateDialogContext(it) }
.map<Either<AssistantError, Activity>> { extractErrorFromChannelDataOrReturn(it) }
return RxCustomOperators
.bufferUntil(activityObservable) { getErrorOrHasNotMoreMessages(it) }
.map { setResponseType(it) }
.doOnNext { resetLastRequestId(it.second) }
.map<Either<AssistantError, AssistantResponse>> { firstErrorOrAssistantResponse(it) }
.flatMap<Either<AssistantError, AssistantResponse>> { prefetchResourcesOrError(it) }
.doOnEach { clearTimeout() }
.doOnNext { setLastResponse(it) }
.mergeWith(responseTimeoutObservable())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment