Skip to content

Instantly share code, notes, and snippets.

@Deliganli
Last active April 25, 2019 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Deliganli/c7843d3470cb7d6226d29a989912cf55 to your computer and use it in GitHub Desktop.
Save Deliganli/c7843d3470cb7d6226d29a989912cf55 to your computer and use it in GitHub Desktop.
import cats.effect.{ConcurrentEffect, ContextShift, Timer}
import com.typesafe.scalalogging.StrictLogging
import fs2.concurrent.Queue
import fs2.{Pipe, Stream}
import io.lemonlabs.uri.AbsoluteUrl
import scodec.Codec
import spinoco.fs2.http.HttpClient
import spinoco.fs2.http.websocket.{Frame, WebSocketRequest}
/**
  * Opens websocket connections through an fs2-http [[HttpClient]] and exposes each one
  * as a pair of queue-backed endpoints (see `WSChannelFactory.WSConnection`).
  *
  * Every call to [[connect]] allocates two unbounded queues: one that collects the frames
  * handed to the websocket pipe and one that feeds the frames the pipe emits.
  *
  * @param client the HTTP client used to establish the websocket
  */
class WSChannelFactory[F[_]: ConcurrentEffect: ContextShift: Timer](client: HttpClient[F]) {
  // Members of a companion object are not in scope inside the class automatically;
  // without this import the unqualified `WSConnection` below does not resolve.
  import WSChannelFactory.WSConnection

  /** UTF-8 string codec, picked up implicitly by `client.websocket` for frame payloads. */
  implicit val codecString: Codec[String] = scodec.codecs.utf8

  /**
    * Creates a websocket connection to `url`.
    *
    * The returned stream emits a single [[WSChannelFactory.WSConnection]]. The websocket
    * itself runs in the background (via `concurrently`) while that element is being
    * processed downstream.
    *
    * NOTE(review): the output of `client.websocket` is not inspected here, so a failed
    * handshake is presumably not reported to the caller — confirm.
    *
    * @param url target endpoint; `ws` URLs are opened unencrypted, any other scheme
    *            is opened with `wss`
    * @return a stream emitting the connection handle
    */
  def connect(url: AbsoluteUrl): Stream[F, WSConnection[F]] =
    for {
      incoming <- Stream.eval(Queue.unbounded[F, Frame[String]])
      outgoing <- Stream.eval(Queue.unbounded[F, Frame[String]])
      pipe = buildPipe(incoming, outgoing)
      request = buildRequest(url)
      model = WSConnection(incoming.dequeue, outgoing.enqueue)
      connection <- Stream.emit(model).covary[F].concurrently(client.websocket(request, pipe))
    } yield connection

  /**
    * Translates `url` into a websocket request, honouring its scheme.
    *
    * A `ws` URL yields a plain request defaulting to port 80; every other scheme yields
    * a secure (`wss`) request defaulting to port 443.
    *
    * NOTE(review): only host, port and path are passed on; a query string on `url`
    * is not forwarded to the request.
    */
  private def buildRequest(url: AbsoluteUrl): WebSocketRequest = {
    val host = url.host.toString()
    val path = url.path.toString()
    if (url.scheme.equalsIgnoreCase("ws")) WebSocketRequest.ws(host, url.port.getOrElse(80), path)
    else WebSocketRequest.wss(host, url.port.getOrElse(443), path)
  }

  /**
    * Builds the pipe handed to `client.websocket`.
    *
    * Frames arriving on the pipe's input are enqueued on `iq` in the background, while
    * the pipe's output is whatever is dequeued from `oq`.
    *
    * @param iq queue receiving the frames fed into the pipe
    * @param oq queue supplying the frames the pipe emits
    */
  protected def buildPipe[I, O](
    iq: Queue[F, Frame[I]],
    oq: Queue[F, Frame[O]]
  ): Pipe[F, Frame[I], Frame[O]] = { inbound =>
    val output = oq.dequeue
    val input = inbound.through(iq.enqueue)
    output.concurrently(input)
  }
}
object WSChannelFactory {

  /**
    * Handle to a websocket connection, as produced by `WSChannelFactory#connect`.
    *
    * @param incoming stream of text frames; `connect` wires this to the dequeue side of
    *                 the queue that collects frames fed into the websocket pipe
    * @param outgoing sink for text frames; `connect` wires this to the enqueue side of
    *                 the queue whose contents the websocket pipe emits
    */
  final case class WSConnection[F[_]](
    incoming: Stream[F, Frame[String]],
    outgoing: Pipe[F, Frame[String], Unit]
  )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment