Skip to content

Instantly share code, notes, and snippets.

@dacr
Created July 2, 2023 17:48
Show Gist options
  • Save dacr/6746750db550f326f251ab87c40b35d7 to your computer and use it in GitHub Desktop.
Save dacr/6746750db550f326f251ab87c40b35d7 to your computer and use it in GitHub Desktop.
Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout stream implementation using pekko-stream / published by https://github.com/dacr/code-examples-manager #691295f4-9dd1-480c-a050-120d9c613936/e73768c6f8bfecf9b5cb44f0f1a382357572eabb
// summary : Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout stream implementation using pekko-stream
// keywords : scala, actors, pekko, http-client, client, websocket, cat, streams
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 691295f4-9dd1-480c-a050-120d9c613936
// created-on : 2023-07-02T19:05:12+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// usage-example : scala-cli pekko-wscat-stream.sc -- ws://127.0.0.1:8080
// ---------------------
//> using scala "3.3.0"
//> using repository "https://repository.apache.org/content/groups/snapshots"
//> using dep "org.apache.pekko::pekko-http:0.0.0+4455-91b6086b-SNAPSHOT"
//> using dep "org.apache.pekko::pekko-stream-typed:1.0.0-RC3+7-029806f8-SNAPSHOT"
//> using dep "org.apache.pekko::pekko-actor-typed:1.0.0-RC3+7-029806f8-SNAPSHOT"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.Done
import org.apache.pekko.util.ByteString
import org.apache.pekko.stream.scaladsl.*
import org.apache.pekko.stream.*
import org.apache.pekko.http.scaladsl.*
import org.apache.pekko.http.scaladsl.model.StatusCodes
import org.apache.pekko.http.scaladsl.model.ws.{TextMessage, WebSocketRequest}
import org.apache.pekko.http.scaladsl.model.*
import scala.concurrent.*
import scala.concurrent.duration.*
def wsMessageToString(implicit materializer: Materializer, executionContext: ExecutionContext): ws.Message => Future[String] = {
case message: ws.TextMessage.Strict =>
Future.successful(message.text)
case message: ws.TextMessage.Streamed =>
val seq = message.textStream.runWith(Sink.seq)
seq.map(seq => seq.mkString)
case message =>
Future.successful(message.toString)
}
val onlineWebSocketEchoServices = List(
"wss://demo.piesocket.com",
//"wss://socketsbay.com/wss/v2/1/demo/", // no echo response :(
//"ws://echo.websocket.org" // decommissioned on 2021 !
)
given system: ActorSystem = ActorSystem("MySystem")
given executor:ExecutionContextExecutor = system.dispatcher
val uri = args.headOption.getOrElse(onlineWebSocketEchoServices.head)
val stdinSource: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => System.in)
//val stdoutSink: Sink[ByteString, Future[IOResult]] = StreamConverters.fromOutputStream(() => System.out)
val stdoutSink: Sink[String, Future[Done]] =
Sink.foreach[String] { s =>
System.out.println(s)
System.out.flush()
}
println(s"Request websocket from $uri")
val flow = Http().webSocketClientFlow(request = WebSocketRequest(uri = uri))
val (upgradedResponse, closed) =
stdinSource
.map { bt => TextMessage(bt.decodeString("UTF-8")) }
.viaMat(flow)(Keep.right)
.mapAsync(1)(wsMessageToString)
.toMat(stdoutSink)(Keep.both)
.run()
val connected = upgradedResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) Future.successful(Done)
else throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment