Skip to content

Instantly share code, notes, and snippets.

@hleinone
Created February 6, 2019 09:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hleinone/a84085b34252be02b96d1e8d5e912679 to your computer and use it in GitHub Desktop.
Save hleinone/a84085b34252be02b96d1e8d5e912679 to your computer and use it in GitHub Desktop.
Android SocketIO Rx extension
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.disposables.Disposables
import io.socket.client.IO
import io.socket.client.Socket
import io.socket.emitter.Emitter
/**
 * Rx entry point for a Socket.IO client socket.
 *
 * The underlying socket is created eagerly from [uri] via `IO.socket(uri)` when this object is
 * constructed; nothing is connected until the [Single] returned by [connect] is subscribed.
 *
 * @param uri address handed unchanged to `IO.socket`.
 */
class RxIO(uri: String) {
    private val socket = IO.socket(uri)

    /**
     * Connects the socket and emits an [RxSocket] wrapping it once the connect event fires.
     *
     * Disposing the returned [Single] calls `disconnect()` on the underlying socket.
     * No error path is wired here: this class never signals `onError`, so a connection that
     * never succeeds leaves the [Single] pending until it is disposed.
     *
     * @return a [Single] that succeeds with the connected [RxSocket].
     */
    @androidx.annotation.CheckResult
    fun connect(): Single<RxSocket> = doConnect().doOnDispose {
        socket.disconnect()
    }

    /**
     * Registers a connect listener, starts the connection and emits on the connect event.
     *
     * The listener is kept in a named value so it can be unregistered again, mirroring how
     * [RxSocket.on] cleans up after itself. Without this, every subscription would leave a
     * listener (and its emitter) attached to the socket for the socket's whole lifetime.
     */
    @androidx.annotation.CheckResult
    private fun doConnect(): Single<RxSocket> = Single.create { emitter ->
        val connectListener = Emitter.Listener {
            emitter.onSuccess(RxSocket(socket))
        }
        socket.on(Socket.EVENT_CONNECT, connectListener)
        // The emitter disposes this resource when the Single is disposed and also once it has
        // terminated, so the listener never outlives the subscription.
        emitter.setDisposable(Disposables.fromRunnable {
            socket.off(Socket.EVENT_CONNECT, connectListener)
        })
        socket.connect()
    }
}
/**
 * Reactive view over an existing Socket.IO [Socket].
 *
 * @property socket the socket whose events are exposed as observables.
 */
class RxSocket(private val socket: Socket) {
    /**
     * Streams the payloads of a named socket event.
     *
     * Each subscription attaches its own listener for [event] and forwards every argument
     * array it receives; the listener is detached again when the subscription is disposed.
     *
     * @param event name of the socket event to listen for.
     * @return an [Observable] emitting the argument array of each occurrence of [event].
     */
    @androidx.annotation.CheckResult
    fun on(event: String): Observable<Array<Any>> {
        return Observable.create { downstream ->
            val forwarder = Emitter.Listener { args ->
                downstream.onNext(args)
            }
            socket.on(event, forwarder)

            // Teardown: stop listening once the subscriber goes away.
            val detach = Disposables.fromRunnable {
                socket.off(event, forwarder)
            }
            downstream.setDisposable(detach)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment