Skip to content

Instantly share code, notes, and snippets.

@shankarshastri
Created December 19, 2020 11:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shankarshastri/e230dd07b82d2d717dce3b652bf807b2 to your computer and use it in GitHub Desktop.
Save shankarshastri/e230dd07b82d2d717dce3b652bf807b2 to your computer and use it in GitHub Desktop.
AkkaTypedWebsocketServer
package com.akka.websocket
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}
import akka.stream.{Materializer, OverflowStrategy}
import com.akka.websocket.BroadCastBehaviour.broadCastActorBehaviour
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import com.akka.websocket.Model.{Broadcast, BroadcastToAll, MessageWrapperTrait, RequestMessageWrapper, ResponseMessageWrapper, UserAdded, WebSocketMsg}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContextExecutor, Future}
/**
 * Small Akka Typed + Akka HTTP WebSocket server bound to `localhost:8080`.
 *
 * Routes:
 *  - `/greeter`   - WebSocket endpoint; every text frame is answered with `Hello <text>!`.
 *  - `/broadcast` - plain HTTP endpoint; pushes "Hey Everyone" to every registered WebSocket
 *                   connection and answers `202 Accepted`.
 *
 * Pressing ENTER on stdin unbinds the port and terminates the actor system.
 */
object AkkaTypedWebsocketServer extends App {

  implicit val system: ActorSystem[NotUsed] = ActorSystem(initActorSystemBehaviour, "http-system")

  /**
   * Behaviour that turns an incoming WebSocket message into a greeting.
   *
   * For a [[RequestMessageWrapper]] carrying a text message it replies to `replyTo` with a
   * [[ResponseMessageWrapper]] whose text is `Hello <text>!`. Binary messages are drained and
   * produce no reply, so an asker will run into its ask timeout for them.
   * Any other [[MessageWrapperTrait]] is reported as unhandled.
   */
  def sinkActor: Behavior[MessageWrapperTrait] =
    Behaviors.receiveMessage[MessageWrapperTrait] {
      case Model.RequestMessageWrapper(message, replyTo) =>
        message match {
          case TextMessage.Strict(text) =>
            replyTo ! ResponseMessageWrapper(TextMessage(Source.single(s"Hello ${ text }") ++ Source.single("!")))
          case streamed: TextMessage =>
            // `getStrictText` throws for non-strict messages, which would crash this actor;
            // splice the incoming chunks into the reply instead.
            replyTo ! ResponseMessageWrapper(
              TextMessage(Source.single("Hello ") ++ streamed.textStream ++ Source.single("!")))
          case binary: BinaryMessage =>
            // Streamed payloads have to be consumed, otherwise the connection stalls.
            binary.dataStream.runWith(Sink.ignore)
        }
        Behaviors.same
      case _ =>
        // Anything that is not a request is not for this actor; avoid a MatchError.
        Behaviors.unhandled
    }

  /**
   * Registry of the per-connection actor refs that feed outgoing WebSocket messages.
   *
   * @param l refs of the currently registered connections (newest first)
   * @return behaviour that registers a connection on [[UserAdded]], fans a message out to all
   *         registered connections on [[BroadcastToAll]], and forgets a connection once its ref
   *         has terminated
   */
  def webSocketConnections(l: List[ActorRef[Message]] = List.empty): Behavior[WebSocketMsg] =
    Behaviors
      .receive[WebSocketMsg] { (context, message) =>
        message match {
          case Model.UserAdded(actorRef) =>
            // Watch the ref so that it can be dropped from the list when it stops
            // (presumably when the connection's outgoing stream ends - TODO confirm).
            context.watch(actorRef)
            webSocketConnections(actorRef :: l)
          case Model.BroadcastToAll(msg) =>
            // NOTE(review): one anonymous child is spawned per broadcast; whether it stops itself
            // depends on `broadCastActorBehaviour`, which is defined elsewhere.
            context.spawnAnonymous(broadCastActorBehaviour) ! Broadcast(l, msg)
            Behaviors.same
        }
      }
      .receiveSignal {
        // Handling Terminated is mandatory once refs are watched; it also keeps `l` from
        // growing without bound and from broadcasting to dead refs.
        case (_, akka.actor.typed.Terminated(closed)) =>
          webSocketConnections(l.filterNot(_ == closed))
      }

  /**
   * Guardian behaviour: spawns the connection registry and the greeter actor, binds the HTTP
   * routes and arranges shutdown on ENTER.
   */
  def initActorSystemBehaviour: Behavior[NotUsed] =
    Behaviors.setup[NotUsed](implicit e => {
      implicit val sys: ActorSystem[Nothing] = e.system
      implicit val mat: Materializer = Materializer(e.system)
      implicit val ec: ExecutionContextExecutor = e.executionContext

      val webSocketActorRef = e.spawnAnonymous(webSocketConnections())
      val sinkActorRef = e.spawnAnonymous(sinkActor)

      // Materialises the outgoing side of one connection: an actor ref whose messages end up in
      // the returned publisher (buffer of 20, whole buffer dropped on overflow), and registers
      // that ref with the connection registry.
      def registerConnection()(implicit mat: Materializer) = {
        val (a, s) = ActorSource
          .actorRef[Message](
            completionMatcher = PartialFunction.empty,
            failureMatcher = PartialFunction.empty,
            bufferSize = 20,
            overflowStrategy = OverflowStrategy.dropBuffer)
          .toMat(Sink.asPublisher(fanout = false))(Keep.both)
          .run()(mat)
        webSocketActorRef ! UserAdded(a)
        (a, s)
      }

      // Variant that builds the greeting inside the stream; currently not wired into the route.
      def chat(implicit mat: Materializer): Flow[Message, Message, Any] = {
        val (a, s) = registerConnection()
        // https://stackoverflow.com/questions/41316173/akka-websocket-how-to-close-connection-by-server
        val sink = Flow[Message]
          .mapConcat[Message] {
            case TextMessage.Strict(msg) =>
              // Incoming message from ws
              TextMessage(Source.single(s"Hello ${ msg }") ++ Source.single("!")) :: Nil
            case streamed: TextMessage =>
              TextMessage(Source.single("Hello ") ++ streamed.textStream ++ Source.single("!")) :: Nil
            case binary: BinaryMessage =>
              // Drain and ignore instead of failing the stream with a MatchError.
              binary.dataStream.runWith(Sink.ignore)
              Nil
          }
          // Hand replies to the connection's own ref so they reach the client;
          // collecting them in Sink.seq would only accumulate memory.
          .to(Sink.foreach[Message](a ! _))
        Flow.fromSinkAndSource(sink, Source.fromPublisher(s))
      }

      // Variant that asks `sinkActor` for the reply to every inbound message.
      def chatWithActors(implicit mat: Materializer): Flow[Message, Message, Any] = {
        val (a, s) = registerConnection()
        implicit val timeout: Timeout = 3.seconds
        val sinkFromActor = Flow[Message]
          .mapAsync(10) { msg =>
            sinkActorRef
              .ask[ResponseMessageWrapper](replyTo => RequestMessageWrapper(msg, replyTo))
              .map(reply => Option(reply.responseMessage))
              // `sinkActor` never answers binary messages; a missing reply must not
              // tear down the inbound side of the connection.
              .recover { case _: java.util.concurrent.TimeoutException => None }
          }
          .collect { case Some(reply) => reply }
          // Deliver the reply through the connection's ref (same path broadcasts take).
          .to(Sink.foreach[Message](a ! _))
        // https://stackoverflow.com/questions/41316173/akka-websocket-how-to-close-connection-by-server
        Flow.fromSinkAndSource(sinkFromActor, Source.fromPublisher(s))
      }

      val route =
        path("greeter") {
          handleWebSocketMessages(chatWithActors)
        } ~ path("broadcast") {
          webSocketActorRef ! BroadcastToAll(TextMessage("Hey Everyone"))
          complete(StatusCodes.Accepted)
        }

      val bindingFuture = Http().newServerAt("localhost", 8080).bind(route)
      println(s"Server online at http://localhost:8080/\n")

      // Wait for ENTER off the guardian's own message loop: blocking inside `setup` would keep
      // the guardian from ever finishing its start-up.
      Future(scala.concurrent.blocking(scala.io.StdIn.readLine()))
        .flatMap(_ => bindingFuture)
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => sys.terminate()) // and shutdown when done

      // `Behaviors.same` is not a legal initial behaviour; the guardian has nothing to handle.
      Behaviors.empty[NotUsed]
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment