Skip to content

Instantly share code, notes, and snippets.

@fancellu
Created August 28, 2014 23:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fancellu/d215d71e2ae630b8f98c to your computer and use it in GitHub Desktop.
Save fancellu/d215d71e2ae630b8f98c to your computer and use it in GitHub Desktop.
1409268672399 now
1 queued at 1409268672404
2 queued at 1409268672404
not fired
222 queued at 1409268672404
not fired
1 processed at 1409268672405
2 processed at 1409268674424
9999 queued at 1409268674603
not fired
Dump Vector(222, 9999)
222 processed at 1409268676443
Dump Vector(9999)
9999 processed at 1409268678463
import akka.actor.ActorSystem
import scala.concurrent.duration._
import akka.actor.ActorDSL._
// Better than ThrottledProcessor as it fires immediately if idle, and doesn't send Process messages repeatedly
/** Demo of a throttled queue processor built with the Akka ActorDSL.
  *
  * Integers sent to the actor are queued. A queued item is processed immediately when the
  * actor is idle; otherwise items are drained one at a time, with at least `delay` between
  * two consecutive items.
  *
  * Idleness is tracked with an explicit flag instead of comparing wall-clock timestamps.
  * The flag is set while a `Process` message is outstanding (either sitting in the mailbox
  * or scheduled for later), so at most one `Process` is ever in flight. A timestamp check
  * cannot guarantee that: if the scheduled `Process` is delivered after the wall-clock delay
  * has already elapsed, an incoming `Int` would trigger a second `Process` and the late one
  * would then process another item without waiting.
  */
object ThrottledProcessor2 extends App {
  implicit val system: ActorSystem = ActorSystem("ThrottledProcessor")
  import system.dispatcher

  /** Asks the actor to print its current queue. */
  case object Dump

  /** Asks the actor to process the head of the queue (sent by the actor to itself). */
  case object Process

  /** Minimum spacing between two processed items. */
  val delay = 2.second

  /** Current wall-clock time in milliseconds; used only for log output. */
  def now = System.currentTimeMillis

  val a = actor("myactor")(new Act {
    // Items waiting to be processed, oldest first.
    var numbers = IndexedSeq[Int]()
    // True while a Process message is outstanding (in the mailbox or scheduled).
    // Only this actor reads or writes it, from inside its message handler.
    var processPending = false
    val scheduler = system.scheduler

    become {
      case Dump => println(s"Dump $numbers")

      case Process =>
        numbers match {
          case head +: tail =>
            println(s"$head processed at $now")
            numbers = tail
            // Stay "busy": the next Process ends the cool-down or handles the next item.
            scheduler.scheduleOnce(delay, self, Process)
          case _ =>
            // Cool-down finished with nothing queued: the next Int may fire immediately.
            processPending = false
        }

      case x: Int =>
        println(s"$x queued at $now")
        numbers = numbers :+ x
        if (processPending) println(" not fired ")
        else {
          processPending = true
          self ! Process
        }
    }
  })

  println(s"$now now")
  // 1 fires immediately; 2 and 222 are queued behind it.
  Seq(1, 2, 222).foreach(a ! _)
  // Wait slightly longer than one delay so that 2 has been processed as well.
  Thread.sleep(1000 * 2 + 200)
  a ! 9999
  a ! Dump
  Thread.sleep(3500)
  a ! Dump
  // Give the last queued item time to be processed before stopping the system.
  Thread.sleep(delay.toMillis)
  system.shutdown
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment