Skip to content

Instantly share code, notes, and snippets.

@naturalwarren
Created December 2, 2018 21:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save naturalwarren/7e5a04befcf253ca5d2facbac6b07113 to your computer and use it in GitHub Desktop.
Save naturalwarren/7e5a04befcf253ca5d2facbac6b07113 to your computer and use it in GitHub Desktop.
A custom CallAdapter.
internal class CoinbaseRxJavaCallAdapter(
private val successBodyType: Type,
private val delegateAdapter: CallAdapter<Any, Any>,
private val errorConverterFactory: Converter<ResponseBody, Any?>,
private val isObservable: Boolean,
private val isFlowable: Boolean,
private val isSingle: Boolean,
private val isMaybe: Boolean
) : CallAdapter<Any, Any> {
override fun adapt(call: Call<Any>): Any {
val stream = delegateAdapter.adapt(call)
@Suppress("UNCHECKED_CAST") // Types are checked with boolean flags.
val observableStream = when {
isObservable -> {
stream as Observable<Any>
}
isFlowable -> {
(stream as Flowable<Any>).toObservable()
}
isSingle -> {
(stream as Single<Any>).toObservable()
}
isMaybe -> {
(stream as Maybe<Any>).toObservable()
}
else -> {
throw IllegalStateException("Unrecognized stream type.")
}
}
val coinbaseStream = observableStream
.map {
CoinbaseResponse<Any, Any>(it, null, null)
}
.onErrorResumeNext(Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> {
throwable ->
when (throwable) {
is IOException -> {
val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
null,
null,
throwable
)
Observable.just(response)
}
is HttpException -> {
val error = throwable.response().errorBody()
val errorBody = when {
error == null -> null
error.contentLength() == 0L -> null
else -> errorConverterFactory.convert(error)
}
val response: CoinbaseResponse<Any, Any> = CoinbaseResponse(
null,
errorBody,
null
)
Observable.just(response)
}
else -> {
throw IllegalStateException("Unrecognized exception.")
}
}
})
return when {
isObservable -> coinbaseStream
isFlowable -> coinbaseStream.toFlowable(BackpressureStrategy.LATEST)
isSingle -> coinbaseStream.singleOrError()
isMaybe -> coinbaseStream.singleElement()
else -> throw IllegalStateException("Unrecognized stream type.")
}
}
override fun responseType(): Type = successBodyType
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment