Skip to content

Instantly share code, notes, and snippets.

@EdgeCaseBerg
Last active October 13, 2020 16:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save EdgeCaseBerg/dc449ea0028a7c83cb71b1a5af39e9ea to your computer and use it in GitHub Desktop.
Save EdgeCaseBerg/dc449ea0028a7c83cb71b1a5af39e9ea to your computer and use it in GitHub Desktop.
Basic Rate Limiting Actor implementation
package actors
import org.scalatest._
import org.scalatest.concurrent._
import org.scalatest.time.{ Millis, Seconds, Span }
import akka.actor.ActorSystem
import akka.util.Timeout
import java.util.concurrent.TimeUnit.MILLISECONDS
/** Base trait for actor test suites.
  *
  * Supplies the implicit patience and ask-timeout settings shared by the suites, plus a
  * loan-style helper that hands out an actor system named "myTestSystem" and shuts it
  * down again once the test code has finished.
  */
trait ActorTest extends FlatSpec with Matchers with ScalaFutures {

  /** How long to wait for futures (5 seconds) and how often to check on them (every 500 ms) */
  implicit val defaultPatience: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))

  /** How long an actor is given to talk to another actor with the ask pattern before it times out */
  implicit val timeoutForTalkingToActors: Timeout = Timeout(1000, MILLISECONDS)

  /** Get an auto-closing actor system and run code with it.
    *
    * The system is terminated whether `testCode` returns normally or throws.
    *
    * @param testCode The code block to execute, given an ActorSystem
    * @return whatever `testCode` returns
    */
  def withActorSystem[T](testCode: ActorSystem => T): T = {
    val system = ActorSystem("myTestSystem")
    try {
      testCode(system)
    } finally {
      // terminate() only starts the shutdown and returns a Future. Block until it has
      // completed so the system is really gone before the next test creates its own.
      // Fully-qualified names keep this block independent of the file's import list.
      // NOTE: if shutdown takes longer than 10 seconds, Await.ready throws a TimeoutException.
      scala.concurrent.Await.ready(
        system.terminate(),
        scala.concurrent.duration.Duration(10, java.util.concurrent.TimeUnit.SECONDS)
      )
    }
  }
}
// Scala compiler version the sources are built with.
scalaVersion := "2.11.7"

// ScalaTest is declared for the "test" configuration only.
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.0" % "test"

// Akka is a regular (compile-scope) dependency.
libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.4.16"
package actors
import akka.routing.ConsistentHashingRouter.ConsistentHashable
import akka.actor.ActorRef
import akka.util.Timeout
package object auth {

  /** Common parent of every message defined in this package */
  abstract class AuthMessage

  /** Request to deliver `action` to the actor `to`, provided the rate limiter permits it.
    *
    * @param to       the actor the action is addressed to; also used as the consistent-hashing key
    * @param action   the message to hand to `to`
    * @param waitUpTo the timeout carried along with the request
    */
  case class RateLimitedAction(to: ActorRef, action: Any, waitUpTo: Timeout)
      extends AuthMessage
      with ConsistentHashable {

    def consistentHashKey: ActorRef = to
  }

  /** Reply sent when no more requests can be made because the rate limit has been hit */
  case object RateLimited extends AuthMessage
}
package actors.auth
import akka.actor.{ Actor, ActorRef }
import akka.pattern.{ ask, PipeToSupport }
import scala.concurrent.duration.{ FiniteDuration, MINUTES, Deadline }
import scala.concurrent.ExecutionContext.Implicits.global
/** Actor to throttle access to another actor.
  *
  * Each [[RateLimitedAction]] names a target actor. If fewer than `actionsAllowedPerDuration`
  * actions have been forwarded to that target within the last `duration`, the wrapped message
  * is sent to the target with the ask pattern and the outcome of that ask is piped back to the
  * original sender. Otherwise the sender receives [[RateLimited]].
  *
  * Only actions that were actually forwarded count against the limit. A rejected attempt is
  * not recorded, so a client that keeps retrying while limited does not prolong its own
  * lock-out.
  *
  * @param actionsAllowedPerDuration Number of actions allowed per duration
  * @param duration The duration after which an action is forgotten about, thus allowing further actions to occur
  */
class RateLimitingActor(
    private val actionsAllowedPerDuration: Long,
    private val duration: FiniteDuration = FiniteDuration(1, MINUTES)
) extends Actor with PipeToSupport {

  /* Expiry deadlines of the actions forwarded so far, grouped by the actor they were sent to */
  private var numActionsByKey = Map[ActorRef, Seq[Deadline]]()

  /* Either limit the action and tell the sender so, or pass along the message and then deliver the reply */
  def receive: Receive = {
    case RateLimitedAction(to, msg, waitUpTo) =>
      // Capture the sender once, up front.
      val replyTo = sender()

      // Forwarded actions whose deadline has not passed yet still count against the limit.
      val stillCounted =
        numActionsByKey.getOrElse(to, Seq.empty[Deadline]).filter(_.hasTimeLeft())

      if (stillCounted.size >= actionsAllowedPerDuration) {
        // Store the pruned history so expired deadlines do not pile up; the rejected
        // attempt itself is deliberately not added.
        numActionsByKey = numActionsByKey + (to -> stillCounted)
        replyTo ! RateLimited
      } else {
        numActionsByKey = numActionsByKey + (to -> (stillCounted :+ (Deadline.now + duration)))
        implicit val timeout = waitUpTo
        // Run the pipe on this actor's own dispatcher instead of the global execution context.
        pipe(to ? msg)(context.dispatcher).pipeTo(replyTo)
      }
  }
}
package actors.auth
import actors.ActorTest
import akka.actor.{ Actor, ActorRef, Props }
import akka.pattern.ask
import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS }
/** Tests for [[RateLimitingActor]]: access below the limit, rejection at the limit, and
  * access again after the limiting window has passed.
  */
class RateLimitingActorTest extends ActorTest {

  /* Limiting window handed to every rate limiter created by this suite */
  private val waitTime = FiniteDuration(250, MILLISECONDS)

  case object HowMany
  case object GotMsg

  /** Target actor that counts every message it receives (HowMany included).
    * Replies to HowMany with the running count and to anything else with GotMsg.
    */
  class Receiver extends Actor {
    private var messagesReceived: Long = _

    def receive = {
      case HowMany =>
        messagesReceived = messagesReceived + 1
        sender() ! messagesReceived
      case _ =>
        messagesReceived = messagesReceived + 1
        sender() ! GotMsg
    }
  }

  /** Creates a Receiver and a RateLimitingActor inside a fresh actor system and runs the test code with them.
    *
    * @param actionsAllowedPer number of actions the limiter allows per `waitTime`
    * @param testCode          test body, called with (rateLimiter, receiver)
    */
  def withRateLimiter[T](actionsAllowedPer: Long)(testCode: (ActorRef, ActorRef) => T): T = {
    withActorSystem { system =>
      val receiver = system.actorOf(Props(new Receiver), "receiver")
      val rateLimiter = system.actorOf(Props(new RateLimitingActor(actionsAllowedPer, waitTime)), "limiter")
      testCode(rateLimiter, receiver)
    }
  }

  "The RateLimitingActor" should "allow access when limit has not been reached" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      val future = rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      whenReady(future) {
        case num: Long => info(s"RateLimiter allowed access. $num msgs received")
        case msg => fail(s"Should have allowed access! Instead got message: $msg")
      }
    }
  }

  it should "reject access when limit has been reached" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      // The first request uses up the single allowed action; its reply is not needed.
      rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      val future = rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      whenReady(future) {
        case authMsg: AuthMessage => assertResult(RateLimited)(authMsg)
        // This test expects a rejection, so the failure message must say so.
        case msg => fail(s"Should have been rate limited! Instead got message: $msg")
      }
    }
  }

  it should "allow messages again once the rate limit has expired" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      // Sleep for twice the limiting window so the first action has expired.
      Thread.sleep(waitTime.toMillis * 2)
      val future = rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      whenReady(future) {
        case num: Long => info(s"RateLimiter allowed access. $num msgs received")
        case msg => fail(s"Should have allowed access! Instead got message: $msg")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment