Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active July 2, 2023 17:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/382c52248d714cef572ac5f0acbd9ec2 to your computer and use it in GitHub Desktop.
Save dacr/382c52248d714cef572ac5f0acbd9ec2 to your computer and use it in GitHub Desktop.
Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout / published by https://github.com/dacr/code-examples-manager #57415e71-093a-4470-8c09-e80426ddd438/f25bebeda8fad5cdd2137e00d264b1751d604f92
// summary : Send stdin lines to websocket and dump (asynchronously) everything coming from the websocket to stdout
// keywords : scala, actors, akka, http-client, client, websocket, cat
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 57415e71-093a-4470-8c09-e80426ddd438
// created-on : 2021-02-05T18:06:03Z
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// usage-example : scala-cli akka-wscat.sc -- ws://127.0.0.1:8080
// ---------------------
//> using scala "2.13.11"
//> using objectWrapper
//> using dep "com.typesafe.akka::akka-http:10.2.10"
//> using dep "com.typesafe.akka::akka-stream-typed:2.6.21"
//> using dep "com.typesafe.akka::akka-actor-typed:2.6.21"
////> using dep "org.slf4j:slf4j-nop:2.0.7"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.Done
import akka.stream.scaladsl._
import akka.stream._
import akka.stream.typed.scaladsl.ActorSource
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketRequest}
import scala.annotation.tailrec
import scala.concurrent._
import scala.concurrent.duration._
import scala.io.StdIn
object Protocol {
// -------------------------------------------------------------------------------------------
// Actor behavior to manage input data
sealed trait Send
case class SendRegister(controlActorRef: ActorRef[Control]) extends Send
case class SendTransmit(content: String) extends Send
object SendFinished extends Send
def behaviorSend(): Behavior[Send] = Behaviors.receiveMessage {
case SendRegister(controlActorRef) =>
Behaviors.receiveMessage {
case SendTransmit(content) =>
controlActorRef ! ControlContent(content)
Behaviors.same
case SendFinished =>
controlActorRef ! ControlDone
Behaviors.stopped // Means shutdown the actor system here !
}
}
// -------------------------------------------------------------------------------------------
// To talk with the created Source actor
sealed trait Control
object ControlDone extends Control
case class ControlContent(content:String) extends Control
case class ControlFailure(ex:Exception) extends Control
}
val onlineWebSocketEchoServices = List(
"wss://demo.piesocket.com",
//"wss://socketsbay.com/wss/v2/1/demo/", // no echo response :(
//"ws://echo.websocket.org" // decommissioned on 2021 !
)
import Protocol._
implicit val system = akka.actor.typed.ActorSystem[Send](behaviorSend(), "MySystem")
implicit val executionContext = system.executionContext
val uri = args.headOption.getOrElse(onlineWebSocketEchoServices.head)
// bufferSize & overflowStrategy are important to deal with fast input rate
// With stdIn no backpressure is possible I guess
val (controlActorRef, outgoing) =
ActorSource.actorRef[Control](
completionMatcher = {case ControlDone => },
failureMatcher = {case ControlFailure(ex) => ex},
bufferSize=10000,
overflowStrategy = OverflowStrategy.dropNew
).preMaterialize()
system ! SendRegister(controlActorRef)
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case TextMessage.Strict(text) =>
println(text)
case TextMessage.Streamed(stream) =>
val text = stream.runReduce(_ + _) // Force consume and concat all responses fragments
text.map(System.out.println).andThen(_ => System.out.flush) // TODO better handling of this future ?
case BinaryMessage.Strict(bin) =>
println("Strict binary message not supported ")
case BinaryMessage.Streamed(bin) =>
println("Streamed binary message not supported ")
bin.runWith(Sink.ignore) // Force consume (to free input stream)
case x =>
println(s"Not understood entry $x")
}
println(s"Request websocket from $uri")
val flow = Http().webSocketClientFlow(request = WebSocketRequest(uri = uri))
val (upgradedResponse, closed) =
outgoing
.map{case ControlContent(content) => TextMessage(content) }
.viaMat(flow)(Keep.right)
.toMat(incoming)(Keep.both)
.run()
val connected = upgradedResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
@tailrec
def loop(): Unit = {
val input = Option(StdIn.readLine()) // returns null for EOF so NONE in our case
input match {
case Some(content) =>
system ! SendTransmit(content+"\n")
loop()
case None => system ! SendFinished
}
}
new Thread() {
override def run(): Unit = {
loop()
}
}.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment