Skip to content

Instantly share code, notes, and snippets.

@vamjakuldip
Created May 30, 2020 09:44
Show Gist options
  • Save vamjakuldip/eb0d0cb4ecdb82954706fe93e9ec3377 to your computer and use it in GitHub Desktop.
Save vamjakuldip/eb0d0cb4ecdb82954706fe93e9ec3377 to your computer and use it in GitHub Desktop.
RxWebSocket is used to connect to a WebSocket. It uses the Retrofit library (OkHttp) to open the connection and handles connection retries.
package com.websocket
import okhttp3.OkHttpClient
/**
 * Settings holder for the WebSocket helper; at the moment it carries only an [OkHttpClient].
 *
 * The constructor is private, so instances are obtained through [Builder].
 */
class Config private constructor() {
    /** Client stored in this configuration; starts out as a default-constructed [OkHttpClient]. */
    var client = OkHttpClient()

    /** Fluent builder that mutates one [Config] instance and returns it from [build]. */
    class Builder {
        private val pending = Config()

        /** Stores [client] in the configuration being built and returns this builder for chaining. */
        fun setClient(client: OkHttpClient): Builder = apply { pending.client = client }

        /** Returns the configuration assembled so far; every call yields the same instance. */
        fun build(): Config = pending
    }
}
package com.websocket
import io.reactivex.Observable
import okio.ByteString
import java.util.concurrent.TimeUnit
/**
 * Static-style entry point that forwards every call to the instance returned by
 * `RxWebSocketUtil.getInstance()`.
 */
object RxWebSocket {
    /** Looks up the shared helper on each call rather than caching it. */
    private fun shared(): RxWebSocketUtil = RxWebSocketUtil.getInstance()

    /** Hands the client carried by [config] to the shared helper. */
    fun setConfig(config: Config) {
        shared().setClient(config.client)
    }

    /** `RxWebSocket[url]`: the event stream for [url], using the helper's default timeout. */
    operator fun get(url: String): Observable<WebSocketInfo> = shared().getWebSocketInfo(url)

    /** `RxWebSocket[url, timeout, timeUnit]`: the event stream for [url] with an explicit timeout. */
    operator fun get(url: String, timeout: Long, timeUnit: TimeUnit?): Observable<WebSocketInfo> =
        shared().getWebSocketInfo(url, timeout, timeUnit)

    /** Forwards a text message for [url] to the shared helper. */
    fun send(url: String, msg: String) {
        shared().send(url, msg)
    }

    /** Forwards a binary message for [url] to the shared helper. */
    fun send(url: String, byteString: ByteString) {
        shared().send(url, byteString)
    }
}
package com.websocket
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import okhttp3.*
import okio.ByteString
import retrofit2.HttpException
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
/**
 * Singleton-style helper that opens OkHttp WebSockets and exposes each one as a shared RxJava
 * stream of [WebSocketInfo] events, keyed by URL.
 *
 * Obtain the instance through [getInstance]. One stream per URL is cached in [observableMap] and
 * the currently open socket per URL in [webSocketMap]; both entries are removed when the shared
 * stream is disposed, and the socket entry is also removed when that socket fails or closes.
 */
class RxWebSocketUtil private constructor() {
    private var client: OkHttpClient = OkHttpClient()
    private val observableMap: MutableMap<String, Observable<WebSocketInfo>?> = ConcurrentHashMap()
    private val webSocketMap: MutableMap<String, WebSocket> = ConcurrentHashMap()

    /**
     * Replaces the [OkHttpClient] used for sockets opened after this call.
     *
     * @param client the client to use
     * @throws NullPointerException if [client] is null
     */
    fun setClient(client: OkHttpClient?) {
        this.client = client ?: throw NullPointerException(" Are you kidding me ? client == null")
    }

    /**
     * Returns the event stream for [url], delivered on the Android main thread.
     *
     * If a stream for [url] is already cached it is reused; when a socket for that URL is
     * currently recorded as open, the returned stream is prefixed with a synthetic "open" event so
     * late subscribers still learn about the socket. Otherwise a new shared stream is created
     * and cached.
     *
     * @param timeout value forwarded to `Observable.timeout`
     * @param timeUnit unit of [timeout], forwarded to `Observable.timeout`
     */
    fun getWebSocketInfo(url: String, timeout: Long, timeUnit: TimeUnit?): Observable<WebSocketInfo> {
        val cached = observableMap[url]
        val stream: Observable<WebSocketInfo> = if (cached == null) {
            val created = createSharedStream(url, timeout, timeUnit)
            observableMap[url] = created
            created
        } else {
            val openSocket = webSocketMap[url]
            if (openSocket != null) cached.startWith(WebSocketInfo(openSocket, true)) else cached
        }
        return stream.observeOn(AndroidSchedulers.mainThread())
    }

    /** Same as the three-argument overload with a timeout of 30 minutes. */
    fun getWebSocketInfo(url: String): Observable<WebSocketInfo> {
        return getWebSocketInfo(url, DEFAULT_TIMEOUT_MINUTES, TimeUnit.MINUTES)
    }

    /** Returns the stream for [url] reduced to the sockets carried by its events. */
    fun getWebSocket(url: String): Observable<WebSocket> {
        return getWebSocketInfo(url)
            .filter { webSocketInfo: WebSocketInfo -> webSocketInfo.webSocket != null }
            .map { obj: WebSocketInfo -> obj.webSocket }
    }

    /** Sends [msg] on the socket recorded for [url]; does nothing when no socket is recorded. */
    fun send(url: String, msg: String) {
        webSocketMap[url]?.send(msg)
    }

    /** Sends [byteString] on the socket recorded for [url]; does nothing when no socket is recorded. */
    fun send(url: String, byteString: ByteString) {
        webSocketMap[url]?.send(byteString)
    }

    /**
     * Decides, for every error the stream emits, whether to resubscribe.
     *
     * Any [IOException] (which includes socket timeouts and interrupted I/O) and an
     * [HttpException] with status 502 trigger a retry after 5 seconds; every other error is
     * passed through and ends the retry loop.
     */
    fun retryHandler(throwable: Observable<Throwable>): Observable<Any> {
        return throwable.flatMap { error ->
            if (shouldRetry(error)) {
                return@flatMap Observable.timer(RETRY_DELAY_SECONDS, TimeUnit.SECONDS)
            }
            return@flatMap Observable.error<Any>(error)
        }
    }

    /** True for the error kinds that [retryHandler] answers with a delayed retry. */
    private fun shouldRetry(error: Throwable): Boolean = when (error) {
        is IOException -> true
        is HttpException -> error.code() == HTTP_BAD_GATEWAY
        else -> false
    }

    /** Builds the shared, retrying stream for [url]; the caller is responsible for caching it. */
    private fun createSharedStream(url: String, timeout: Long, timeUnit: TimeUnit?): Observable<WebSocketInfo> {
        return Observable.create(WebSocketOnSubscribe(url))
            .timeout(timeout, timeUnit)
            .retryWhen { retryHandler(it) }
            .retry { throwable: Throwable -> throwable is IOException || throwable is TimeoutException }
            .doOnDispose {
                observableMap.remove(url)
                webSocketMap.remove(url)
            }
            .doOnNext { webSocketInfo: WebSocketInfo ->
                if (webSocketInfo.isOnOpen) {
                    webSocketInfo.webSocket?.let { webSocketMap[url] = it }
                }
            }
            .share()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
    }

    /**
     * Drops the entry for [url] from [webSocketMap], but only while it still points at [socket],
     * so a late callback from an old socket cannot evict a newer connection.
     */
    private fun forgetSocket(url: String, socket: WebSocket) {
        if (webSocketMap[url] === socket) {
            webSocketMap.remove(url)
        }
    }

    private fun getRequest(url: String): Request = Request.Builder().get().url(url).build()

    /** Opens a socket for [url] on every subscription and relays its callbacks to the emitter. */
    private inner class WebSocketOnSubscribe(private val url: String) : ObservableOnSubscribe<WebSocketInfo> {
        // Socket opened by the most recent subscription; non-null means this is a resubscription.
        private var webSocket: WebSocket? = null

        @Throws(Exception::class)
        override fun subscribe(emitter: ObservableEmitter<WebSocketInfo?>) {
            // On a resubscription that is not running on the thread named "main",
            // announce the reconnect before opening the new socket.
            if (webSocket != null && "main" != Thread.currentThread().name) {
                emitter.onNext(WebSocketInfo.createReconnect())
            }
            initWebSocket(emitter)
        }

        private fun initWebSocket(emitter: ObservableEmitter<WebSocketInfo?>) {
            val socket = client.newWebSocket(getRequest(url), object : WebSocketListener() {
                override fun onOpen(webSocket: WebSocket, response: Response) {
                    webSocketMap[url] = webSocket
                    if (!emitter.isDisposed) {
                        emitter.onNext(WebSocketInfo(webSocket, true))
                    }
                }

                override fun onMessage(webSocket: WebSocket, text: String) {
                    if (!emitter.isDisposed) {
                        emitter.onNext(WebSocketInfo(webSocket, text))
                    }
                }

                override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                    if (!emitter.isDisposed) {
                        emitter.onNext(WebSocketInfo(webSocket, bytes))
                    }
                }

                override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                    // The socket is unusable from here on; stop offering it to send() and to
                    // late subscribers before the error triggers the retry logic.
                    forgetSocket(url, webSocket)
                    if (!emitter.isDisposed) {
                        emitter.onError(t)
                    }
                }

                override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                    webSocket.close(1000, null)
                }

                override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                    forgetSocket(url, webSocket)
                }
            })
            webSocket = socket
            // Close the socket this subscription opened, not whatever the field holds later.
            emitter.setCancellable {
                socket.close(3000, "close WebSocket")
            }
        }
    }

    companion object {
        private const val RETRY_DELAY_SECONDS = 5L
        private const val DEFAULT_TIMEOUT_MINUTES = 30L
        private const val HTTP_BAD_GATEWAY = 502

        // Read outside the lock in getInstance(), so the write must be visible across threads.
        @Volatile
        var rxWebSocketUtil: RxWebSocketUtil? = null

        /** Returns the shared instance, creating it under a class-level lock on first use. */
        fun getInstance(): RxWebSocketUtil {
            return rxWebSocketUtil ?: synchronized(RxWebSocketUtil::class.java) {
                rxWebSocketUtil ?: RxWebSocketUtil().also { rxWebSocketUtil = it }
            }
        }
    }
}
package com.websocket
import okhttp3.WebSocket
import okio.ByteString
/**
 * A single event of a WebSocket stream. Depending on how it was built it carries an "opened"
 * flag, a text message, a binary message, or a "reconnect" flag, optionally with the socket.
 */
class WebSocketInfo private constructor() {
    /** Socket the event belongs to; null for events created by [createReconnect]. */
    var webSocket: WebSocket? = null

    /** Text payload, set only by the text-message constructor. */
    var string: String? = null

    /** Binary payload, set only by the binary-message constructor. */
    var byteString: ByteString? = null

    /** Value of the `onOpen` argument for events built with the open-event constructor. */
    var isOnOpen = false
        private set

    /** True only for events produced by [createReconnect]. */
    var isOnReconnect = false
        private set

    /** Builds an event for [webSocket] whose [isOnOpen] flag is [onOpen]. */
    internal constructor(webSocket: WebSocket?, onOpen: Boolean) : this() {
        this.webSocket = webSocket
        this.isOnOpen = onOpen
    }

    /** Builds a text-message event. */
    internal constructor(webSocket: WebSocket?, mString: String?) : this() {
        this.webSocket = webSocket
        this.string = mString
    }

    /** Builds a binary-message event. */
    internal constructor(webSocket: WebSocket?, byteString: ByteString?) : this() {
        this.webSocket = webSocket
        this.byteString = byteString
    }

    companion object {
        /** Creates an event with no socket or payload and [isOnReconnect] set to true. */
        fun createReconnect(): WebSocketInfo = WebSocketInfo().apply { isOnReconnect = true }
    }
}
package com.websocket
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import okhttp3.WebSocket
import okio.ByteString
/**
 * Base [Observer] for [WebSocketInfo] streams that turns each event into a dedicated callback.
 *
 * Subclasses override the callbacks they care about; every callback has an empty default.
 */
abstract class WebSocketSubscriber : Observer<WebSocketInfo> {
    // Set once an "open" event has been seen; gates the onClose() callback in onComplete().
    private var hasOpened = false

    /** Subscription handle received in [onSubscribe]; null until then. */
    protected var disposable: Disposable? = null

    /**
     * Dispatches the event to exactly one callback, checked in this order: open, text message,
     * binary message, reconnect. Events matching none of these are ignored.
     */
    override fun onNext(webSocketInfo: WebSocketInfo) {
        // Snapshot the mutable properties so the null checks and the calls see the same values.
        val text = webSocketInfo.string
        val bytes = webSocketInfo.byteString
        when {
            webSocketInfo.isOnOpen -> {
                hasOpened = true
                // An open event without a socket has nothing to hand to onOpen(); skip the
                // callback rather than throwing from inside onNext().
                webSocketInfo.webSocket?.let { onOpen(it) }
            }
            text != null -> onMessage(text)
            bytes != null -> onMessage(bytes)
            webSocketInfo.isOnReconnect -> onReconnect()
        }
    }

    /**
     * Callback when the WebSocket is opened.
     *
     * @param webSocket the socket carried by the open event
     */
    protected open fun onOpen(webSocket: WebSocket) {}

    /** Callback for a text message. */
    protected open fun onMessage(text: String) {}

    /** Callback for a binary message. */
    protected open fun onMessage(byteString: ByteString) {}

    /**
     * Callback when the WebSocket is reconnecting.
     */
    protected open fun onReconnect() {}

    /** Callback when the stream completes after at least one open event was received. */
    protected open fun onClose() {}

    override fun onSubscribe(disposable: Disposable) {
        this.disposable = disposable
    }

    /** Disposes the subscription received in [onSubscribe], if there is one. */
    fun dispose() {
        disposable?.dispose()
    }

    override fun onComplete() {
        if (hasOpened) {
            onClose()
        }
    }

    override fun onError(e: Throwable) {
        e.printStackTrace()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment