Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Signalr socket rx implementation
object SignalrSocket {
private lateinit var broadcaster: FlowableEmitter<String>
private lateinit var disposable: Disposable
private val flowable = Flowable.create(FlowableOnSubscribe<String> { emitter -> broadcaster = emitter }, BackpressureStrategy.BUFFER)
private lateinit var mHubConnection: HubConnection
private lateinit var mHubProxy: HubProxy
@Throws(InterruptedException::class, ExecutionException::class)
fun subscribe(): Flowable<String> {
Platform.loadPlatformComponent(object: PlatformComponent {
override fun createHttpConnection(logger: Logger) = Platform.createHttpConnection(logger)
override fun getOSName() = "android"
val credentials = Credentials { request -> request.addHeader("User-Name", "BNK") }
mHubConnection = HubConnection("")
mHubConnection.credentials = credentials
mHubProxy = mHubConnection.createHubProxy("ChatHub")
val clientTransport = ServerSentEventsTransport(mHubConnection.logger)
val signalRFuture = mHubConnection.start(clientTransport)
try {
} catch (e : Exception) { println("Fail!") }
disposable = flowable.subscribe()
send("Hello from Android!")
mHubProxy.on("broadcastMessage", { broadcaster.onNext(it) },
return flowable
fun send(message: String) = mHubProxy.invoke("Send", message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.