Skip to content

Instantly share code, notes, and snippets.

@derekjw
Created January 17, 2011 22:49
Show Gist options
  • Save derekjw/783646 to your computer and use it in GitHub Desktop.
Save derekjw/783646 to your computer and use it in GitHub Desktop.
diff of Akka's Dispatchers.scala using case classes for config
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
index 56dbc11..8b32235 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
@@ -7,10 +7,9 @@ package akka.dispatch
import akka.actor.{Actor, ActorRef}
import akka.actor.newUuid
import akka.config.Config._
+import akka.config.DefaultDispatcherConfig
import akka.util.{Duration, Logging}
-import net.lag.configgy.ConfigMap
-
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import java.util.concurrent.TimeUnit
@@ -47,19 +46,15 @@ import java.util.concurrent.TimeUnit
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers extends Logging {
- val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
- val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
- map(time => Duration(time, TIME_UNIT)).
- getOrElse(Duration(1000,TimeUnit.MILLISECONDS))
- val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
- val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
- val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)
+ val THROUGHPUT = config.actor.throughput
+ val DEFAULT_SHUTDOWN_TIMEOUT = Duration(config.actor.dispatcherShutdownTimeout, TIME_UNIT)
+ val MAILBOX_CAPACITY = config.actor.defaultDispatcher.mailboxCapacity
+ val MAILBOX_PUSH_TIME_OUT = Duration(config.actor.defaultDispatcher.mailboxPushTimeout, TIME_UNIT)
+ val THROUGHPUT_DEADLINE_TIME = Duration(config.actor.throughputDeadlineTime, TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 0) UnboundedMailbox() else BoundedMailbox()
- lazy val defaultGlobalDispatcher = {
- config.getConfigMap("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher)
- }
+ lazy val defaultGlobalDispatcher = from(config.actor.defaultDispatcher)
object globalHawtDispatcher extends HawtDispatcher
@@ -147,8 +142,8 @@ object Dispatchers extends Logging {
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
*/
- def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
- config getConfigMap key flatMap from getOrElse default
+ //def fromConfig(key: String, default: => MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
+ //config getConfigMap key flatMap from getOrElse default
/*
* Creates or obtains a dispatcher from a ConfigMap according to the format below
@@ -172,48 +167,48 @@ object Dispatchers extends Logging {
* Returns: None if "type" isn't specified in the config
* Throws: IllegalArgumentException if the value of "type" is not valid
*/
- def from(cfg: ConfigMap): Option[MessageDispatcher] = {
- lazy val name = cfg.getString("name", newUuid.toString)
+ def from(cfg: DefaultDispatcherConfig): MessageDispatcher = {
+ lazy val name = newUuid.toString // is this used in akka.conf? cfg.getString("name", newUuid.toString)
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the cfg
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
- conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
- conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
- conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
- conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
- conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
- conf_?(cfg getString "rejection-policy" map {
+ _.setKeepAliveTime(Duration(cfg.keepAliveTime, TIME_UNIT)),
+ _.setCorePoolSizeFromFactor(cfg.corePoolSizeFactor),
+ _.setMaxPoolSizeFromFactor(cfg.maxPoolSizeFactor),
+ _.setExecutorBounds(cfg.executorBounds),
+ _.setAllowCoreThreadTimeout(cfg.allowCoreTimeout),
+ _.setRejectionPolicy(cfg.rejectionPolicy match {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
- })(policy => _.setRejectionPolicy(policy)))
+ }))
}
lazy val mailboxType: MailboxType = {
- val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY)
+ val capacity = cfg.mailboxCapacity
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 0) UnboundedMailbox()
- else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
+ else BoundedMailbox(false, capacity, Duration(cfg.mailboxPushTimeout, TIME_UNIT)) //bug?
}
- cfg.getString("type") map {
+ cfg.dispatcher match {
case "ExecutorBasedEventDriven" =>
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
name,
- cfg.getInt("throughput", THROUGHPUT),
- cfg.getInt("throughput-deadline", THROUGHPUT_DEADLINE_TIME_MILLIS),
+ cfg.throughput,
+ cfg.throughputDeadlineTime, //bug?
mailboxType,
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(poolCfg => new ExecutorBasedEventDrivenWorkStealingDispatcher(name, mailboxType,poolCfg)).build
- case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate",true))
+ case "Hawt" => new HawtDispatcher(cfg.aggregate)
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment