Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active May 25, 2024 08:39
Show Gist options
  • Save dacr/d0596f1cbde704b5b6bca0ce190d046c to your computer and use it in GitHub Desktop.
Save dacr/d0596f1cbde704b5b6bca0ce190d046c to your computer and use it in GitHub Desktop.
pekko websocket echo service / published by https://github.com/dacr/code-examples-manager #9d8e8a4d-1e24-4f06-8bf5-1a283dd6607b/4fca470a16b893f7fa6d4f125f1b07d58cb8efcf
// summary : pekko websocket echo service
// keywords : scala, actors, pekko-http, http-server, websocket
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 9d8e8a4d-1e24-4f06-8bf5-1a283dd6607b
// created-on : 2023-07-02T18:50:39+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// usage-example : scala-cli pekko-http-server-websocket-echo.sc
// ---------------------
//> using scala "3.4.2"
//> using objectWrapper
//> using repository "https://repository.apache.org/content/groups/snapshots"
//> using dep "org.apache.pekko::pekko-http:0.0.0+4455-91b6086b-SNAPSHOT"
//> using dep "org.apache.pekko::pekko-stream:1.0.0-RC3+7-029806f8-SNAPSHOT"
//> using dep "org.slf4j:slf4j-simple:2.0.7"
// ---------------------
import org.apache.pekko.http.scaladsl._
import org.apache.pekko.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
// ---------------------------------------------------------------------------------------------------------------------
// Just a helper function which can be removed as only used for some println
def lanAddresses(): List[String] = {
import scala.jdk.CollectionConverters._
java.net.NetworkInterface
.getNetworkInterfaces().asScala
.filterNot(_.isLoopback)
.filterNot(_.isVirtual)
.filter(_.isUp)
.toList
.flatMap { interface =>
val ips = interface
.getInetAddresses.asScala
.to(List)
.filterNot(_.isAnyLocalAddress)
.collect { case x: java.net.Inet4Address => x.getHostAddress }
ips.headOption
}
}
// ---------------------------------------------------------------------------------------------------------------------
val port = args.headOption.map(_.toInt).getOrElse(8080)
val interface = args.drop(1).headOption.getOrElse("0.0.0.0")
System.setProperty("pekko.http.server.remote-address-header", "true")
System.setProperty("pekko.http.server.remote-address-attribute", "true")
given system:org.apache.pekko.actor.ActorSystem = org.apache.pekko.actor.ActorSystem("MySystem")
given executionContext:scala.concurrent.ExecutionContextExecutor = system.dispatcher
val routes = pathEndOrSingleSlash {
extractClientIP { clientIP =>
val echoFlow = {
Flow[Message].mapConcat {
case tm: TextMessage =>
System.out.print(clientIP.toString())
System.out.print(">")
System.out.print(tm.getStrictText)
System.out.flush()
TextMessage(tm.textStream) :: Nil
case bm: BinaryMessage =>
bm.dataStream.runWith(Sink.ignore) // Force consume
Nil
}
}
val from = clientIP.toIP.map(_.ip.getHostAddress)
println(s"new connection from $from")
handleWebSocketMessages(echoFlow)
}
}
Http().newServerAt(interface, port).bind(routes).andThen { case _ =>
val addr = lanAddresses().head
println(s"Waiting for websocket clients on $interface:$port ")
println(s"Try this server by using such command :")
println(s"- scala-cli akka-wscat.sc -- ws://$addr:8080")
println(s"- scala-cli pekko-wscat.sc -- ws://$addr:8080")
println(s"- scala-cli akka-wscat-stream.sc -- ws://$addr:8080")
println(s"- scala-cli pekko-wscat-stream.sc -- ws://$addr:8080")
println(s"- docker run -it --rm solsson/websocat -v ws://$addr:8080")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment