Skip to content

Instantly share code, notes, and snippets.

@FireZenk
Created January 11, 2018 14:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FireZenk/716f5bf4952bc1f11fceadccba2dc31c to your computer and use it in GitHub Desktop.
Save FireZenk/716f5bf4952bc1f11fceadccba2dc31c to your computer and use it in GitHub Desktop.
websocket.org rx implementation
object WebSocket : WebSocketListener() {
private lateinit var broadcaster: FlowableEmitter<String>
private lateinit var disposable: Disposable
private lateinit var webSocket: WebSocket
private val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter -> broadcaster = emitter }, BackpressureStrategy.BUFFER)
fun subscribe(): Flowable<String> {
val client = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
.build()
val request = Request.Builder()
.url("ws://echo.websocket.org")
.build()
webSocket = client.newWebSocket(request, this)
client.dispatcher().executorService().shutdown()
disposable = flowable.subscribe()
return flowable
}
fun send(message: String) = webSocket.send(message)
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
broadcaster.onNext("onOpen")
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
super.onClosed(webSocket, code, reason)
broadcaster.onNext("Connection closed")
disposable.dispose()
}
override fun onMessage(webSocket: WebSocket, text: String) {
super.onMessage(webSocket, text)
broadcaster.onNext(text)
}
override fun onFailure(webSocket: WebSocket, throwable: Throwable, response: Response?) {
super.onFailure(webSocket, throwable, response)
broadcaster.onError(throwable)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment