Last active
October 13, 2020 16:37
-
-
Save EdgeCaseBerg/dc449ea0028a7c83cb71b1a5af39e9ea to your computer and use it in GitHub Desktop.
Basic Rate Limiting Actor implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import org.scalatest._ | |
import org.scalatest.concurrent._ | |
import org.scalatest.time.{ Millis, Seconds, Span } | |
import akka.actor.ActorSystem | |
import akka.util.Timeout | |
import java.util.concurrent.TimeUnit.MILLISECONDS | |
/** Base suite for actor tests.
 *
 * Mixes in ScalaTest's `FlatSpec`, `Matchers` and `ScalaFutures`, supplies the implicit
 * patience and ask-timeout values used by the tests, and offers [[withActorSystem]] to run a
 * block of code against an actor system named "myTestSystem" that is shut down afterwards.
 */
trait ActorTest extends FlatSpec with Matchers with ScalaFutures {

  /** How long to wait for futures and how often to check on them */
  implicit val defaultPatience: PatienceConfig =
    PatienceConfig(timeout = Span(5, Seconds), interval = Span(500, Millis))

  /** How long an actor is given to talk to another actor with the ask pattern before it times out */
  implicit val timeoutForTalkingToActors: Timeout = Timeout(1000, MILLISECONDS)

  /** Create an actor system, run the given code with it, then shut the system down.
   *
   * The system is terminated whether `testCode` returns normally or throws.
   *
   * @param testCode the code block to execute, given the freshly created ActorSystem
   * @return whatever `testCode` returns
   */
  def withActorSystem[T](testCode: ActorSystem => T): T = {
    val system = ActorSystem("myTestSystem")
    try {
      testCode(system)
    } finally {
      // terminate() only *starts* the shutdown and returns a Future; wait for it so one
      // test's system is fully gone before the next test creates a new one. If the wait
      // times out, the TimeoutException propagates from this finally block.
      scala.concurrent.Await.ready(
        system.terminate(),
        scala.concurrent.duration.Duration(10, java.util.concurrent.TimeUnit.SECONDS)
      )
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Scala version the project is compiled with; it also selects the binary suffix that
// the `%%` operator appends to the artifact names below.
scalaVersion := "2.11.7"

// ScalaTest, restricted to the "test" configuration.
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.0" % "test"

// Akka, available to both main and test code.
libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.4.16"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import akka.routing.ConsistentHashingRouter.ConsistentHashable | |
import akka.actor.ActorRef | |
import akka.util.Timeout | |
package object auth {

  /** Common parent of every message defined in this package */
  abstract class AuthMessage

  /** Reply given to a requester whose action was refused because it is rate limited */
  case object RateLimited extends AuthMessage

  /** Request to perform an action that must first pass through a rate limiter.
   *
   * @param to       the actor the action is addressed to; also serves as the consistent-hashing key
   * @param action   the message to hand to `to` if the request is let through
   * @param waitUpTo how long to wait for `to` to answer
   */
  case class RateLimitedAction(
    to: ActorRef,
    action: Any,
    waitUpTo: Timeout
  ) extends AuthMessage with ConsistentHashable {

    /** Requests for the same target actor hash to the same key */
    override def consistentHashKey: ActorRef = to
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors.auth | |
import akka.actor.{ Actor, ActorRef } | |
import akka.pattern.{ ask, PipeToSupport } | |
import scala.concurrent.duration.{ FiniteDuration, MINUTES, Deadline } | |
import scala.concurrent.ExecutionContext.Implicits.global | |
/** Actor to throttle access to another actor.
 *
 * For every [[RateLimitedAction]] it either forwards the wrapped message to the target actor
 * and pipes the target's answer back to the requester, or answers [[RateLimited]] when the
 * target has already been sent `actionsAllowedPerDuration` messages within the last `duration`.
 *
 * Only forwarded actions count against the limit. A refused request does not consume quota,
 * so a requester that keeps retrying is let through again as soon as earlier actions expire.
 *
 * @param actionsAllowedPerDuration Number of actions allowed per duration
 * @param duration The duration after which an action is forgotten about, thus allowing further actions to occur
 */
class RateLimitingActor(private val actionsAllowedPerDuration: Long, private val duration: FiniteDuration = FiniteDuration(1, MINUTES)) extends Actor with PipeToSupport {

  /* For each throttled target actor, the expiry deadlines of the actions forwarded to it */
  private var numActionsByKey = Map[ActorRef, Seq[Deadline]]()

  /* Either limit the action and tell the sender so, or pass along the message and then deliver the reply */
  def receive = {
    case RateLimitedAction(to, msg, waitUpTo) =>
      // Capture the requester once, up front, so the reply target is fixed for this message.
      val replyTo = sender()

      // Drop everything that has fallen out of the window before counting.
      val liveActions = numActionsByKey.getOrElse(to, Seq.empty[Deadline]).filter(_.hasTimeLeft())

      if (liveActions.size >= actionsAllowedPerDuration) {
        // Refused: remember the pruned list but do NOT record this attempt.
        numActionsByKey = numActionsByKey.updated(to, liveActions)
        replyTo ! RateLimited
      } else {
        numActionsByKey = numActionsByKey.updated(to, liveActions :+ (Deadline.now + duration))
        implicit val timeout = waitUpTo
        val reply = to ? msg
        // Run the pipe callback on this actor's dispatcher rather than the global pool.
        // A failed or timed-out ask is delivered to the requester by the pipe pattern as well.
        pipe(reply)(context.dispatcher).pipeTo(replyTo)
      }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors.auth | |
import actors.ActorTest | |
import akka.actor.{ Actor, ActorRef, Props } | |
import akka.pattern.ask | |
import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS } | |
/** Tests for [[RateLimitingActor]]: requests under the limit are forwarded, requests over the
 * limit are answered with [[RateLimited]], and requests are accepted again once the window
 * has elapsed.
 */
class RateLimitingActorTest extends ActorTest {

  /** Rate-limit window handed to every limiter under test */
  private val waitTime = FiniteDuration(250, MILLISECONDS)

  /** Asks the receiver how many messages it has seen so far (the query itself is counted) */
  case object HowMany

  /** Acknowledgement the receiver sends for any message other than HowMany */
  case object GotMsg

  /** Target actor that counts every message it receives */
  class Receiver extends Actor {
    private var messagesReceived: Long = 0L

    def receive = {
      case HowMany =>
        messagesReceived += 1
        sender() ! messagesReceived
      case _ =>
        messagesReceived += 1
        sender() ! GotMsg
    }
  }

  /** Run test code against a fresh rate limiter and receiver living in their own actor system.
   *
   * @param actionsAllowedPer number of actions the limiter allows per `waitTime`
   * @param testCode the test body, given (rateLimiter, receiver)
   */
  def withRateLimiter[T](actionsAllowedPer: Long)(testCode: (ActorRef, ActorRef) => T): T = {
    withActorSystem { system =>
      val receiver = system.actorOf(Props(new Receiver), "receiver")
      val rateLimiter = system.actorOf(Props(new RateLimitingActor(actionsAllowedPer, waitTime)), "limiter")
      testCode(rateLimiter, receiver)
    }
  }

  /** Fails the test unless the reply is the receiver's message count, i.e. the request was forwarded */
  private def expectAllowed(reply: scala.concurrent.Future[Any]): Unit = {
    whenReady(reply) {
      case num: Long => info(s"RateLimiter allowed access. $num msgs received")
      case other => fail(s"Should have allowed access! Instead got message: $other")
    }
  }

  /** Fails the test unless the reply is RateLimited, i.e. the request was refused */
  private def expectRejected(reply: scala.concurrent.Future[Any]): Unit = {
    whenReady(reply) {
      case authMsg: AuthMessage => assertResult(RateLimited)(authMsg)
      case other => fail(s"Should have rejected access! Instead got message: $other")
    }
  }

  "The RateLimitingActor" should "allow access when limit has not been reached" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      expectAllowed(rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors))
    }
  }

  it should "reject access when limit has been reached" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      // The first request uses up the single allowed action; its reply is not inspected.
      rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      expectRejected(rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors))
    }
  }

  it should "allow messages again once the rate limit has expired" in {
    withRateLimiter(1) { (rateLimiter, receiver) =>
      rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors)
      // Sleep for twice the window so the first action has expired before the next request.
      Thread.sleep(waitTime.toMillis * 2)
      expectAllowed(rateLimiter ? RateLimitedAction(receiver, HowMany, timeoutForTalkingToActors))
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment