Skip to content

Instantly share code, notes, and snippets.

What would you like to do? rx implementation
/**
 * Singleton OkHttp web-socket listener that republishes listener callbacks as a
 * `Flowable<String>`.
 *
 * Text frames are emitted as-is, a closed connection is emitted as the literal
 * "Connection closed", and a failure terminates the stream with the reported throwable.
 *
 * Usage: set [url], call [subscribe] and subscribe to the returned stream, then use [send].
 */
object WebSocket : WebSocketListener() {

    /**
     * Endpoint the socket connects to; must be set before [subscribe] is called.
     *
     * NOTE(review): no endpoint is visible in the original snippet, so it is supplied
     * through this property rather than guessed — confirm the intended URL with the author.
     */
    var url: String = ""

    // Emitter of the most recent subscription to [flowable]; each new subscriber replaces it.
    private lateinit var broadcaster: FlowableEmitter<String>

    // Internal subscription whose only purpose is to get [broadcaster] assigned early.
    private lateinit var disposable: Disposable

    // Fully qualified because this object shares its simple name with OkHttp's socket type.
    private lateinit var webSocket: okhttp3.WebSocket

    // Built once and reused for every connection. A read timeout of 0 means "no timeout"
    // according to the OkHttpClient.Builder documentation.
    private val client: OkHttpClient by lazy {
        OkHttpClient.Builder()
            .readTimeout(0, TimeUnit.MILLISECONDS)
            .build()
    }

    private val flowable = Flowable.create(
        FlowableOnSubscribe<String> { emitter -> broadcaster = emitter },
        BackpressureStrategy.BUFFER
    )

    /**
     * Opens the web socket and returns the stream of events produced by this listener.
     *
     * @return the shared [Flowable]; subscribing to it redirects all later events to that subscriber.
     * @throws IllegalStateException if [url] has not been set.
     */
    fun subscribe(): Flowable<String> {
        check(url.isNotBlank()) { "WebSocket.url must be set before calling subscribe()" }

        val request = Request.Builder()
            .url(url)
            .build()

        // Subscribe before connecting so [broadcaster] is assigned before any callback can fire.
        // The empty error consumer keeps a failure that arrives before the caller subscribes
        // from being reported as an unhandled error.
        disposable = flowable.subscribe({}, {})
        webSocket = client.newWebSocket(request, this)
        return flowable
    }

    /**
     * Sends [message] over the open socket.
     *
     * @return the result of the underlying `send` call, or `false` when [subscribe] has not
     * been called yet and there is no socket to send on.
     */
    fun send(message: String): Boolean =
        ::webSocket.isInitialized && webSocket.send(message)

    override fun onOpen(webSocket: okhttp3.WebSocket, response: Response) {
        super.onOpen(webSocket, response)
    }

    override fun onClosed(webSocket: okhttp3.WebSocket, code: Int, reason: String) {
        super.onClosed(webSocket, code, reason)
        emit("Connection closed")
    }

    override fun onMessage(webSocket: okhttp3.WebSocket, text: String) {
        super.onMessage(webSocket, text)
        // Forward the received text frame to the current subscriber.
        emit(text)
    }

    override fun onFailure(webSocket: okhttp3.WebSocket, throwable: Throwable, response: Response?) {
        super.onFailure(webSocket, throwable, response)
        // Surface the failure to the subscriber instead of dropping it silently.
        if (::broadcaster.isInitialized && !broadcaster.isCancelled) {
            broadcaster.onError(throwable)
        }
    }

    /** Emits [value] to the current subscriber if one exists and has not cancelled. */
    private fun emit(value: String) {
        if (::broadcaster.isInitialized && !broadcaster.isCancelled) {
            broadcaster.onNext(value)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.