@dacr
Last active July 2, 2023 17:48
Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout stream implementation / published by https://github.com/dacr/code-examples-manager #d70bb96b-d6f1-49f8-a115-2c1ace17a7f4/7047deecd47dbb6b4fa908ea2183b1aeabb62015
// summary : Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout stream implementation
// keywords : scala, actors, akka, http-client, client, websocket, cat, streams
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : d70bb96b-d6f1-49f8-a115-2c1ace17a7f4
// created-on : 2021-02-08T07:05:39Z
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// usage-example : scala-cli akka-wscat-stream.sc -- ws://127.0.0.1:8080
// ---------------------
//> using scala "2.13.11"
//> using dep "com.typesafe.akka::akka-http:10.2.10"
//> using dep "com.typesafe.akka::akka-stream-typed:2.6.21"
//> using dep "com.typesafe.akka::akka-actor-typed:2.6.21"
////> using dep "org.slf4j:slf4j-nop:2.0.7"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import akka.Done
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream._
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{TextMessage, WebSocketRequest}
import akka.http.scaladsl.model._
import scala.concurrent._
import scala.concurrent.duration._
// Converts an incoming websocket message to a string; streamed text messages are fully collected first.
def wsMessageToString(implicit materializer: Materializer, executionContext: ExecutionContext): ws.Message => Future[String] = {
  case message: ws.TextMessage.Strict =>
    Future.successful(message.text)
  case message: ws.TextMessage.Streamed =>
    message.textStream.runWith(Sink.seq).map(_.mkString)
  case message =>
    Future.successful(message.toString)
}
val onlineWebSocketEchoServices = List(
  "wss://demo.piesocket.com",
  //"wss://socketsbay.com/wss/v2/1/demo/", // no echo response :(
  //"ws://echo.websocket.org" // decommissioned in 2021
)
implicit val system: akka.actor.ActorSystem = akka.actor.ActorSystem("MySystem")
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val uri = args.headOption.getOrElse(onlineWebSocketEchoServices.head)
val stdinSource: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => System.in)
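// A minimal sketch, not part of the original script: `StreamConverters.fromInputStream` emits raw
// chunks rather than lines, so one chunk may hold several lines or cut a multi-byte character in half.
// Assuming newline-terminated input, a framing stage such as the hypothetical `lineFraming` below could
// be applied with `stdinSource.via(lineFraming)` before decoding. The 64 KiB frame limit is an
// arbitrary choice for illustration.
val lineFraming: Flow[ByteString, ByteString, akka.NotUsed] =
  Framing.delimiter(ByteString("\n"), maximumFrameLength = 64 * 1024, allowTruncation = true)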
//val stdoutSink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream(() => System.out)
val stdoutSink: Sink[String, Future[Done]] =
  Sink.foreach[String] { s =>
    System.out.println(s)
    System.out.flush()
  }
println(s"Request websocket from $uri")
val flow = Http().webSocketClientFlow(request = WebSocketRequest(uri = uri))
val (upgradedResponse, closed) =
  stdinSource
    .map { bt => TextMessage(bt.decodeString("UTF-8")) }
    .viaMat(flow)(Keep.right)
    .mapAsync(1)(wsMessageToString)
    .toMat(stdoutSink)(Keep.both)
    .run()
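val connected = upgradedResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) Future.successful(Done)
  else Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
}
// Report any handshake or stream failure, then stop the actor system so the JVM can exit.
connected.zip(closed).onComplete { result =>
  result.failed.foreach(err => System.err.println(err.getMessage))
  system.terminate()
}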