Skip to content

Instantly share code, notes, and snippets.

@busti
Last active September 23, 2019 15:02
Show Gist options
  • Save busti/322e7de4637c34294024821a53bc86b5 to your computer and use it in GitHub Desktop.
Save busti/322e7de4637c34294024821a53bc86b5 to your computer and use it in GitHub Desktop.
import cats.effect._
import monix.eval.TaskLike
import monix.execution.Ack.Continue
import monix.execution.{Ack, Cancelable}
import monix.reactive.OverflowStrategy.Unbounded
import monix.reactive._
import mycelium.client.raw.{ReconnectingWebSocket, ReconnectingWebsocketOptions}
import org.scalajs.dom._
import scala.concurrent.Future
import scala.scalajs.js.UndefOr
import scala.scalajs.js.typedarray._
object WebSocketTypes {
sealed trait WebSocketEvent
case class Open(event: Event) extends WebSocketEvent
case class Close(event: CloseEvent) extends WebSocketEvent
case class Message(event: MessageEvent) extends WebSocketEvent
case class Error(event: Event) extends WebSocketEvent
trait WebSocketData {
def sendData(webSocket: WebSocket)
}
implicit class dataString(data: String) extends WebSocketData {
def sendData(webSocket: WebSocket) = webSocket.send(data)
}
implicit class dataBlob(data: Blob) extends WebSocketData {
def sendData(webSocket: WebSocket) = webSocket.send(data)
}
implicit class dataArrayBuffer(data: ArrayBuffer) extends WebSocketData {
def sendData(webSocket: WebSocket) = webSocket.send(data)
}
}
class WebSocketConnection(webSocket: WebSocket) {
import WebSocketTypes._
def observable: Observable[WebSocketEvent] =
Observable.create(Unbounded) { observer =>
webSocket.onopen = (event: Event) => observer.onNext(Open(event))
webSocket.onclose = (event: CloseEvent) => observer.onNext(Close(event))
webSocket.onmessage = (event: MessageEvent) => observer.onNext(Message(event))
webSocket.onerror = (event: Event) => observer.onNext(Error(event))
Cancelable.empty
}
def observer[F[_]: Sync]: F[Observer[WebSocketData]] =
Sync[F].delay {
new Observer[WebSocketData] {
override def onNext(data: WebSocketData): Future[Ack] = {
data.sendData(webSocket)
Continue
}
override def onError(ex: Throwable) = throw ex
override def onComplete() = ()
}
}
}
object WebSocketConnection {
def openReconnectingWebSocket[F[_]: Sync](
url: String
): Resource[F, ReconnectingWebSocket] =
Resource.make {
Sync[F].delay(new ReconnectingWebSocket(url))
} { in =>
Sync[F].delay(in.close())
}
def openWebSocket[F[_]: Sync](
url: String
): Resource[F, WebSocket] =
Resource.make {
Sync[F].delay(new WebSocket(url))
} { in =>
Sync[F].delay(in.close())
}
def apply[F[_]: Sync](
resource: Resource[F, WebSocket]
): Resource[F, WebSocketConnection] = resource.evalMap { webSocket =>
Sync[F].delay(new WebSocketConnection(webSocket))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment