Skip to content

Instantly share code, notes, and snippets.

@kiwiandroiddev
Created October 12, 2018 06:52
Show Gist options
  • Save kiwiandroiddev/6ab274f41ffc5d2a850e096f227896a8 to your computer and use it in GitHub Desktop.
Save kiwiandroiddev/6ab274f41ffc5d2a850e096f227896a8 to your computer and use it in GitHub Desktop.
Converts OkHttp's callback interface for WebSockets into an RxJava Observable of events
import io.reactivex.Observable
import okhttp3.*
import okio.ByteString
fun OkHttpClient.newWebSocketObservable(serverUrl: String): Observable<WebSocketEvent> {
val request = Request.Builder().url(serverUrl).build()
return newWebSocketObservable(request)
}
fun OkHttpClient.newWebSocketObservable(request: Request): Observable<WebSocketEvent> {
return Observable.create<WebSocketEvent> { emitter ->
newWebSocket(request, object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
emitter.onNext(WebSocketEvent.OnOpen(webSocket, response))
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
emitter.onNext(WebSocketEvent.OnFailure(webSocket, t, response))
emitter.onComplete()
}
override fun onMessage(webSocket: WebSocket, text: String) {
emitter.onNext(WebSocketEvent.OnTextMessage(webSocket, text))
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
emitter.onNext(WebSocketEvent.OnBytesMessage(webSocket, bytes))
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
emitter.onNext(WebSocketEvent.OnClosing(webSocket, code, reason))
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
emitter.onNext(WebSocketEvent.OnClosed(webSocket, code, reason))
emitter.onComplete()
}
})
}
}
sealed class WebSocketEvent(open val webSocket: WebSocket) {
data class OnOpen(override val webSocket: WebSocket,
val response: Response) : WebSocketEvent(webSocket)
data class OnFailure(override val webSocket: WebSocket,
val t: Throwable,
val response: Response?) : WebSocketEvent(webSocket)
data class OnTextMessage(override val webSocket: WebSocket,
val text: String) : WebSocketEvent(webSocket)
data class OnBytesMessage(override val webSocket: WebSocket,
val bytes: ByteString) : WebSocketEvent(webSocket)
data class OnClosing(override val webSocket: WebSocket,
val code: Int,
val reason: String) : WebSocketEvent(webSocket)
data class OnClosed(override val webSocket: WebSocket,
val code: Int,
val reason: String) : WebSocketEvent(webSocket)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment