Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active September 16, 2020 16:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamw/407e4e42e760b2a2df66e40ff8732aee to your computer and use it in GitHub Desktop.
Save adamw/407e4e42e760b2a2df66e40ff8732aee to your computer and use it in GitHub Desktop.
package sttp.client3.examples
import cats.effect.{ContextShift, IO}
import fs2._
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import sttp.ws.WebSocketFrame
/** Example: drives a web socket conversation using an fs2 [[Pipe]].
  *
  * The program opens a web socket to `wss://echo.websocket.org`, sends the text frame `"1"`, and
  * then answers every numeric text frame it receives with that number plus one. Once a text
  * frame with the payload `"10"` arrives, a close frame is sent instead of a reply.
  *
  * NOTE(review): the counting only makes progress if the remote endpoint sends our own frames
  * back to us (as the host name suggests) -- confirm the endpoint is still available.
  */
object WebSocketStreamFs2 extends App {
  // Passed implicitly to AsyncHttpClientFs2Backend.resource[IO]() below.
  implicit val cs: ContextShift[IO] =
    IO.contextShift(scala.concurrent.ExecutionContext.global)

  /** Pipe turning incoming data frames into the frames to send.
    *
    * Emits the initial `"1"` text frame before consuming any input, then maps every incoming
    * frame to zero or one outgoing frames via [[replyTo]].
    */
  def webSocketFramePipe: Pipe[IO, WebSocketFrame.Data[_], WebSocketFrame] = { input =>
    Stream.emit(WebSocketFrame.text("1")) ++ input.flatMap(frame => replyTo(frame))
  }

  /** Computes the reply (if any) to a single incoming frame.
    *
    *   - text `"10"`: emits a close frame;
    *   - any other text that parses as an `Int`: emits a text frame with that number plus one;
    *   - text that is not a number, or a non-text frame: emits nothing.
    *
    * @param frame the data frame received from the server
    * @return a stream with at most one frame to send back
    */
  private def replyTo(frame: WebSocketFrame.Data[_]): Stream[IO, WebSocketFrame] =
    frame match {
      case WebSocketFrame.Text("10", _, _) =>
        println("Received 10 messages, sending close frame")
        Stream.emit(WebSocketFrame.close)

      case WebSocketFrame.Text(n, _, _) =>
        // The payload comes from the network, so it is parsed defensively: a plain `n.toInt`
        // would throw NumberFormatException and fail the whole stream on unexpected text.
        scala.util.Try(n.toInt).toOption match {
          case Some(count) =>
            val next = count + 1
            println(s"Received $n messages, replying with $next")
            Stream.emit(WebSocketFrame.text(next.toString))
          case None =>
            println(s"Ignoring non-numeric text frame: $n")
            Stream.empty
        }

      case _ => Stream.empty // ignoring
    }

  // Acquire the backend as a resource so that it is released once the exchange has finished,
  // run the web socket request with the pipe above, and block until it completes.
  AsyncHttpClientFs2Backend
    .resource[IO]()
    .use { backend =>
      basicRequest
        .response(asWebSocketStream(Fs2Streams[IO])(webSocketFramePipe))
        .get(uri"wss://echo.websocket.org")
        .send(backend)
        .void
    }
    .unsafeRunSync()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment