Skip to content

Instantly share code, notes, and snippets.

@weirded
Last active August 29, 2015 14:22
Show Gist options
  • Save weirded/b0152b503c1f9d02bf24 to your computer and use it in GitHub Desktop.
Save weirded/b0152b503c1f9d02bf24 to your computer and use it in GitHub Desktop.
Base for sleeper cell style load generation.
package com.sumologic.util.scala.bench
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import com.netflix.config.scala.DynamicProperties
import com.sumologic.util.scala.env.Environment
import com.sumologic.util.scala.log.Logging
import com.sumologic.util.scala.rateLimiter.FixedRateLimiter
import com.sumologic.util.scala.time.{TimeConstants, TimeFormats, TimeSource}
import scala.util.control.NonFatal
abstract class SleeperCell(name: String,
assemblyName: String)
extends DynamicProperties
with Logging
with TimeSource
with TimeConstants {
// API to implement by subclasses.
protected def makeRequest(): Unit
protected def logStats(): Unit
protected def resetStats(): Unit
// Remote control.
private val configUpdateCallback = new Runnable() {
override def run(): Unit = checkForConfigurationUpdate()
}
private val activatedAssemblies = dynamicStringListProperty(s"sleeper.cell.$name.assemblies", List[String]())
activatedAssemblies.addCallback(configUpdateCallback)
protected val requestsPerSecond = dynamicIntProperty(s"sleeper.cell.$name.rate", Int.MaxValue)
requestsPerSecond.addCallback(configUpdateCallback)
protected val agentThreads = dynamicIntProperty(s"sleeper.cell.$name.agents", 64)
agentThreads.addCallback(configUpdateCallback)
// Stats.
protected val lastLog = new AtomicLong(now)
protected val requestCount = new AtomicInteger(0)
protected val failedRequestCount = new AtomicInteger(0)
// State.
private var activeAgents: Seq[SleeperAgent] = Seq.empty[SleeperAgent]
checkForConfigurationUpdate()
prefix(s"$name sleeper cell")
info("Initialized and awaiting instructions.")
private def checkForConfigurationUpdate() {
this synchronized {
val cellActivated = !Environment().isProd && activatedAssemblies.get().contains(assemblyName)
if (cellActivated && activeAgents.isEmpty) {
activateCell()
} else if (!cellActivated && activeAgents.nonEmpty) {
goToSleep()
} else if (cellActivated && activeAgents.size != agentThreads.get()) {
info(s"Agent count changed from ${activeAgents.size} to ${agentThreads.get()} - restarting.")
goToSleep()
activateCell()
}
}
}
private def activateCell() {
info(s"We have been activated. Activating ${agentThreads.get()} agents.")
activeAgents = (1 to agentThreads.get()).map(new SleeperAgent(_))
activeAgents.foreach(_.start())
}
private def goToSleep() {
info(s"We have been told to go back to sleep. Shutting down ${activeAgents.size} agents.")
activeAgents.foreach(_.keepRunning = false)
activeAgents.foreach(_.join())
requestCount.set(0)
failedRequestCount.set(0)
resetStats()
}
private class SleeperAgent(id: Int)
extends Thread(s"Sleeper-Agent-$name-$id")
with TimeConstants
with TimeFormats {
var keepRunning = true
val rateLimiter = new FixedRateLimiter(requestsPerSecond.get(), 1.second)
override def run() {
while (keepRunning) {
while (!rateLimiter.isActionAllowed) {
Thread.sleep(50)
}
try {
rateLimiter.recordAction()
requestCount.incrementAndGet()
makeRequest()
} catch {
case NonFatal(e) => failedRequestCount.incrementAndGet()
}
def timeToLogStats: Boolean = (now - lastLog.get()) > 15.seconds
if (timeToLogStats) {
lastLog synchronized {
if (timeToLogStats) {
logStats()
lastLog.set(now)
}
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment