Created
October 3, 2014 14:31
-
-
Save bhudgeons/dd574291d95e14ac1262 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.actor.Props | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.pattern.ask | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import akka.util.Timeout | |
/** A unit of deferred work.
  *
  * @param work zero-argument function that produces the job's result when invoked
  * @tparam A type of the value produced by `work`
  */
case class Job[A](work: () => A)
/** Companion object holding the messages the `Throttler` actor sends to itself. */
object Throttler {

  /** Sent by the actor to itself once a job has run, so that it can
    * decrement its count of outstanding jobs.
    */
  case object Finished
}
// Implements very naive backpressure:
// if there are already 30 outstanding jobs, replies with
// None (the client would be returned an error, perhaps
// with a retry suggestion). From the 12th through the 30th
// outstanding job, adds an exponentially growing delay.
/** Actor that runs submitted [[Job]]s as Futures while applying a very naive
  * form of backpressure based on the number of jobs still outstanding.
  *
  *  - When `MaxOutstandingJobs` jobs are already outstanding, a new job is
  *    rejected and the sender is sent `None`.
  *  - When more than `ThrottleThreshold` jobs are outstanding (counting the
  *    new one), the job is delayed before it runs. The delay doubles with
  *    every additional outstanding job, up to `MaxDelayMillis`.
  *  - Otherwise the job runs immediately.
  *
  * A successful job is answered with `Some(result)`. A job that throws is
  * answered with `akka.actor.Status.Failure`, which fails the asker's Future.
  */
class Throttler extends Actor {
  // Jobs are delayed once the outstanding count exceeds this value.
  private val ThrottleThreshold = 11
  // Jobs are rejected once this many are already outstanding.
  private val MaxOutstandingJobs = 30
  // Delay applied to the first throttled job; doubles for each further job.
  private val BaseDelayMillis = 10L
  // Upper bound on the throttling delay.
  private val MaxDelayMillis = 10000L

  // Only mutated inside `receive`; Futures report completion by sending
  // `Throttler.Finished` rather than touching this field directly.
  var outstandingJobs = 0

  /** Delay in milliseconds for a job accepted as outstanding job number `ojs`.
    *
    * Note: the `^` operator on Int is bitwise XOR, not exponentiation, so the
    * power of two is computed with a left shift instead.
    */
  private def throttleDelayMillis(ojs: Int): Long = {
    // Clamp the shift so the result can never overflow a Long.
    val doublings = math.min(math.max(ojs - ThrottleThreshold - 1, 0), 32)
    math.min(BaseDelayMillis << doublings, MaxDelayMillis)
  }

  /** Runs `job` asynchronously, throttling first if needed, and replies to
    * `mySender` with the outcome.
    *
    * @param job      the work to perform
    * @param mySender the actor to reply to; captured by the caller because
    *                 `sender` must not be read from inside a Future
    * @param ojs      the outstanding-job count at the time the job was accepted
    * @return a Future that completes once the reply has been sent
    */
  def doJob[A](job: Job[A], mySender: ActorRef, ojs: Int): Future[Unit] = Future {
    try {
      if (ojs > ThrottleThreshold) {
        val delay = throttleDelayMillis(ojs)
        println(s"Throttling for $delay")
        Thread.sleep(delay)
      }
      mySender ! Some(job.work())
    } catch {
      // Tell the asker right away instead of letting its ask time out.
      case scala.util.control.NonFatal(e) =>
        mySender ! akka.actor.Status.Failure(e)
    } finally {
      // Always release the slot, even if the job threw; otherwise the
      // outstanding count would never go back down.
      self ! Throttler.Finished
    }
  }

  def receive = {
    case job: Job[_] =>
      if (outstandingJobs >= MaxOutstandingJobs) {
        sender ! None
      } else {
        outstandingJobs += 1
        doJob(job, sender, outstandingJobs)
      }
    case Throttler.Finished =>
      outstandingJobs -= 1
  }
}
/** Entry point for submitting [[Job]]s: owns the actor system and the single
  * `Throttler` actor that all jobs are sent to.
  */
object Worker {
  /** Ask timeout applied to every job submitted through [[doJob]]. */
  implicit val timeout: Timeout = Timeout(60.seconds)

  val system: ActorSystem = ActorSystem("Worker")
  val throttler: ActorRef = system.actorOf(Props[Throttler], "throttler")

  /** Submits `job` to the throttler.
    *
    * @return a Future holding whatever `Option` the throttler replies with.
    *         The Future fails if no reply arrives within [[timeout]], if the
    *         throttler replies with a failure status, or if the reply is not
    *         an `Option`.
    */
  def doJob[A](job: Job[A]): Future[Option[A]] =
    (throttler ? job).mapTo[Option[A]]
}
/** Demo driver: submits three batches of 50 prime-counting jobs of increasing
  * cost to [[Worker]], one every 100 ms, and prints each job's outcome.
  */
object BackpressureApp extends App {
  import scala.util.{Failure, Success}

  /** Sieve of Eratosthenes.
    *
    * @param n inclusive upper bound
    * @return the set of primes in `2 to n` (empty when `n < 2`)
    */
  def sieve(n: Int): Set[Int] =
    (2 to math.sqrt(n).toInt).foldLeft((2 to n).toSet) { (ps, x) =>
      // Only strike out multiples of numbers that are still considered prime.
      if (ps(x)) ps -- (x * x to n by x) else ps
    }

  // Each job counts the number of primes up to the bound in its name.
  def sieve10000: () => Int = () => sieve(10000).size
  def sieve300000: () => Int = () => sieve(300000).size
  def sieve500000: () => Int = () => sieve(500000).size

  /** Submits `work` as a job and prints the outcome, tagged with `n`, once
    * the job's Future completes.
    *
    * @param n    label identifying the job in the output
    * @param work the computation to run
    */
  def doWork[A](n: Int, work: () => A): Unit =
    Worker.doJob(Job(work)).onComplete {
      case Success(Some(x)) => println(s"$n: Success! $x")
      case Success(None)    => println(s"$n: please retry -- too many outstanding jobs")
      case Failure(x)       => println(s"$n: Failure! $x")
    }

  println("performing 50 sieve10000, 1 every 100 ms: ")
  (1 to 50).foreach { n =>
    Thread.sleep(100)
    doWork(n, sieve10000)
  }
  Thread.sleep(2000)

  // on my MBP, this generates some throttling and sometimes a dropped job
  println("performing 50 sieve300000, 1 every 100 ms: ")
  (1 to 50).foreach { n =>
    Thread.sleep(100)
    doWork(n, sieve300000)
  }
  Thread.sleep(10000) // give some time for old jobs to clear out

  // on my MBP, this results in a few dropped jobs & some throttling
  println("performing 50 sieve500000, 1 every 100 ms: ")
  (1 to 50).foreach { n =>
    Thread.sleep(100)
    doWork(n, sieve500000)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment