@jadlr
Created February 29, 2016 17:06
Reusable rate limiter for Akka Streams
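import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Source, Zip}
import scala.concurrent.duration.FiniteDuration

object RateLimiter {
  // Lets one element through per tick by pairing each element with a timer tick.
  def rateLimiter[A](rate: FiniteDuration): Flow[A, A, NotUsed] = {
    case object Tick
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      // The first tick fires after `rate`, then one tick every `rate`.
      val ticker = b.add(Source.tick(rate, rate, Tick))
      // Zip emits only when both an element and a tick are available.
      val zip = b.add(Zip[A, Tick.type]())
      // Drop the tick and pass the original element downstream.
      val unpack = b.add(Flow[(A, Tick.type)].map(_._1))
      ticker ~> zip.in1
      zip.out ~> unpack
      FlowShape(zip.in0, unpack.out)
    })
  }
}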

jadlr commented Feb 29, 2016

Would this be an appropriate implementation of a fixed-rate limiter? I need it to limit the HTTP requests made to a third-party site. I just plug this flow into the graph right before the connection pool.
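
A minimal sketch of that wiring, using the same zip-with-tick idea written as plain combinators so the snippet stands alone. The object name `ZipTickSketch`, the `Source(1 to 3)` stand-in for the HTTP requests, the 100 ms interval, and the fold that takes the place of the connection pool are all assumptions for illustration. It targets the Akka 2.4-era API (`ActorMaterializer`).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object ZipTickSketch {
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("zip-tick-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Hypothetical stand-in for the stream of outgoing requests.
    val requests = Source(1 to 3)
    val rate = 100.millis

    // Pair every element with a tick, then drop the tick again.
    // An element can only pass once a tick is available.
    val limited = requests
      .zip(Source.tick(rate, rate, ()))
      .map(_._1)

    // Stand-in for the connection pool: collect what comes through.
    val collected = limited.runFold(Vector.empty[Int])(_ :+ _)

    try Await.result(collected, 5.seconds)
    finally system.terminate()
  }
}
```

Calling `ZipTickSketch.run()` should return the elements in their original order, spaced roughly one tick apart. In a real graph the fold would be replaced by whatever flow sends the requests.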