Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Publishes [[GameEvent]]s from the actor system's event stream to a single
 * stream subscriber, respecting the subscriber's demand.
 *
 * Events that arrive while there is no outstanding demand are buffered in
 * arrival order and flushed as soon as demand is signalled. The buffer is
 * unbounded: a subscriber that never requests elements lets it grow without
 * limit.
 *
 * When the subscriber cancels, the actor drops anything still buffered and
 * stops itself; it is removed from the event stream when it stops.
 */
class EventPublisherActor extends ActorPublisher[GameEvent] {
  // Imported locally so the class does not depend on a file-level import of Cancel.
  import akka.stream.actor.ActorPublisherMessage.Cancel

  /** Events waiting for demand, oldest first. Vector keeps append and head/tail cheap. */
  var eventCache: Vector[GameEvent] = Vector.empty

  context.system.eventStream.subscribe(self, classOf[GameEvent])

  /** Removes this actor from the event stream so no further events are routed to it. */
  override def postStop(): Unit = {
    context.system.eventStream.unsubscribe(self)
    super.postStop()
  }

  override def receive: Receive = {
    case Request(_) =>
      // totalDemand has already been updated by ActorPublisher before we get here.
      deliverBuffered()

    case Cancel =>
      // Nothing can be delivered after cancellation; without stopping here the
      // actor would keep buffering every future event forever.
      eventCache = Vector.empty
      context.stop(self)

    case event: GameEvent =>
      // Once the stream is no longer active the event can never be emitted,
      // so it is dropped instead of being cached.
      if (isActive) {
        // Always go through the buffer so delivery order matches arrival order.
        eventCache :+= event
        deliverBuffered()
      }
  }

  /** Emits buffered events, oldest first, until demand or the buffer runs out. */
  private def deliverBuffered(): Unit = {
    while (isActive && totalDemand > 0 && eventCache.nonEmpty) {
      onNext(eventCache.head)
      eventCache = eventCache.tail
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment