Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active June 25, 2023 15:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/f777f85e78b3c14c0433477a997ae000 to your computer and use it in GitHub Desktop.
Save dacr/f777f85e78b3c14c0433477a997ae000 to your computer and use it in GitHub Desktop.
Dump incoming websockets content / published by https://github.com/dacr/code-examples-manager #01916f4b-1529-46c3-952b-58cd56d34f09/ee88cc22bde70faf0587eaf06e16c7a8c2a1628b
// summary : Dump incoming websockets content
// keywords : scala, actors, akka, http-client, client, json, json4s, websocket
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 01916f4b-1529-46c3-952b-58cd56d34f09
// created-on : 2021-02-06T13:13:52Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
import $ivy.`com.typesafe.akka::akka-http:10.2.4`
import $ivy.`com.typesafe.akka::akka-actor-typed:2.6.13`
import $ivy.`com.typesafe.akka::akka-stream-typed:2.6.13`
import $ivy.`org.slf4j:slf4j-nop:1.7.30` // to avoid missing logging impl warning at startup
import akka.Done
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{TextMessage, WebSocketRequest}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Keep
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}
import scala.concurrent._
import scala.concurrent.duration._
object WebSockEcho {
// -----------------------------------------------------------------
sealed trait Get
case class GetReceived(content:String) extends Get
case class GetFailure(ex:Throwable) extends Get
object GetFinished extends Get
def behaviorGet():Behavior[Get] = Behaviors.receiveMessage {
case GetReceived(content) =>
println(content)
Behaviors.same
case GetFinished =>
Behaviors.stopped
}
// -----------------------------------------------------------------
sealed trait Put
case class PutRegister(controlActorRef:ActorRef[Control]) extends Put
case class PutTransmit(content:String) extends Put
object PutFinished extends Put
object PutShutdown extends Put
def behaviorPut():Behavior[Put] = Behaviors.receiveMessage {
case PutRegister(controlActorRef) =>
Behaviors.receiveMessage {
case PutTransmit(content) =>
controlActorRef ! ControlContent(content)
Behaviors.same
case PutShutdown => // Delayed finished as this behavior is the main one used on actor system start
Behaviors.setup{context =>
context.scheduleOnce(1.second, context.self, PutFinished)
Behaviors.same
}
case PutFinished =>
Behaviors.stopped // Means shutdown the actor system here !
}
}
// -----------------------------------------------------------------
sealed trait Control
object ControlDone extends Control
case class ControlContent(content:String) extends Control
case class ControlFailure(ex:Exception) extends Control
}
// ================================================================================================
object Main {
import WebSockEcho._
implicit val system = akka.actor.typed.ActorSystem[Put](behaviorPut(), "MySystem")
implicit val executionContext = system.executionContext
// -----------------------------------------------------------------
// typed actor based source, using the guardianBehavior as our put behavior to send data to the websocket
val (controlActorRef, outgoing) =
ActorSource.actorRef[Control](
completionMatcher = {case ControlDone => },
failureMatcher = {case ControlFailure(ex) => ex},
bufferSize=20,
OverflowStrategy.fail
).preMaterialize()
system ! PutRegister(controlActorRef)
// -----------------------------------------------------------------
// typed actor based sink, will receive data from the remote websocket
val incoming =
ActorSink.actorRef[Get](
ref = system.systemActorOf(behaviorGet(), "truc"),
onCompleteMessage = GetFinished,
onFailureMessage = ex => GetFailure(ex)
)
// -----------------------------------------------------------------
// websocket connect
val uri = s"ws://echo.websocket.org"
val remoteProcessingFlow = Http().webSocketClientFlow(WebSocketRequest(uri = uri))
// -----------------------------------------------------------------
// Build the graph outgoing ~> remoteProcessingFlow ~> incoming
val (upgradedResponse, closed) =
outgoing
.map{case ControlContent(content) =>TextMessage.Strict(content) }
.viaMat(remoteProcessingFlow)(Keep.right)
.map{case content => GetReceived(content.asTextMessage.getStrictText)}
.toMat(incoming)(Keep.both)
.run()
// -----------------------------------------------------------------
val connected = upgradedResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// -----------------------------------------------------------------
system ! PutTransmit("Hello world")
system ! PutTransmit("Cool Raoul")
system ! PutShutdown // We can not just finish immediately else we will exit before getting anything
// Do not exit before the future has completed ;)
def andWait():Unit = Await.result(system.whenTerminated, Duration.Inf)
}
Main.andWait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment