Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active October 15, 2019 16:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamw/215aacc452e49304f3c182d99ef49f9f to your computer and use it in GitHub Desktop.
Save adamw/215aacc452e49304f3c182d99ef49f9f to your computer and use it in GitHub Desktop.
package sttp.client.akkahttp
import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import sttp.client._
import sttp.client.ws.WebSocketResponse
import scala.concurrent.duration._
import scala.concurrent.Future
/** Example of opening a websocket with sttp's akka-http backend.
  *
  * An akka-streams `Flow` is built from an independent sink (prints every received text
  * message) and source (emits "Hello!" once per second), and is handed to `openWebsocket`.
  * When the websocket closes, or when it cannot be established at all, the outcome is
  * reported and the actor system is terminated so that the JVM can exit.
  */
object AkkaWebsocketExample extends App {
  // Local imports, so that this block does not depend on changes to the file-level imports.
  import akka.http.scaladsl.model.ws.BinaryMessage
  import scala.util.{Failure, Success}

  // setting up akka, the threadpool, etc.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val backend: SttpBackend[Future, Source[ByteString, Any], Flow[Message, Message, *]] =
    AkkaHttpBackend.usingActorSystem(system)
  import system.dispatcher

  // creating a sink, which prints all incoming text messages to the console
  val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
    case m: TextMessage =>
      // toStrict collects a (possibly streamed) message; report a failure instead of dropping it
      m.toStrict(1.second).onComplete {
        case Success(s) => println(s"RECEIVED: $s")
        case Failure(e) => Console.err.println(s"Failed to read text message: $e")
      }
    case b: BinaryMessage =>
      // binary messages are not printed, but their data must still be consumed, otherwise
      // a streamed binary message would back-pressure the whole connection
      b.dataStream.runWith(Sink.ignore)
      ()
  }

  // creating a source, which produces a new text message each second
  val source: Source[Message, Cancellable] = Source
    .tick(1.second, 1.second, ())
    .map(_ => TextMessage("Hello!"))

  // combining the sink & source into a flow; the sink and source are
  // disconnected and operate independently
  val flow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(sink, source)(Keep.left)

  // using a test websocket endpoint
  val response: Future[WebSocketResponse[Future[Done]]] =
    basicRequest.get(uri"wss://echo.websocket.org").openWebsocket(flow)

  // Terminates the actor system; without this its threads keep the JVM running forever.
  private def shutdown(): Unit = {
    system.terminate()
    ()
  }

  // the "response" will be completed once the websocket is established
  response.onComplete {
    case Success(r) =>
      println("Websocket established!")
      // The Future[Done] comes from the sink, and in this case will be completed
      // once the server closes the connection (or failed, if the stream fails).
      r.result.onComplete { outcome =>
        outcome match {
          case Success(_) => println("Websocket closed!")
          case Failure(e) => Console.err.println(s"Websocket terminated with an error: $e")
        }
        shutdown()
      }
    case Failure(e) =>
      // the handshake/connection failed; report it instead of silently doing nothing
      Console.err.println(s"Could not establish websocket: $e")
      shutdown()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment