Skip to content

Instantly share code, notes, and snippets.

@Timshel
Last active January 24, 2019 20:30
Show Gist options
  • Save Timshel/1d9ace8b3fd441c4c4940d734ec76286 to your computer and use it in GitHub Desktop.
Save Timshel/1d9ace8b3fd441c4c4940d734ec76286 to your computer and use it in GitHub Desktop.
Implementation of a Throttle with a sliding window (with a probably useless `nice` delay)
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
import akka.stream._
import scala.concurrent.duration.{ FiniteDuration, _ }
/**
* Implementation of a Throttle with a sliding window
*/
class SlidingThrottle[T]
(max: Int, per: FiniteDuration)
(nice: FiniteDuration = SlidingThrottle.nice(max, per)) extends SimpleLinearGraphStage[T] {
require(max > 0, "max must be > 0")
require(per.toNanos > 0, "per time must be > 0")
require(per.toNanos >= max, "Rates larger than 1 unit / nanosecond are not supported")
private val nanosPer = per.toNanos
private val nanosNice = nice.toNanos
private val timerName: String = "ThrottleTimer"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
var willStop = false
var emittedTimes = scala.collection.immutable.Queue.empty[Long]
var last: Long = System.nanoTime
var currentElement: T = _
def pushThenLog(elem: T): Unit = {
push(out, elem)
last = System.nanoTime
emittedTimes = emittedTimes :+ last
if( willStop ) completeStage()
}
def schedule(elem: T, nanos: Long): Unit = {
currentElement = elem
scheduleOnce(timerName, nanos.nanos)
}
def receive(elem: T): Unit = {
var now = System.nanoTime
emittedTimes = emittedTimes.dropWhile { t => t + nanosPer < now }
if( emittedTimes.length < max ) {
val delay = last + nanosNice - now
if( delay >= 0 ) schedule(elem, delay) else pushThenLog(elem)
} else {
schedule(elem, emittedTimes.head + nanosPer - System.nanoTime)
}
}
// This scope is here just to not retain an extra reference to the handler below.
// We can't put this code into preRestart() because setHandler() must be called before that.
{
val handler = new InHandler with OutHandler {
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage()
override def onPush(): Unit = receive(grab(in))
override def onPull(): Unit = pull(in)
}
setHandler(in, handler)
setHandler(out, handler)
// After this point, we no longer need the `handler` so it can just fall out of scope.
}
override protected def onTimer(key: Any): Unit = {
var elem = currentElement
currentElement = null.asInstanceOf[T]
receive(elem)
}
}
override def toString = "Throttle"
}
object SlidingThrottle{
def nice(max: Int, per: FiniteDuration): FiniteDuration =
((per.toNanos / max) * 0.1).toLong.nanos
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment