Skip to content

Instantly share code, notes, and snippets.

@jeantil
Last active May 4, 2016 13:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeantil/424e4ebd268998280696c4f2b16b28e5 to your computer and use it in GitHub Desktop.
Save jeantil/424e4ebd268998280696c4f2b16b28e5 to your computer and use it in GitHub Desktop.
// tu as 2 flux différents a gérer :
// ton protocole interne
def receiveChat:Receive = {
case chatMessage@ChatMessage(message, topic, client) if isActive =>
// dans tous les cas j'ajoute le message a la fin de la file d'attente
// si tu ne fais pas ça tu envoies potentiellement les chat messages dans le désordre
// aka race condition :)
buffer.enqueue(chatMessage)
// dans tous les cas j'essaye d'envoyer totalDemand messages depuis la file d'attente
deliverPending()
}
// le protocole akka-stream
def handleStreamProtocol :Receive{
case ActorPublisherMessage.Request(_) =>
deliverPending()
case ActorPublisherMessage.Cancel =>
context.stop(self)
}
// j'ai viré le cas où totalDemande est > Int.MaxValue mais tu peux le récuperer si besoin
final def deliverPending(): Unit =
if (totalDemand > 0) {
val (use, keep) = buffer.splitAt(totalDemand.toInt) // prendre les totalDemand premiers elements et essayer de les envoyer
buffer = keep // le reste on garde
use foreach onNext
}
// et ton receive devient SRP ;)
override def receive: Receive =
receiveChat orElse handleStreamProtocol
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment