Last active
August 29, 2015 14:22
-
-
Save weirded/b0152b503c1f9d02bf24 to your computer and use it in GitHub Desktop.
Base for sleeper cell style load generation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sumologic.util.scala.bench | |
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} | |
import com.netflix.config.scala.DynamicProperties | |
import com.sumologic.util.scala.env.Environment | |
import com.sumologic.util.scala.log.Logging | |
import com.sumologic.util.scala.rateLimiter.FixedRateLimiter | |
import com.sumologic.util.scala.time.{TimeConstants, TimeFormats, TimeSource} | |
import scala.util.control.NonFatal | |
/**
 * Base class for "sleeper cell" style load generators.
 *
 * A cell stays dormant until its `assemblyName` appears in the dynamic property
 * `sleeper.cell.<name>.assemblies`; it is never activated when `Environment().isProd`
 * is true. While active it runs `sleeper.cell.<name>.agents` threads (default 64), each
 * of which repeatedly calls [[makeRequest]], throttled by its own `FixedRateLimiter`
 * built from `sleeper.cell.<name>.rate` (default `Int.MaxValue`).
 *
 * NOTE(review): every agent constructs its own limiter, so the configured rate appears
 * to apply per agent rather than to the cell as a whole - confirm this is intended.
 *
 * NOTE(review): agents may be started from this constructor, i.e. before a subclass'
 * own fields are initialised. Subclasses should make sure [[makeRequest]],
 * [[logStats]] and [[resetStats]] tolerate that (e.g. by using `lazy val`s).
 *
 * @param name         identifies the cell; used in property keys, thread names and the log prefix
 * @param assemblyName the assembly this instance runs in; matched against the activation list
 */
abstract class SleeperCell(name: String,
                           assemblyName: String)
  extends DynamicProperties
  with Logging
  with TimeSource
  with TimeConstants {

  // API to implement by subclasses.

  /**
   * Performs one request. Called in a loop from every agent thread, so it may run
   * concurrently. A non-fatal exception thrown here is counted in `failedRequestCount`.
   */
  protected def makeRequest(): Unit

  /**
   * Reports statistics. Called from an agent thread, while holding `lastLog`'s monitor,
   * once more than `15.seconds` have elapsed since the time stored in `lastLog`.
   */
  protected def logStats(): Unit

  /** Called when the agents are shut down, after the shared counters have been zeroed. */
  protected def resetStats(): Unit

  // Remote control.
  private val configUpdateCallback = new Runnable() {
    override def run(): Unit = checkForConfigurationUpdate()
  }

  private val activatedAssemblies = dynamicStringListProperty(s"sleeper.cell.$name.assemblies", List[String]())

  protected val requestsPerSecond = dynamicIntProperty(s"sleeper.cell.$name.rate", Int.MaxValue)

  protected val agentThreads = dynamicIntProperty(s"sleeper.cell.$name.agents", 64)

  // Stats.
  protected val lastLog = new AtomicLong(now)

  protected val requestCount = new AtomicInteger(0)

  protected val failedRequestCount = new AtomicInteger(0)

  // State. Only read or written while holding this cell's monitor.
  private var activeAgents: Seq[SleeperAgent] = Seq.empty[SleeperAgent]

  // Set the prefix before anything is logged so that activation messages emitted by the
  // initial check below carry it as well.
  prefix(s"$name sleeper cell")

  // Register the callbacks only now that every field they touch has been assigned; a
  // callback firing earlier would observe uninitialised (null) properties and state.
  activatedAssemblies.addCallback(configUpdateCallback)
  requestsPerSecond.addCallback(configUpdateCallback)
  agentThreads.addCallback(configUpdateCallback)

  checkForConfigurationUpdate()

  info("Initialized and awaiting instructions.")

  /**
   * Reconciles the running agents with the current dynamic configuration: starts them
   * when the cell becomes activated, stops them when it is deactivated, and restarts
   * them when the configured agent count or request rate no longer matches what the
   * running agents were created with.
   */
  private def checkForConfigurationUpdate(): Unit = this.synchronized {
    val cellActivated = !Environment().isProd && activatedAssemblies.get().contains(assemblyName)
    if (cellActivated && activeAgents.isEmpty) {
      activateCell()
    } else if (!cellActivated && activeAgents.nonEmpty) {
      goToSleep()
    } else if (cellActivated && activeAgents.size != agentThreads.get()) {
      info(s"Agent count changed from ${activeAgents.size} to ${agentThreads.get()} - restarting.")
      goToSleep()
      activateCell()
    } else if (cellActivated && activeAgents.exists(_.rate != requestsPerSecond.get())) {
      // Limiters are built once per agent, so a new rate only takes effect on restart.
      info(s"Request rate changed to ${requestsPerSecond.get()} - restarting.")
      goToSleep()
      activateCell()
    }
  }

  /** Creates and starts `agentThreads` agents, numbered from 1. */
  private def activateCell(): Unit = {
    info(s"We have been activated. Activating ${agentThreads.get()} agents.")
    activeAgents = (1 to agentThreads.get()).map(new SleeperAgent(_))
    activeAgents.foreach(_.start())
  }

  /** Signals every agent to stop, waits for all of them to finish, then resets the stats. */
  private def goToSleep(): Unit = {
    info(s"We have been told to go back to sleep. Shutting down ${activeAgents.size} agents.")
    activeAgents.foreach(_.keepRunning = false)
    activeAgents.foreach(_.join())
    // Forget the finished threads. Leaving them in place made the cell look permanently
    // active, which prevented any later re-activation with an unchanged agent count.
    activeAgents = Seq.empty[SleeperAgent]
    requestCount.set(0)
    failedRequestCount.set(0)
    resetStats()
  }

  /**
   * A single load-generating thread.
   *
   * @param id number of this agent within the cell; only used in the thread name
   */
  private class SleeperAgent(id: Int)
    extends Thread(s"Sleeper-Agent-$name-$id")
    with TimeConstants
    with TimeFormats {

    // Written by the thread handling a configuration update and read by this thread,
    // hence volatile: otherwise the stop request might never become visible here.
    @volatile var keepRunning = true

    // The rate this agent was created with; compared against the live property to
    // detect rate changes.
    val rate = requestsPerSecond.get()

    val rateLimiter = new FixedRateLimiter(rate, 1.second)

    override def run(): Unit = {
      while (keepRunning) {
        // Poll until the limiter admits an action, but give up as soon as shutdown is
        // requested so that join() cannot hang on a limiter that never opens.
        while (keepRunning && !rateLimiter.isActionAllowed) {
          Thread.sleep(50)
        }
        if (keepRunning) {
          try {
            rateLimiter.recordAction()
            requestCount.incrementAndGet()
            makeRequest()
          } catch {
            case NonFatal(_) => failedRequestCount.incrementAndGet()
          }
          logStatsIfDue()
        }
      }
    }

    /** True once more than `15.seconds` have passed since the time stored in `lastLog`. */
    private def timeToLogStats: Boolean = (now - lastLog.get()) > 15.seconds

    /**
     * Logs the stats when they are due. The condition is re-checked under `lastLog`'s
     * monitor so that agents which notice the deadline together log only once.
     */
    private def logStatsIfDue(): Unit = {
      if (timeToLogStats) {
        lastLog synchronized {
          if (timeToLogStats) {
            logStats()
            lastLog.set(now)
          }
        }
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment