Skip to content

Instantly share code, notes, and snippets.

@bhudgeons
Created October 3, 2014 14:31
Show Gist options
  • Save bhudgeons/dd574291d95e14ac1262 to your computer and use it in GitHub Desktop.
Save bhudgeons/dd574291d95e14ac1262 to your computer and use it in GitHub Desktop.
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.util.Timeout
// A unit of work handed to the Throttler. `work` is a zero-argument function;
// invoking it performs the job and yields the job's result of type A.
case class Job[A](work: () => A)
// Companion object holding the messages used by the Throttler actor.
object Throttler {
  // Sent by a Throttler to itself after a job's Future has run, so that the
  // actor can decrement its count of outstanding jobs.
  case object Finished
}
// Implements very naive backpressure:
//  - if 30 jobs are already outstanding, the job is rejected and the sender
//    receives None (a client would be returned an error, perhaps with a
//    retry suggestion);
//  - from the 12th outstanding job onwards, the job is delayed before it
//    runs; the delay doubles with every additional outstanding job
//    (100 ms, 200 ms, 400 ms, ...) up to a fixed cap.
// An accepted job is answered with Some(result), or with Status.Failure if
// the job's work function throws.
class Throttler extends Actor {
  import akka.actor.Status
  import scala.concurrent.blocking
  import scala.util.control.NonFatal

  // Number of outstanding jobs at which new jobs are rejected outright.
  private val MaxOutstanding = 30
  // Jobs are delayed once the outstanding count exceeds this value.
  private val ThrottleThreshold = 11
  // Unit of delay; the first throttled job waits twice this long.
  private val BaseDelayMs = 50L
  // Upper bound on any single delay, so a throttled job cannot sleep unboundedly.
  private val MaxDelayMs = 10000L

  // Jobs accepted but not yet reported as Finished. Only read and written
  // inside `receive`; the Future in doJob gets a snapshot passed by value.
  var outstandingJobs = 0

  // Delay in milliseconds for a job started while `ojs` jobs are outstanding:
  // 0 at or below the threshold, otherwise BaseDelayMs * 2^(ojs - threshold),
  // capped at MaxDelayMs. Note that `^` in Scala is bitwise XOR, so the
  // power of two is computed with a shift (clamped to avoid overflow).
  private def throttleDelayMs(ojs: Int): Long =
    if (ojs <= ThrottleThreshold) 0L
    else math.min(MaxDelayMs, BaseDelayMs << math.min(ojs - ThrottleThreshold, 30))

  // Runs `job` asynchronously, after any throttling delay, and replies to
  // `mySender`.
  //   job      - the work to perform
  //   mySender - the actor to reply to (captured by the caller in `receive`,
  //              because `sender` must not be read from inside the Future)
  //   ojs      - outstanding-job count at the time the job was accepted
  // Always sends Throttler.Finished to this actor when done, even if the job
  // throws, so the outstanding-job count cannot leak.
  def doJob[A](job: Job[A], mySender: ActorRef, ojs: Int) = Future {
    try {
      val delay = throttleDelayMs(ojs)
      if (delay > 0) {
        println(s"Throttling for $delay")
        // Mark the sleep as blocking so the execution context can compensate.
        blocking { Thread.sleep(delay) }
      }
      mySender ! Some(job.work())
    } catch {
      case NonFatal(e) =>
        // Report the failure to the asker instead of leaving it to time out.
        mySender ! Status.Failure(e)
        throw e
    } finally {
      self ! Throttler.Finished
    }
  }

  def receive = {
    case job: Job[_] =>
      if (outstandingJobs >= MaxOutstanding) {
        sender() ! None
      } else {
        outstandingJobs += 1
        doJob(job, sender(), outstandingJobs)
      }
    case Throttler.Finished =>
      outstandingJobs -= 1
  }
}
// Entry point for submitting jobs: owns the ActorSystem and the single
// Throttler actor that all jobs are sent to.
object Worker {
  // Timeout passed implicitly to every `?` (ask) issued from this object.
  // Written as `60.seconds` so that no postfix-operator language feature is needed.
  implicit val timeout: Timeout = Timeout(60.seconds)

  val system: ActorSystem = ActorSystem("Worker")
  val throttler: ActorRef = system.actorOf(Props[Throttler], "throttler")

  // Submits `job` to the throttler.
  // Returns a Future of the throttler's reply, typed as Option[A]: the
  // Throttler replies with Some(result) for a completed job and None for a
  // rejected one.
  def doJob[A](job: Job[A]): Future[Option[A]] =
    (throttler ? job).mapTo[Option[A]]
}
// Demo driver: submits three batches of 50 prime-counting jobs of increasing
// cost to Worker, one job every 100 ms, and prints each job's outcome.
object BackpressureApp extends App {
  import scala.util.{Failure, Success}

  // Sieve of Eratosthenes: returns the set of primes in the range 2 to n (inclusive).
  def sieve(n: Int) = (2 to math.sqrt(n).toInt).foldLeft((2 to n).toSet) { (ps, x) =>
    if (ps(x)) ps -- (x * x to n by x) else ps
  }

  // Each of these yields a function that counts the primes up to the number
  // in its name.
  def sieve10000 = () => sieve(10000).size
  def sieve300000 = () => sieve(300000).size
  def sieve500000 = () => sieve(500000).size

  // Submits `work` as job number `n` and prints the outcome when the reply
  // arrives: the result, a retry hint if the job was rejected, or the failure.
  def doWork[A](n: Int, work: () => A): Unit =
    Worker.doJob(Job(work)).onComplete {
      case Success(Some(x)) => println(s"$n: Success! $x")
      case Success(_)       => println(s"$n: please retry -- too many outstanding jobs")
      case Failure(x)       => println(s"$n: Failure! $x")
    }

  // Announces a batch and submits 50 jobs running `work`, pausing 100 ms
  // before each submission.
  private def runBatch[A](label: String, work: () => A): Unit = {
    println(s"performing 50 $label, 1 every 100 ms: ")
    (1 to 50).foreach { n =>
      Thread.sleep(100)
      doWork(n, work)
    }
  }

  runBatch("sieve10000", sieve10000)
  Thread.sleep(2000)

  // on my MBP, this generates some throttling and sometimes a dropped job
  runBatch("sieve300000", sieve300000)
  Thread.sleep(10000) // give some time for old jobs to clear out

  // on my MBP, this results in a few dropped jobs & some throttling
  runBatch("sieve500000", sieve500000)

  // NOTE(review): nothing here stops Worker.system, so the JVM presumably
  // stays alive after the last batch until it is killed -- confirm, and add
  // a shutdown call appropriate to the Akka version in use if desired.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment