Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
SignalR socket Rx implementation
/**
 * Singleton SignalR client that republishes the hub's "broadcastMessage" events as an Rx [Flowable].
 *
 * Call [subscribe] to open the connection and obtain the message stream; call [send] afterwards
 * to invoke the hub's "Send" method. [send] must not be called before [subscribe], because the
 * hub proxy is only created there.
 */
object SignalrSocket {
    private const val HUB_URL = "http://192.168.1.100"
    private const val HUB_NAME = "ChatHub"
    private const val BROADCAST_EVENT = "broadcastMessage"
    private const val SEND_METHOD = "Send"

    // Emitter of the single shared upstream; assigned when the first subscriber connects.
    private lateinit var broadcaster: FlowableEmitter<String>

    // Internal keep-alive subscription that holds the shared upstream open.
    private lateinit var disposable: Disposable

    // share() makes every subscriber observe the SAME emitter. Without it, each new subscription
    // re-runs the create() lambda and overwrites `broadcaster`, so only the most recent subscriber
    // would ever receive messages.
    private val flowable: Flowable<String> = Flowable
        .create(FlowableOnSubscribe<String> { emitter -> broadcaster = emitter }, BackpressureStrategy.BUFFER)
        .share()

    private lateinit var mHubConnection: HubConnection
    private lateinit var mHubProxy: HubProxy

    /**
     * Connects to the hub, registers the broadcast handler, sends a test message and returns
     * the stream of broadcast messages.
     *
     * Each call creates a new [HubConnection]; a connection created by an earlier call is not
     * stopped here.
     *
     * @return a hot stream of the strings delivered through "broadcastMessage"
     * @throws InterruptedException if the thread is interrupted while waiting for the connection
     * @throws ExecutionException if the connection attempt fails
     */
    @Throws(InterruptedException::class, ExecutionException::class)
    fun subscribe(): Flowable<String> {
        Platform.loadPlatformComponent(object : PlatformComponent {
            // Must use the *default* factory: Platform.createHttpConnection delegates to the loaded
            // platform component, so calling it from here would recurse into this override.
            override fun createHttpConnection(logger: Logger) = Platform.createDefaultHttpConnection(logger)
            override fun getOSName() = "android"
        })

        // Make sure the emitter exists before any hub message can arrive, and avoid stacking up
        // extra keep-alive subscriptions when subscribe() is called more than once.
        if (!::disposable.isInitialized || disposable.isDisposed) {
            disposable = flowable.subscribe()
        }

        val credentials = Credentials { request -> request.addHeader("User-Name", "BNK") }
        mHubConnection = HubConnection(HUB_URL)
        mHubConnection.credentials = credentials
        mHubProxy = mHubConnection.createHubProxy(HUB_NAME)

        // Register the handler before connecting/sending so the echo of the test message is not missed.
        mHubProxy.on(BROADCAST_EVENT, { broadcaster.onNext(it) }, String::class.java)

        // Block until connected. Failures propagate to the caller (as declared by @Throws) instead
        // of being swallowed and followed by an invoke on a connection that never started.
        val clientTransport = ServerSentEventsTransport(mHubConnection.logger)
        mHubConnection.start(clientTransport).get()

        // TEST MESSAGE
        send("Hello from Android!")
        return flowable
    }

    /**
     * Invokes the hub's "Send" method with [message].
     *
     * Requires a prior call to [subscribe]; the hub proxy is uninitialized before that.
     */
    fun send(message: String) = mHubProxy.invoke(SEND_METHOD, message)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.