Skip to content

Instantly share code, notes, and snippets.

@daggerrz
Created January 24, 2012 15:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daggerrz/1670599 to your computer and use it in GitHub Desktop.
Save daggerrz/1670599 to your computer and use it in GitHub Desktop.
Akka 1.3 circuit breaking dispatcher
package com.tapad.util
import akka.dispatch._
import java.util.concurrent.atomic.{AtomicInteger, AtomicBoolean}
class LogAndDiscardCircuitBreakerPolicy(val maxMailboxSize: Int, loggable: Logging) extends CircuitBreakerPolicy {
def onOverflowStart(mailboxSize: Int) { loggable.log.warn("Mailbox size is above {}. Ignoring messages until size is back below.", maxMailboxSize) }
def onBackToNormal(overflowCount: Int) { loggable.log.info("Mailbox size is back below {}. A total of {} messags were discarded.", maxMailboxSize, overflowCount) }
def replyToOverflow(overflowCount: Int, msg: Any) : Either[Exception, Any] = Left(new IllegalArgumentException("This overflow policy is not configured for request-reply actors."))
def shouldBeDiscarded(overflowCount: Int, msg: Any) : Boolean = true
}
/**
* Circuit breaker policy configuration trait.
*/
trait CircuitBreakerPolicy {
/**
* Max mailbox size before overflowing.
*/
def maxMailboxSize: Int
/**
* Called when an overflow situation starts.
*
* @param mailboxSize the current mailbox size
*/
def onOverflowStart(mailboxSize: Int)
/**
* Called for every message received while overflowing on a future
* channel. Allows for responding with a default message in overflow situations.
*
* @param overflowCount 1 for the first message and keeps increasing by 1 until the
* situation is resolved
* @param msg the current message
*
* @return the value to return to the sender / set on the future
*/
def replyToOverflow(overflowCount: Int, msg: Any) : Either[Exception, Any]
/**
* Called for every message received while overflowing on a null channel (no response).
*
* @return true if the message should be discarded
*/
def shouldBeDiscarded(overflowCount: Int, msg: Any) : Boolean
/**
* Called when the actor is not overflowing anymore.
*
* @param overflowCount the total number of messages overflown
*/
def onBackToNormal(overflowCount: Int)
}
/**
* An Akka message queue with unbounded semantics and circuit breaking. When the queue
* reaches the mailbox size, it will start popping the oldest elements
* off of the queue until it's back to size.
*/
trait CircuitBreakingDispatcherSemantics extends MessageDispatcher { self: ExecutorBasedEventDrivenDispatcher =>
/**
* The overflow policy to use.
*/
protected def policy : CircuitBreakerPolicy
private[this] val overflowing = new AtomicBoolean(false)
private[this] val overflowCount = new AtomicInteger(0)
abstract override def dispatch(invocation: MessageInvocation) {
val size = mailboxSize(invocation.receiver) + 1
if (size >= policy.maxMailboxSize) {
if (!overflowing.getAndSet(true)) {
policy.onOverflowStart(size)
}
invocation.channel match {
case f: ActorCompletableFuture =>
f.complete(policy.replyToOverflow(overflowCount.addAndGet(1), invocation.message))
case NullChannel =>
if (!policy.shouldBeDiscarded(overflowCount.addAndGet(1), invocation.message)) {
super.dispatch(invocation)
}
}
} else {
if (overflowing.getAndSet(false)) {
policy.onBackToNormal(overflowCount.getAndSet(0))
}
super.dispatch(invocation)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment