Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
1409268672399 now
1 queued at 1409268672404
2 queued at 1409268672404
not fired
222 queued at 1409268672404
not fired
1 processed at 1409268672405
2 processed at 1409268674424
9999 queued at 1409268674603
not fired
Dump Vector(222, 9999)
222 processed at 1409268676443
Dump Vector(9999)
9999 processed at 1409268678463
import akka.actor.ActorSystem
import scala.concurrent.duration._
import akka.actor.ActorDSL._
/**
 * Demo of a throttled queue processor built with the Akka ActorDSL.
 *
 * Integers sent to the actor are queued and processed (printed) one at a time,
 * with at least `delay` between two consecutive items. Compared with the earlier
 * `ThrottledProcessor` it fires immediately when idle and does not send `Process`
 * messages repeatedly.
 *
 * Throttling invariant: at most one `Process` message is outstanding at any time
 * (either sent directly to `self` or held by the scheduler). The previous
 * timestamp-based idle check could start a second, parallel timer chain when an
 * `Int` arrived after `delay` had elapsed but before the already scheduled
 * `Process` was delivered, which let two items be processed back to back.
 */
object ThrottledProcessor2 extends App {
  implicit val system: ActorSystem = ActorSystem("ThrottledProcessor")
  // Execution context required by scheduler.scheduleOnce.
  import system.dispatcher

  /** Asks the actor to print its current queue. */
  case object Dump

  /** Asks the actor to process the next queued item, if there is one. */
  case object Process

  /** Minimum spacing between two processed items. */
  val delay: FiniteDuration = 2.second

  /** Current wall-clock time in milliseconds; used only for log output. */
  def now: Long = System.currentTimeMillis

  val a = actor("myactor")(new Act {
    // Items waiting to be processed, oldest first.
    var numbers = IndexedSeq[Int]()
    // True while a Process message is in flight (in the mailbox or in the scheduler).
    var processPending = false
    val scheduler = system.scheduler

    become {
      case Dump => println(s"Dump $numbers")

      case Process =>
        // The outstanding Process message has just been consumed.
        processPending = false
        numbers match {
          case head +: tail =>
            println(s"$head processed at $now")
            numbers = tail
            // Always schedule a follow-up: it either handles the next item or,
            // when the queue is empty, simply clears the pending flag after the
            // mandatory delay has passed.
            scheduler.scheduleOnce(delay, self, Process)
            processPending = true
          case _ => () // nothing queued; the actor is now idle
        }

      case x: Int =>
        println(s"$x queued at $now")
        numbers = numbers :+ x
        if (processPending) {
          // A Process is already on its way and will pick this item up in turn.
          println(" not fired ")
        } else {
          // Idle: no timer running, so the item can be processed right away.
          processPending = true
          self ! Process
        }
    }
  })

  println(s"$now now")
  // Three items at once: the first fires immediately, the others wait their turn.
  Seq(1, 2, 222).foreach(a ! _)
  // Wait just past one throttle period so that item 2 has been processed.
  Thread.sleep(delay.toMillis + 200)
  a ! 9999
  a ! Dump
  Thread.sleep(3500)
  a ! Dump
  Thread.sleep(delay.toMillis)
  // NOTE(review): newer Akka versions deprecate shutdown() in favour of terminate() - confirm against the Akka version in use.
  system.shutdown()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment