Skip to content

Instantly share code, notes, and snippets.

@eirikb
Created February 7, 2020 11:14
Show Gist options
  • Save eirikb/41910638c2acef9f8f84c43f507f6023 to your computer and use it in GitHub Desktop.
Save eirikb/41910638c2acef9f8f84c43f507f6023 to your computer and use it in GitHub Desktop.
import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import io.ktor.client.HttpClient
import io.ktor.client.engine.okhttp.OkHttp
import io.ktor.client.features.websocket.DefaultClientWebSocketSession
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.webSocketSession
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.readText
import io.ktor.http.cio.websocket.send
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import java.math.BigInteger
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
/**
 * Request message that is serialized to JSON and sent over the WebSocket.
 *
 * The meaning of the individual fields is defined by the remote proxy and is
 * not visible in this file; the notes below only describe how this file uses them.
 *
 * @property type request kind; this file only ever sends `"subscribe"`.
 * @property id identifier chosen by the client; incoming data is routed back by this key.
 * @property host forwarded unchanged from the caller (presumably the target host — confirm with the proxy).
 * @property component forwarded unchanged from the caller.
 * @property proto forwarded unchanged from the caller.
 * @property path forwarded unchanged from the caller.
 */
private data class Target(
    val type: String,
    val id: String,
    val host: String,
    val component: String,
    val proto: String,
    val path: String
)
/**
 * One message received from the WebSocket, as deserialized from JSON.
 *
 * @property stamp value attached to the whole message (presumably a timestamp — confirm with the proxy).
 * @property data payloads keyed by subscription id; each entry is delivered to the
 * subscriber registered under that key.
 */
private data class Results(
    val stamp: BigInteger,
    val data: Map<String, Any>
)
/**
 * A single value delivered to one subscriber.
 *
 * @property stamp the stamp of the message the value arrived in.
 * @property value the payload for this subscription, as decoded from JSON.
 */
data class Result(
    val stamp: BigInteger,
    val value: Any
)
/**
 * Client that multiplexes several subscriptions over a single WebSocket connection.
 *
 * [start] opens the connection and dispatches incoming messages to subscribers;
 * [subscribe] returns a cold [Flow] that registers a subscription each time it is
 * collected. [start] suspends for the lifetime of the connection, so it has to run
 * in its own coroutine while flows are being collected elsewhere.
 *
 * An instance is meant for a single [start] call: subscribers always use the
 * session established by the first successful connection.
 *
 * @param proxyHost host passed to the WebSocket client when connecting.
 */
class WsProxyClient(private val proxyHost: String) {
    // Completed by start() once the connection is established (or failed), so that
    // subscribers can wait for the session instead of sleeping and hoping.
    private val sessionReady = CompletableDeferred<DefaultClientWebSocketSession>()

    // Subscriptions are registered from collector coroutines while start() reads the
    // map from its own coroutine, so both pieces of shared state are thread-safe.
    private val nextId = AtomicInteger()
    private val callbackMap = ConcurrentHashMap<String, suspend (Result) -> Unit>()

    private val objectMapper = jacksonObjectMapper().apply {
        configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    }

    /**
     * Serializes [target] to JSON and sends it once the WebSocket session is available.
     *
     * Suspends until [start] has connected; rethrows the connection error if it failed.
     */
    private suspend fun subscribe(target: Target) {
        val asJson = objectMapper.writeValueAsString(target)
        sessionReady.await().send(asJson)
    }

    /**
     * Connects to [proxyHost] and dispatches every incoming text message to the
     * subscribers whose ids appear as keys in the message.
     *
     * Never returns normally: it ends by throwing when the connection cannot be
     * established, when the incoming channel is closed, when a message cannot be
     * parsed, or when the calling coroutine is cancelled. The underlying HTTP
     * client is closed in all of those cases.
     */
    @FlowPreview
    @KtorExperimentalAPI
    suspend fun start() {
        val client = HttpClient(OkHttp) {
            install(WebSockets)
        }
        try {
            val session = try {
                client.webSocketSession(host = proxyHost)
            } catch (e: Throwable) {
                // Wake up waiting subscribers with the failure instead of leaving them suspended.
                sessionReady.completeExceptionally(e)
                throw e
            }
            sessionReady.complete(session)

            while (true) {
                // Only text frames carry data; skip anything else rather than crash on the cast.
                val frame = session.incoming.receive() as? Frame.Text ?: continue
                val results = objectMapper.readValue<Results>(frame.readText())
                for ((key, value) in results.data) {
                    callbackMap[key]?.invoke(Result(results.stamp, value))
                }
            }
        } finally {
            client.close()
        }
    }

    /**
     * Returns a cold flow of values for the given target.
     *
     * Every collection registers its own subscription id, sends a `"subscribe"`
     * request once the session is ready, and then emits each [Result] routed to
     * that id. The registration is removed when the collector finishes or is
     * cancelled, so messages for it are no longer dispatched.
     *
     * @param host forwarded to the proxy in the subscribe request.
     * @param component forwarded to the proxy in the subscribe request.
     * @param proto forwarded to the proxy in the subscribe request.
     * @param path forwarded to the proxy in the subscribe request.
     * @return a flow that stays open until its collector is cancelled.
     */
    @ExperimentalCoroutinesApi
    @KtorExperimentalAPI
    fun subscribe(host: String, component: String, proto: String, path: String): Flow<Result> =
        callbackFlow {
            // Allocate the id per collection so collecting the same flow twice
            // does not make two subscriptions fight over one map entry.
            val id = nextId.incrementAndGet().toString()
            callbackMap[id] = { result -> send(result) }
            try {
                subscribe(Target("subscribe", id, host, component, proto, path))
                awaitClose()
            } finally {
                // Without this, the next message for this id would send into a closed
                // channel and the resulting exception would terminate start().
                callbackMap.remove(id)
            }
        }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment