Skip to content

Instantly share code, notes, and snippets.

@marioosh
Last active March 1, 2020 08:11
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save marioosh/0d583c74880f23a93f6c to your computer and use it in GitHub Desktop.
Save marioosh/0d583c74880f23a93f6c to your computer and use it in GitHub Desktop.
websocket based on akka-http
// Excerpt of two definitions that also appear inside the `websocketFlow` graph; `builder`,
// `user` and `chatRoomActor` are not defined in this fragment and must come from an enclosing scope.
// Wraps the graph's materialized value in a UserJoined event for `user`.
val actorAsSource = builder.materializedValue.map(actor => UserJoined(user, actor))
// Sink that forwards ChatEvents to chatRoomActor. UserLeft(user) is the second argument --
// presumably the message sent when the stream completes; confirm against the Akka Streams version in use.
val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))
/**
 * Handle to a single chat room backed by one `ChatRoomActor`.
 *
 * @param roomId      identifier handed to the room actor on creation
 * @param actorSystem actor system used to spawn the room actor
 */
class ChatRoom(roomId: Int, actorSystem: ActorSystem) {

  // The actor behind this room; spawned once when the ChatRoom is constructed.
  private[this] val chatRoomActor = {
    val roomProps = Props(classOf[ChatRoomActor], roomId)
    actorSystem.actorOf(roomProps)
  }

  /** Websocket flow for `user`. Left unimplemented here: calling it throws NotImplementedError. */
  def websocketFlow(user: String): Flow[Message, Message, _] = ???

  /** Hands `message` to the room actor without waiting for a reply. */
  def sendMessage(message: ChatMessage): Unit = {
    chatRoomActor ! message
  }
}
/** Companion offering a factory that picks up the ActorSystem from implicit scope. */
object ChatRoom {

  /**
   * Builds a [[ChatRoom]] for the given id.
   *
   * @param roomId      identifier of the room to create
   * @param actorSystem implicit actor system the room's actor is spawned in
   */
  def apply(roomId: Int)(implicit actorSystem: ActorSystem): ChatRoom = {
    val room = new ChatRoom(roomId, actorSystem)
    room
  }
}
/**
 * Actor holding the participants of one chat room and fanning messages out to them.
 *
 * Handled messages:
 *  - `UserJoined(name, actorRef)`: registers the participant, then announces the join to everyone
 *    (including the new participant).
 *  - `UserLeft(name)`: unregisters the participant, then announces the departure to the
 *    remaining participants.
 *  - `IncomingMessage`: forwarded unchanged to every participant.
 *
 * Participants are keyed by name, so a second join with the same name replaces the first entry.
 *
 * @param roomId identifier of the room, used only in log output and the leave notification
 */
class ChatRoomActor(roomId: Int) extends Actor {

  // Current participants, keyed by user name. Only touched from within `receive`.
  var participants: Map[String, ActorRef] = Map.empty[String, ActorRef]

  override def receive: Receive = {
    case UserJoined(name, actorRef) =>
      participants += name -> actorRef
      broadcast(SystemMessage(s"User $name joined channel..."))
      println(s"User $name joined channel[$roomId]")

    case UserLeft(name) =>
      println(s"User $name left channel[$roomId]")
      // Unregister first: the departing user's actor belongs to a stream that is going away,
      // so the notification must only go to the participants that remain.
      participants -= name
      broadcast(SystemMessage(s"User $name left channel[$roomId]"))

    case msg: IncomingMessage =>
      broadcast(msg)
  }

  /** Sends `message` to every currently registered participant. */
  def broadcast(message: ChatMessage): Unit = participants.values.foreach(_ ! message)
}
/**
 * Process-wide registry of chat rooms, keyed by room number.
 *
 * `findOrCreate` is invoked from request handling, which may run on several threads at once,
 * so the lookup-and-insert is performed under this object's monitor to guarantee that at most
 * one [[ChatRoom]] (and therefore one room actor) is created per number.
 */
object ChatRooms {

  // Immutable map swapped on insert; @volatile so readers outside the lock see the latest map.
  @volatile var chatRooms: Map[Int, ChatRoom] = Map.empty[Int, ChatRoom]

  /**
   * Returns the room registered under `number`, creating and registering it if absent.
   *
   * @param number      room number to look up
   * @param actorSystem implicit actor system used when a new room has to be created
   */
  def findOrCreate(number: Int)(implicit actorSystem: ActorSystem): ChatRoom = synchronized {
    chatRooms.getOrElse(number, createNewChatRoom(number))
  }

  // Creates a room and registers it. Must only be called while holding this object's monitor.
  private def createNewChatRoom(number: Int)(implicit actorSystem: ActorSystem): ChatRoom = {
    val chatroom = ChatRoom(number)
    chatRooms += number -> chatroom
    chatroom
  }
}
// Echo flow: a strict text message comes back prefixed with "ECHO: "; any other kind of
// message (streamed text, binary) is answered with a fixed "unsupported" text message.
val echoService: Flow[Message, Message, _] = Flow[Message].map { incoming =>
  incoming match {
    case TextMessage.Strict(text) => TextMessage(s"ECHO: $text")
    case _                        => TextMessage("Message type unsupported")
  }
}
// Combined route:
//  - GET on the root path (with or without trailing slash) answers with a welcome text;
//  - GET /ws-echo is handled as a websocket using `echoService`.
val route = {
  val welcomeRoute = get {
    pathEndOrSingleSlash {
      complete("Welcome to websocket server")
    }
  }
  val echoRoute = path("ws-echo") {
    get {
      handleWebsocketMessages(echoService)
    }
  }
  welcomeRoute ~ echoRoute
}
// Chat endpoint: matches /ws-chat/<chatId> with a required `name` query parameter and serves
// the websocket flow of the corresponding chat room (created on first use).
// NOTE(review): as written this is a stand-alone route expression; it only takes effect once it
// is combined with the served route (e.g. via `~`) -- confirm where it is meant to be plugged in.
pathPrefix("ws-chat" / IntNumber) { chatId =>
  parameter('name) { userName =>
    handleWebsocketMessages(ChatRooms.findOrCreate(chatId).websocketFlow(userName))
  }
}
/**
 * Application entry point: binds an HTTP server on localhost:8080 serving a welcome route,
 * waits for RETURN on stdin, then unbinds and shuts the actor system down.
 */
object Server extends App {
  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()

  val interface = "localhost"
  val port = 8080

  import Directives._

  // GET on the root path (with or without trailing slash) answers with a welcome text.
  val route = get {
    pathEndOrSingleSlash {
      complete("Welcome to websocket server")
    }
  }

  val binding = Http().bindAndHandle(route, interface, port)
  println(s"Server is now online at http://$interface:$port\nPress RETURN to stop...")
  StdIn.readLine()

  import actorSystem.dispatcher
  // Unbinding is asynchronous: report the outcome and stop the actor system only once it has
  // actually completed, instead of announcing "down" while the server may still be bound.
  binding.flatMap(_.unbind()).onComplete { outcome =>
    // Surface a failed bind/unbind rather than swallowing it silently.
    outcome.failed.foreach(error => println(s"Server did not start or stop cleanly: ${error.getMessage}"))
    println("Server is down...")
    actorSystem.shutdown()
  }
}
/**
 * Builds the websocket flow connecting `user` to the chat room actor.
 *
 * Incoming strict text messages are turned into `IncomingMessage(user, txt)` and sent to
 * `chatRoomActor`; other incoming messages are dropped by the `collect`. Every `ChatMessage`
 * pushed into the actor-backed source is rendered as "[author]: text" and emitted to the client.
 *
 * @param user name attached to this connection's join, leave and incoming-message events
 * @return a flow from incoming websocket messages to outgoing websocket messages
 */
def websocketFlow(user: String): Flow[Message, Message, _] =
// This Flow factory takes the Source as a parameter so that the Source's materialized value is available inside the graph.
// NOTE(review): buffer of 5 with OverflowStrategy.fail -- presumably the stream fails when the buffer overflows; confirm for the Akka version in use.
Flow(Source.actorRef[ChatMessage](bufferSize = 5, OverflowStrategy.fail)) {
implicit builder =>
chatSource => // the Source passed to the factory above, imported into this graph
// Input side: accepts websocket Messages, keeping only strict text messages as IncomingMessage.
val fromWebsocket = builder.add(
Flow[Message].collect {
case TextMessage.Strict(txt) => IncomingMessage(user, txt)
})
// Output side: renders each ChatMessage as a text Message for the client.
val backToWebsocket = builder.add(
Flow[ChatMessage].map {
case ChatMessage(author, text) => TextMessage(s"[$author]: $text")
}
)
// Forwards events to the chat room actor; UserLeft(user) is the message given to Sink.actorRef
// for when the stream completes, so the room learns that this user is gone.
val chatActorSink = Sink.actorRef[ChatEvent](chatRoomActor, UserLeft(user))
// Two-input merge feeding the chat room sink.
val merge = builder.add(Merge[ChatEvent](2))
// The materialized value (the ActorRef behind chatSource) wrapped as a UserJoined event.
val actorAsSource = builder.materializedValue.map(actor => UserJoined(user, actor))
// Messages from the websocket, converted to IncomingMessage, go to the room for broadcasting.
fromWebsocket ~> merge.in(0)
// Once the source actor exists, it is announced via UserJoined and registered as a participant in the room.
actorAsSource ~> merge.in(1)
// Both inputs are merged and forwarded to the chat room represented by ChatRoomActor.
merge ~> chatActorSink
// The source actor is registered in the room, so every message the room sends it is pushed back to the websocket.
chatSource ~> backToWebsocket
// Expose the flow's ports: websocket input and websocket output.
(fromWebsocket.inlet, backToWebsocket.outlet)
}
/**
 * Simple websocket test client that prints everything it receives and can emit a stream of
 * numbered messages at random intervals.
 *
 * @param url         websocket URL to connect to
 * @param name        name used in log output and in the generated messages
 * @param actorSystem actor system used to run and schedule the message generator
 */
class WSClient(url: String, name: String, actorSystem: ActorSystem) extends WebSocketClient(new URI(url), new Draft_17()) {

  override def onMessage(message: String): Unit = println(message)

  override def onError(ex: Exception): Unit = println("Websocket Error: " + ex.getMessage)

  override def onClose(code: Int, reason: String, remote: Boolean): Unit = println("Websocket closed")

  override def onOpen(handshakedata: ServerHandshake): Unit = println("Websocket opened for name=" + name)

  /**
   * Starts an actor that sends "[name] message #n" over this websocket, waiting a random
   * 1 to 9 seconds between messages, until `numberOfTimes` messages have been sent.
   * The actor stops itself after the last message.
   *
   * @param message       trigger payload passed to the actor; its content is not sent to the server
   * @param numberOfTimes number of messages to send (at least one message is always sent)
   */
  def spam(message: String, numberOfTimes: Int = 1000): Unit = {
    val talkActor = actorSystem.actorOf(Props(new Actor {
      import actorSystem.dispatcher
      import scala.concurrent.duration._

      // Number of messages sent so far by this actor.
      var counter: Int = 0

      override def receive: Receive = {
        case message: String =>
          counter = counter + 1
          send(s"[$name] message #$counter")
          if (counter < numberOfTimes)
            actorSystem.scheduler.scheduleOnce(rand.seconds, self, message)
          else
            // All messages sent: stop the actor so repeated spam() calls do not leak idle actors.
            context.stop(self)
      }

      // Delay before the next message: a whole number of seconds from 1 to 9 inclusive.
      def rand: Int = 1 + Random.nextInt(9)
    }))
    talkActor ! message
  }
}
/** Companion offering a factory that picks up the ActorSystem from implicit scope. */
object WSClient {

  /**
   * Builds a [[WSClient]] for the given endpoint.
   *
   * @param url         websocket URL to connect to
   * @param name        client name used in output and generated messages
   * @param actorSystem implicit actor system passed on to the client
   */
  def apply(url: String, name: String)(implicit actorSystem: ActorSystem): WSClient =
    new WSClient(url, name, actorSystem)
}
@Mohamed-boubaya
Copy link

Hello,
Good work, but I have a question: if I want to save the list of messages and the list of rooms in a database, how do I do it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment