Skip to content

Instantly share code, notes, and snippets.

@xmeng1
Created June 22, 2019 06:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xmeng1/c99da1fd6cdeac226c80f658d2144aa3 to your computer and use it in GitHub Desktop.
Save xmeng1/c99da1fd6cdeac226c80f658d2144aa3 to your computer and use it in GitHub Desktop.
Akka Streams and Akka HTTP WebSocket Example 1
/**
 * Minimal WebSocket chat server built on Akka HTTP and Akka Streams.
 *
 * Every WebSocket connection accepted on `/` is wired to a single [[ChatRef]]
 * actor, which relays each text message to all other registered connections.
 * The server listens on `0.0.0.0:8080`.
 */
object ChatApp extends App {
  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Maximum number of simultaneously registered clients; further connections
   * are closed immediately by [[ChatRef]].
   *
   * NOTE(review): senders are excluded from their own broadcast, so with a
   * limit of 1 no message is ever delivered to anyone — presumably a demo
   * value; confirm before reuse.
   */
  val maximumClients: Int = 1

  /** Messages understood by [[ChatRef]]. */
  object Protocol {
    /** A chat line `msg` sent by the connection identified by `uuid`. */
    final case class SignedMessage(uuid: UUID, msg: String)

    /** Registers `actor` as the outbound endpoint of connection `uuid`. */
    final case class OpenConnection(actor: ActorRef, uuid: UUID)

    /** Unregisters connection `uuid`. */
    final case class CloseConnection(uuid: UUID)
  }

  /**
   * Chat hub. The set of connected clients is kept as an immutable map that
   * is threaded through `context.become`, so the actor has no mutable fields.
   */
  class ChatRef extends Actor {
    import Protocol._

    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    /**
     * Behaviour for the given registry of clients.
     *
     * @param clients outbound actor of every registered connection, by id
     */
    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Relay to everybody except the sender.
      case SignedMessage(uuid, msg) =>
        clients.foreach { case (id, ar) => if (id != uuid) ar ! msg }

      // Hub is full: stopping the outbound actor completes that connection's
      // outgoing stream.
      case OpenConnection(ar, _) if clients.size >= maximumClients =>
        ar ! PoisonPill

      case OpenConnection(ar, uuid) =>
        // Watch the outbound actor so the slot is released even when the
        // inbound side fails: in that case Sink.actorRef delivers
        // Status.Failure instead of CloseConnection, and the entry would
        // otherwise stay registered forever.
        context.watch(ar)
        context.become(withClients(clients.updated(uuid, ar)))

      case CloseConnection(uuid) =>
        context.become(withClients(clients - uuid))

      // The outbound actor stopped (stream completed, failed or cancelled).
      // A no-op if CloseConnection already removed the entry.
      case akka.actor.Terminated(ref) =>
        context.become(withClients(clients.filterNot { case (_, ar) => ar == ref }))
    }
  }

  val chatRef = system.actorOf(Props[ChatRef])

  /**
   * Per-connection WebSocket handler: collapses every incoming text message
   * into a single `String`, passes it through [[chatActorFlow]] under a
   * freshly generated connection id and wraps the replies as text messages.
   * A binary message fails the stream.
   */
  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(text)     => Future.successful(text)
        case TextMessage.Streamed(chunks) => chunks.runFold("")(_ + _)
        case _: BinaryMessage =>
          Future.failed(new Exception("Binary message cannot be handled"))
      }
      .via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  /**
   * Bridges one connection to [[chatRef]].
   *
   * Incoming strings are tagged with `connectionId` and sent to the hub; when
   * the inbound stream completes normally the hub receives `CloseConnection`.
   * Outgoing strings are whatever the hub sends to the actor materialized by
   * `Source.actorRef`, which is announced to the hub via `OpenConnection` at
   * materialization time. The outbound buffer holds 16 messages and the
   * stream fails on overflow.
   *
   * @param connectionId identifier under which this connection is registered
   */
  def chatActorFlow(connectionId: UUID): Flow[String, String, Any] = {
    val toHub = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val fromHub = Source
      .actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue { actor =>
        chatRef ! Protocol.OpenConnection(actor, connectionId)
      }

    Flow.fromSinkAndSource(toHub, fromHub)
  }

  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  // Report the outcome of the bind. On failure shut the actor system down so
  // the process does not linger without a listening socket.
  Http().bindAndHandle(route, "0.0.0.0", 8080).onComplete {
    case scala.util.Success(_) =>
      println("Started server...")
    case scala.util.Failure(cause) =>
      System.err.println(s"Failed to start server: ${cause.getMessage}")
      system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment