Skip to content

Instantly share code, notes, and snippets.

@busti
Created June 8, 2020 15:44
Show Gist options
  • Save busti/42a2573fea31b4e44751ecd127bbcb96 to your computer and use it in GitHub Desktop.
Save busti/42a2573fea31b4e44751ecd127bbcb96 to your computer and use it in GitHub Desktop.
import cats.implicits._
import cats.effect._
import io.chrisdavenport.log4cats.Logger
import monix.eval.TaskLike
import monix.execution.Ack.Continue
import monix.execution.{Ack, Cancelable}
import monix.reactive.OverflowStrategy.Unbounded
import monix.reactive._
import mycelium.client.raw.{ReconnectingWebSocket, ReconnectingWebsocketOptions}
import org.scalajs.dom._
import scala.concurrent.Future
import scala.scalajs.js.UndefOr
import scala.scalajs.js.typedarray._
import scala.scalajs.js
import js.JSConverters._
/** Event ADT and outgoing-payload wrappers used by the WebSocket helpers in this file. */
object WebSocketTypes {

  /** A DOM event received from a WebSocket, tagged by the kind of notification. */
  sealed trait WebSocketEvent

  /** The socket reported that it opened. */
  final case class Open(event: Event) extends WebSocketEvent

  /** The socket reported that it closed. */
  final case class Close(event: CloseEvent) extends WebSocketEvent

  /** The socket delivered a message. */
  final case class Message(event: MessageEvent) extends WebSocketEvent

  /** The socket reported an error. */
  final case class Error(event: Event) extends WebSocketEvent

  /** A payload that knows how to transmit itself over a given WebSocket. */
  trait WebSocketData {

    /**
      * Sends the wrapped payload through `webSocket`.
      *
      * @param webSocket the socket to send on
      */
    def sendData(webSocket: WebSocket): Unit
  }

  // The implicit classes below let a String, Blob or ArrayBuffer be used
  // wherever a WebSocketData is expected once `WebSocketTypes._` is imported.

  /** Sends a `String` payload. */
  implicit class dataString(data: String) extends WebSocketData {
    def sendData(webSocket: WebSocket): Unit = webSocket.send(data)
  }

  /** Sends a `Blob` payload. */
  implicit class dataBlob(data: Blob) extends WebSocketData {
    def sendData(webSocket: WebSocket): Unit = webSocket.send(data)
  }

  /** Sends an `ArrayBuffer` payload. */
  implicit class dataArrayBuffer(data: ArrayBuffer) extends WebSocketData {
    def sendData(webSocket: WebSocket): Unit = webSocket.send(data)
  }
}
/**
  * Bridges a DOM `WebSocket` to Monix: incoming socket events are exposed as an
  * `Observable`, outgoing payloads are accepted through an `Observer`.
  *
  * This class never opens or closes the socket itself.
  *
  * @param webSocket the underlying socket
  */
class WebSocketConnection(webSocket: WebSocket) {
  import WebSocketTypes._

  /**
    * Stream of open / close / message / error events raised by the socket.
    *
    * Each subscription registers its own listeners via `addEventListener`, so
    * several subscribers can coexist, and removes them again when the
    * subscription is cancelled. Events are buffered with the `Unbounded`
    * overflow strategy.
    *
    * @return an observable emitting one [[WebSocketTypes.WebSocketEvent]] per DOM event
    */
  def observable: Observable[WebSocketEvent] =
    Observable.create(Unbounded) { observer =>
      // Hold on to the exact function objects: removeEventListener only
      // detaches a listener when it is given the same instance that was added.
      val onOpen: js.Function1[Event, Any] =
        (event: Event) => observer.onNext(Open(event))
      val onClose: js.Function1[CloseEvent, Any] =
        (event: CloseEvent) => observer.onNext(Close(event))
      val onMessage: js.Function1[MessageEvent, Any] =
        (event: MessageEvent) => observer.onNext(Message(event))
      val onError: js.Function1[Event, Any] =
        (event: Event) => observer.onNext(Error(event))

      webSocket.addEventListener[Event]("open", onOpen)
      webSocket.addEventListener[CloseEvent]("close", onClose)
      webSocket.addEventListener[MessageEvent]("message", onMessage)
      webSocket.addEventListener[Event]("error", onError)

      // Detach on cancel so the subscriber is no longer referenced by the socket.
      Cancelable { () =>
        webSocket.removeEventListener[Event]("open", onOpen)
        webSocket.removeEventListener[CloseEvent]("close", onClose)
        webSocket.removeEventListener[MessageEvent]("message", onMessage)
        webSocket.removeEventListener[Event]("error", onError)
      }
    }

  /**
    * Builds an observer that sends every received payload through the socket.
    *
    * @tparam F effect type in which the observer's construction is suspended
    * @return the observer, wrapped in `F`
    */
  def observer[F[_]: Sync]: F[Observer[WebSocketData]] =
    Sync[F].delay {
      new Observer[WebSocketData] {
        override def onNext(data: WebSocketData): Future[Ack] = {
          data.sendData(webSocket)
          Continue
        }

        // Observer callbacks must not throw; hand upstream failures to the
        // default reporter instead of rethrowing them into the producer.
        override def onError(ex: Throwable): Unit =
          monix.execution.UncaughtExceptionReporter.default.reportFailure(ex)

        // Upstream completion needs no action here: the socket is left open
        // for whoever owns it to close.
        override def onComplete(): Unit = ()
      }
    }
}
/** Resource-based constructors for sockets and for [[WebSocketConnection]]. */
object WebSocketConnection {

  /**
    * Opens a `ReconnectingWebSocket` as a managed resource.
    *
    * Acquisition logs and constructs the socket; release logs and calls `close()`.
    *
    * @param url address passed to the `ReconnectingWebSocket` constructor
    * @tparam F effect type, needs `Sync` and a `Logger`
    */
  def openReconnectingWebSocket[F[_]: Sync: Logger](
    url: String
  ): Resource[F, ReconnectingWebSocket] =
    Resource.make {
      Logger[F].info(s"opening reconnecting websocket to $url") *>
        Sync[F].delay(new ReconnectingWebSocket(url))
    } { in =>
      Logger[F].info(s"closing reconnecting websocket to $url") *>
        Sync[F].delay(in.close())
    }

  /**
    * Opens a plain DOM `WebSocket` as a managed resource.
    *
    * Acquisition logs and constructs the socket; release logs and calls `close()`.
    *
    * @param url address passed to the `WebSocket` constructor
    * @tparam F effect type, needs `Sync` and a `Logger`
    */
  def openWebSocket[F[_]: Sync: Logger](
    url: String
  ): Resource[F, WebSocket] =
    Resource.make {
      Logger[F].info(s"opening websocket to $url") *>
        Sync[F].delay(new WebSocket(url))
    } { in =>
      // Release path: the message must say "closing", not "opening".
      Logger[F].info(s"closing websocket to $url") *>
        Sync[F].delay(in.close())
    }

  /**
    * Wraps a managed `WebSocket` in a [[WebSocketConnection]].
    *
    * The returned resource adds no finalizer of its own; releasing it releases
    * the given `resource`.
    *
    * @param resource the managed socket to wrap
    * @tparam F effect type in which the wrapper's construction is suspended
    */
  def apply[F[_]: Sync](
    resource: Resource[F, WebSocket]
  ): Resource[F, WebSocketConnection] =
    resource.evalMap { webSocket =>
      Sync[F].delay(new WebSocketConnection(webSocket))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment