Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active June 25, 2023 15:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/1a442efca698d49f4d1c0a17168c9f4c to your computer and use it in GitHub Desktop.
Save dacr/1a442efca698d49f4d1c0a17168c9f4c to your computer and use it in GitHub Desktop.
playing with websockets using akka-http / published by https://github.com/dacr/code-examples-manager #50673e88-dcb8-4a43-8c40-46d82d30ae50/3588e65805b4fe702465e07bbf898924ed3b1138
// summary : playing with websockets using akka-http
// keywords : scala, actors, akka, http-client, client, json, json4s, websocket, helloworld
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 50673e88-dcb8-4a43-8c40-46d82d30ae50
// created-on : 2021-02-05T18:06:03Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows 'amm scriptname.sc'
import $ivy.`com.typesafe.akka::akka-http:10.2.4`
import $ivy.`com.typesafe.akka::akka-stream-typed:2.6.13`
import akka.Done
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent._
import scala.concurrent.duration._
/** Minimal WebSocket client built on akka-http.
  *
  * Initializing this object starts the stream: a fixed list of text messages
  * is sent to the endpoint at `uri` and every strict text message received
  * back is printed to standard output.
  *
  * Call [[andWait]] to block until the stream has finished and to shut the
  * actor system down afterwards.
  */
object TestThat {
  implicit val system: akka.actor.ActorSystem = akka.actor.ActorSystem("MySystem")
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  //val outgoing = Source.single(TextMessage("hello world!"))
  /** Messages sent to the server; the source completes after the last one. */
  val outgoing = Source.fromIterator { () =>
    List(
      "Hello world :)",
      "What's your name ?",
      "Shut up your face !"
    ).iterator.map(text => TextMessage(text))
  }

  /** Prints strict text messages; the materialized future completes when the incoming stream ends. */
  val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
      // Streamed and binary payloads are not displayed, but their data streams
      // are drained so that unread content cannot stall the connection.
      case streamed: TextMessage =>
        streamed.textStream.runWith(Sink.ignore)
      case binary: akka.http.scaladsl.model.ws.BinaryMessage =>
        binary.dataStream.runWith(Sink.ignore)
    }

  val uri = "ws://echo.websocket.org"

  val flow = Http().webSocketClientFlow(WebSocketRequest(uri = uri))

  // upgradedResponse: completes with the result of the HTTP upgrade handshake.
  // closed: completes once the incoming sink has finished.
  val (upgradedResponse, closed) =
    outgoing
      .viaMat(flow)(Keep.right)
      .toMat(incoming)(Keep.both)
      .run()

  /** Succeeds with `Done` when the server switched protocols, fails otherwise. */
  val connected: Future[Done] = upgradedResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols)
      Future.successful(Done)
    else
      Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
  }

  // Report a failed handshake instead of dropping it silently: nothing else
  // observes `connected`, and `Await.ready` below does not rethrow failures.
  connected.failed.foreach { cause =>
    Console.err.println(s"WebSocket connection failed: ${cause.getMessage}")
  }

  /** Blocks until the stream has completed, then terminates the actor system.
    *
    * Do not exit before the future has completed ;)
    *
    * The wait is kept in a method rather than in the object initializer on
    * purpose: stream callbacks run on other threads, and blocking inside the
    * initializer while they run risks a class-initialization deadlock.
    *
    * @throws java.util.concurrent.TimeoutException if the stream has not
    *         completed within 10 seconds
    */
  def andWait(): Unit =
    try {
      Await.ready(closed, 10.seconds)
      ()
    } finally {
      // Release the actor system's threads whether or not the wait succeeded.
      system.terminate()
      ()
    }
}
// Script entry point: the first access to TestThat runs its initializer, which
// starts the WebSocket stream; andWait() then blocks until that stream has
// completed, waiting at most 10 seconds.
TestThat.andWait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment