-
-
Save raquo/81f0cc2c67fc01afa5d681ad4a4c0457 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.raquo.ws | |
import com.raquo.airstream.core.Observer | |
import com.raquo.airstream.eventbus.EventBus | |
import com.raquo.airstream.eventstream.EventStream | |
import org.scalajs.dom | |
import org.scalajs.dom.WebSocket | |
import upickle.default._ | |
import scala.scalajs.js | |
// @TODO[Prod] We should test this for resilience to network failures / computer sleeps / inactivity / etc. | |
// @TODO[Security] Websockets need to be protected against CSRF (they are not covered by CORS) | |
class WebsocketClient[ClientMsg, ServerMsg]( | |
urlPath: String, | |
clientMsgRw: ReadWriter[ClientMsg], | |
serverMsgRw: ReadWriter[ServerMsg] | |
) { | |
import WebsocketClient._ | |
// @TODO[Prod] | |
private val socketUrl = s"ws://.../$urlPath" | |
private val serverMessageBus = new EventBus[ServerMsg] | |
private val clientMessageQueue = js.Array[ClientMsg]() | |
/** case None => Websocket connection NOT NEEDED right now, so there's no websocket. | |
* case Some(ws) => Websocket connection REQUIRED, and it is either already open or some | |
* other code is currently trying to (re)open it (don't interfere, trust | |
* the reconnection code to do its job) | |
*/ | |
private var maybeWs: Option[WebSocket] = Some(createWebsocket()) | |
/** Subscribe to this stream to receive websocket events from the server */ | |
val $message: EventStream[ServerMsg] = serverMessageBus.events.map { message => | |
// dom.console.log(":WS received vvv") | |
// dom.console.log(message.asInstanceOf[js.Any]) | |
message | |
} | |
/** Send requests to this observer to send them over the websocket */ | |
val writer: Observer[ClientMsg] = Observer { clientMsg => | |
clientMessageQueue.push(clientMsg) | |
maybeWs match { | |
case Some(ws) => | |
flushRequestQueue(ws) | |
case None => | |
maybeWs = Some(createWebsocket()) | |
} | |
} | |
def sendTrackedRequest[Response <: TrackedWebsocketResponse with ServerMsg]( | |
request: ClientMsg with TrackedWebsocketRequest[Response] | |
): EventStream[Response] = { | |
// @TODO[Performance] Unsubscribe after getting response | |
writer.onNext(request) | |
$message.collect { | |
case response: TrackedWebsocketResponse if response.sourceRequestId == request.requestId => | |
response.asInstanceOf[Response] | |
} | |
} | |
private def flushRequestQueue(ws: WebSocket): Unit = { | |
if (ws.readyState == ReadyState.Open) { | |
// println(s"> flushing query queue (${queryQueue.length} items), ws status: ${ws.readyState}") | |
while (clientMessageQueue.nonEmpty) { | |
val clientMessage = clientMessageQueue.shift() | |
ws.send(write(clientMessage)(clientMsgRw)) | |
// dom.console.log(":WS sent vvv") | |
// dom.console.log(clientMessage.asInstanceOf[js.Any]) | |
} | |
} | |
} | |
private def createWebsocket(): WebSocket = { | |
val ws = new WebSocket(socketUrl) | |
dom.console.log(s"WS: Connecting to $socketUrl, ${ws.##}") | |
ws.onopen = (event: dom.Event) => { | |
dom.console.log(s"WS: Opened $socketUrl, ${ws.##}") | |
flushRequestQueue(ws) | |
} | |
ws.onmessage = (event: dom.MessageEvent) => { | |
serverMessageBus.writer.onNext(read(event.data.asInstanceOf[String])(serverMsgRw)) | |
} | |
// Any `error` that happens will be followed by a `close` event according to the spec | |
ws.onerror = (event: dom.Event) => { | |
dom.console.warn("WS: Error:") | |
dom.console.error(event) | |
} | |
ws.onclose = (event: dom.CloseEvent) => { | |
dom.console.log(s"WS: Closed $socketUrl, ${ws.##}, ${event.code}, ${event.reason}") | |
if (event.code != WsClosedNormallyCode) { | |
// Websocket closed abnormally, report error | |
dom.console.error(event) | |
if (maybeWs.isDefined) { | |
dom.console.log("WS: Closed abnormally. Scheduled reopening...") | |
// We still need a websocket, so try opening it again | |
js.timers.setTimeout(5000) { | |
maybeWs = Some(createWebsocket()) | |
} | |
} | |
} | |
} | |
ws | |
} | |
// @TODO[Integrity] We should probably use this some time...? | |
def kill(): Unit = { | |
maybeWs.foreach { ws => | |
maybeWs = None | |
ws.close(WsClosedNormallyCode) | |
} | |
} | |
} | |
object WebsocketClient { | |
object ReadyState { | |
val Connecting = 0 | |
val Open = 1 | |
val Closing = 2 | |
val Closed = 3 | |
} | |
val WsClosedNormallyCode = 1000 | |
// see https://github.com/Luka967/websocket-close-codes | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment