Skip to content

Instantly share code, notes, and snippets.

@raquo
Created September 12, 2020 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raquo/81f0cc2c67fc01afa5d681ad4a4c0457 to your computer and use it in GitHub Desktop.
Save raquo/81f0cc2c67fc01afa5d681ad4a4c0457 to your computer and use it in GitHub Desktop.
package com.raquo.ws
import com.raquo.airstream.core.Observer
import com.raquo.airstream.eventbus.EventBus
import com.raquo.airstream.eventstream.EventStream
import org.scalajs.dom
import org.scalajs.dom.WebSocket
import upickle.default._
import scala.scalajs.js
// @TODO[Prod] We should test this for resilience to network failures / computer sleeps / inactivity / etc.
// @TODO[Security] Websockets need to be protected against CSRF (they are not covered by CORS)
class WebsocketClient[ClientMsg, ServerMsg](
urlPath: String,
clientMsgRw: ReadWriter[ClientMsg],
serverMsgRw: ReadWriter[ServerMsg]
) {
import WebsocketClient._
// @TODO[Prod]
private val socketUrl = s"ws://.../$urlPath"
private val serverMessageBus = new EventBus[ServerMsg]
private val clientMessageQueue = js.Array[ClientMsg]()
/** case None => Websocket connection NOT NEEDED right now, so there's no websocket.
* case Some(ws) => Websocket connection REQUIRED, and it is either already open or some
* other code is currently trying to (re)open it (don't interfere, trust
* the reconnection code to do its job)
*/
private var maybeWs: Option[WebSocket] = Some(createWebsocket())
/** Subscribe to this stream to receive websocket events from the server */
val $message: EventStream[ServerMsg] = serverMessageBus.events.map { message =>
// dom.console.log(":WS received vvv")
// dom.console.log(message.asInstanceOf[js.Any])
message
}
/** Send requests to this observer to send them over the websocket */
val writer: Observer[ClientMsg] = Observer { clientMsg =>
clientMessageQueue.push(clientMsg)
maybeWs match {
case Some(ws) =>
flushRequestQueue(ws)
case None =>
maybeWs = Some(createWebsocket())
}
}
def sendTrackedRequest[Response <: TrackedWebsocketResponse with ServerMsg](
request: ClientMsg with TrackedWebsocketRequest[Response]
): EventStream[Response] = {
// @TODO[Performance] Unsubscribe after getting response
writer.onNext(request)
$message.collect {
case response: TrackedWebsocketResponse if response.sourceRequestId == request.requestId =>
response.asInstanceOf[Response]
}
}
private def flushRequestQueue(ws: WebSocket): Unit = {
if (ws.readyState == ReadyState.Open) {
// println(s"> flushing query queue (${queryQueue.length} items), ws status: ${ws.readyState}")
while (clientMessageQueue.nonEmpty) {
val clientMessage = clientMessageQueue.shift()
ws.send(write(clientMessage)(clientMsgRw))
// dom.console.log(":WS sent vvv")
// dom.console.log(clientMessage.asInstanceOf[js.Any])
}
}
}
private def createWebsocket(): WebSocket = {
val ws = new WebSocket(socketUrl)
dom.console.log(s"WS: Connecting to $socketUrl, ${ws.##}")
ws.onopen = (event: dom.Event) => {
dom.console.log(s"WS: Opened $socketUrl, ${ws.##}")
flushRequestQueue(ws)
}
ws.onmessage = (event: dom.MessageEvent) => {
serverMessageBus.writer.onNext(read(event.data.asInstanceOf[String])(serverMsgRw))
}
// Any `error` that happens will be followed by a `close` event according to the spec
ws.onerror = (event: dom.Event) => {
dom.console.warn("WS: Error:")
dom.console.error(event)
}
ws.onclose = (event: dom.CloseEvent) => {
dom.console.log(s"WS: Closed $socketUrl, ${ws.##}, ${event.code}, ${event.reason}")
if (event.code != WsClosedNormallyCode) {
// Websocket closed abnormally, report error
dom.console.error(event)
if (maybeWs.isDefined) {
dom.console.log("WS: Closed abnormally. Scheduled reopening...")
// We still need a websocket, so try opening it again
js.timers.setTimeout(5000) {
maybeWs = Some(createWebsocket())
}
}
}
}
ws
}
// @TODO[Integrity] We should probably use this some time...?
def kill(): Unit = {
maybeWs.foreach { ws =>
maybeWs = None
ws.close(WsClosedNormallyCode)
}
}
}
object WebsocketClient {
object ReadyState {
val Connecting = 0
val Open = 1
val Closing = 2
val Closed = 3
}
val WsClosedNormallyCode = 1000
// see https://github.com/Luka967/websocket-close-codes
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment