Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RxJava (Flowable) wrapper around an OkHttp WebSocket connected to echo.websocket.org
/**
 * Singleton bridge that exposes an OkHttp web socket connection as an RxJava [Flowable] of strings.
 *
 * Call [subscribe] to open the connection and obtain the stream, and [send] to write a text frame.
 * Besides incoming text messages, the stream carries the status strings `"onOpen"` and
 * `"Connection closed"`; it completes when the socket is closed and errors when the socket fails.
 *
 * The stream is shared: every subscriber observes the same events from the moment it subscribes.
 * Concurrent calls to [subscribe] are not coordinated with each other.
 */
object WebSocket : WebSocketListener() {
    private const val ECHO_URL = "ws://echo.websocket.org"

    // Emitter of the shared upstream. Assigned when the shared stream connects and read from the
    // listener callbacks, hence volatile.
    @Volatile
    private var broadcaster: FlowableEmitter<String>? = null

    // Internal subscription that keeps the shared stream connected while the socket is alive.
    @Volatile
    private var disposable: Disposable? = null

    @Volatile
    private var webSocket: WebSocket? = null

    // share() multicasts one upstream to all subscribers. Without it, Flowable.create would run the
    // FlowableOnSubscribe again for every subscriber and overwrite [broadcaster], leaving only the
    // most recent subscriber connected to the socket events.
    private val flowable: Flowable<String> = Flowable
        .create(FlowableOnSubscribe<String> { emitter -> broadcaster = emitter }, BackpressureStrategy.BUFFER)
        .share()

    /**
     * Opens the web socket (unless a connection from an earlier call is still live) and returns
     * the shared event stream.
     *
     * @return the shared [Flowable] of status strings and incoming text messages.
     */
    fun subscribe(): Flowable<String> {
        // Reuse the live connection instead of opening a second socket feeding the same stream.
        if (disposable?.isDisposed == false) return flowable

        // Subscribe before connecting so the emitter exists before the first listener callback.
        // The no-op error consumer prevents a socket failure from being reported as
        // OnErrorNotImplementedException; real subscribers still receive the error.
        disposable = flowable.subscribe({ /* keep-alive only */ }, { /* delivered to real subscribers */ })

        val client = OkHttpClient.Builder()
            .readTimeout(0, TimeUnit.MILLISECONDS)
            .build()
        val request = Request.Builder()
            .url(ECHO_URL)
            .build()
        webSocket = client.newWebSocket(request, this)
        // No further calls are dispatched through this client, so its executor is shut down right away.
        client.dispatcher().executorService().shutdown()
        return flowable
    }

    /**
     * Sends [message] as a text frame.
     *
     * @return the result of the underlying socket's `send`, or `false` when [subscribe] has not
     * been called yet and there is no socket to send on.
     */
    fun send(message: String): Boolean = webSocket?.send(message) ?: false

    /** Emits the status string `"onOpen"` once the connection is established. */
    override fun onOpen(webSocket: WebSocket, response: Response) {
        super.onOpen(webSocket, response)
        broadcaster?.onNext("onOpen")
    }

    /** Emits `"Connection closed"`, completes the stream and releases the internal subscription. */
    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        super.onClosed(webSocket, code, reason)
        broadcaster?.let { emitter ->
            emitter.onNext("Connection closed")
            // Tell downstream subscribers that no more events will arrive.
            emitter.onComplete()
        }
        disposable?.dispose()
    }

    /** Forwards an incoming text message to the stream. */
    override fun onMessage(webSocket: WebSocket, text: String) {
        super.onMessage(webSocket, text)
        broadcaster?.onNext(text)
    }

    /** Terminates the stream with [throwable] when the socket fails. */
    override fun onFailure(webSocket: WebSocket, throwable: Throwable, response: Response?) {
        super.onFailure(webSocket, throwable, response)
        val emitter = broadcaster ?: return
        // Signalling an already-cancelled emitter would route the error to the global RxJava
        // error handler instead of a subscriber.
        if (!emitter.isCancelled) emitter.onError(throwable)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.