Skip to content

Instantly share code, notes, and snippets.

@adamw adamw/sttp2.scala
Last active Oct 15, 2019

What would you like to do?
package sttp.client.akkahttp
import akka.Done
import{ActorSystem, Cancellable}
import{Message, TextMessage}
import{Flow, Keep, Sink, Source}
import akka.util.ByteString
import sttp.client._
import scala.concurrent.duration._
import scala.concurrent.Future
object AkkaWebsocketExample extends App {
// setting up akka, the threadpool, etc.
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val backend: SttpBackend[Future, Source[ByteString, Any],
Flow[Message, Message, *]] =
import system.dispatcher
// creating a sink, which prints all incoming messages to the console
val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
case m: TextMessage =>
m.toStrict(1.second).foreach(s => println(s"RECEIVED: $s"))
case _ =>
// creating a source, which produces a new text message each second
val source: Source[Message, Cancellable] = Source
.tick(1.second, 1.second, ())
.map(_ => TextMessage("Hello!"))
// combining the sink & source into a flow; the sink and source are
// disconnected and operate independently
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(sink, source)(Keep.left)
// using a test websocket endpoint
val response: Future[WebSocketResponse[Future[Done]]] =
// the "response" will be completed once the websocket is established
response.foreach { r =>
println("Websocket established!")
// The Future[Done] comes from the sink, and in this case will be completed
// once the server closes the connection.
r.result.foreach { _ =>
println("Websocket closed!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.