Skip to content

Instantly share code, notes, and snippets.

@inexplicable
Created October 21, 2014 23:17
Show Gist options
  • Save inexplicable/9e23a7705b4bdb4e5f3b to your computer and use it in GitHub Desktop.
Save inexplicable/9e23a7705b4bdb4e5f3b to your computer and use it in GitHub Desktop.
cassandra-throttle.scala
/** Tick message: `ChannelsManager` schedules it to itself and, on receipt, drains every buffered action. */
case object TakeAction
/**
 * Common shape of every request that can be queued for the channel store.
 */
sealed trait ChannelAction {

  /** Primary key of the channel this action targets. */
  def path: String
}
/**
 * Request to persist a channel.
 *
 * @param channelEntity the channel to store; its path is used as this action's key
 * @param onComplete    callback taking the outcome flag; `ChannelsManager` passes `false` when a lone
 *                      save finds the path already present, and `true` from the `Runnable` it hands
 *                      to the persistence call
 */
case class SaveChannel(channelEntity: ChannelEntity, onComplete: Boolean => Unit)
  extends ChannelAction {

  /** The key is taken straight from the wrapped entity. */
  override def path: String = channelEntity.path
}
/**
 * Request to read a channel.
 *
 * @param path     primary key of the channel to load
 * @param promised promise that `ChannelsManager` completes with the entity when one is found, or
 *                 fails with `NoSuchElementException` when the lookup yields `None`
 */
case class LoadChannel(path: String, promised: Promise[ChannelEntity])
  extends ChannelAction
/**
 * Request to remove a channel.
 *
 * @param path       primary key of the channel to delete
 * @param onComplete callback passed along with the path to the persistence layer's drop call
 *                   (presumably run once the delete finishes; confirm against PersistenceService)
 */
case class DeleteChannel(path: String, onComplete: () => Unit)
  extends ChannelAction
/**
 * Ordered queue of pending [[ChannelAction]]s that groups actions of the same kind next to each
 * other, so that a consumer can pull them off the front in same-kind batches.
 */
trait ActionsBuffer {

  /** Queued actions, oldest first. */
  private[cube] var actions = Seq.empty[ChannelAction]

  /** `true` when no action is queued. */
  def isEmpty: Boolean = actions.isEmpty

  /** Number of queued actions. */
  def pending: Int = actions.size

  /**
   * Queues `action`, deliberately reordering it next to the most recent run of the same kind
   * to maximize the batches of `save`, `delete` or `load`.
   *
   * Reordering does not affect the update/query result as long as the primary keys (paths) of
   * the actions being jumped over do not conflict with the new one, so a queued action with the
   * same path blocks the move and the new action is simply appended.
   *
   * @param action   the action to queue
   * @param clazzTag runtime class of `T`, used to recognise actions of the same kind
   */
  def accepts[T <: ChannelAction](action: T)(implicit clazzTag: ClassTag[T]): Unit = {
    val kind = clazzTag.runtimeClass

    // Newest queued action that is either of the same kind or targets the same path;
    // everything after it is of another kind and touches other paths, so it may be jumped over.
    val anchor = actions.lastIndexWhere { queued =>
      queued.getClass == kind || queued.path == action.path
    }

    // Only a same-kind anchor lets the new action join its group; a path conflict
    // (or no anchor at all) means it has to go to the very end.
    val joinsExistingGroup = anchor >= 0 && actions(anchor).getClass == kind

    actions =
      if (joinsExistingGroup) {
        val (throughAnchor, jumpedOver) = actions.splitAt(anchor + 1)
        (throughAnchor :+ action) ++ jumpedOver
      } else {
        actions :+ action
      }
  }

  /**
   * Removes and returns the leading run of actions whose runtime class is exactly `T`.
   *
   * @param clazzTag runtime class of `T`
   * @return the removed actions in queue order; empty when the head is of another kind
   */
  def combine[T <: ChannelAction](implicit clazzTag: ClassTag[T]): Seq[T] = {
    val kind = clazzTag.runtimeClass
    val (batch, remaining) = actions.span(_.getClass == kind)
    actions = remaining
    batch.map(_.asInstanceOf[T])
  }
}
/**
 * Actor that throttles channel persistence.
 *
 * Incoming `SaveChannel`, `LoadChannel` and `DeleteChannel` messages are only buffered (see
 * `ActionsBuffer`). On every `TakeAction` tick the buffer is drained completely: leading runs of
 * saves, deletes and loads are handed to `PersistenceService` as batches, and a new tick is
 * scheduled 10 ms after the current one has been handled.
 *
 * NOTE(review): a lone save goes through `existsAsync` first and reports `onComplete(false)` when
 * the path already exists, whereas saves that end up in a batch of two or more are written without
 * that check. Confirm this difference is intended.
 */
class ChannelsManager extends Actor with ActionsBuffer with Logging {
  // NOTE(review): JavaConversions is deprecated; kept because it is not visible from here which
  // PersistenceService/logger calls rely on its implicit conversions.
  import scala.collection.JavaConversions._
  import scala.concurrent.duration._

  private[this] implicit val persistenceClient = CategoriesManagement(context.system).persistenceClient

  /** Starts the tick loop. The result type is explicit so it is not inferred from `scheduleOnce`. */
  override def preStart(): Unit = scheduleNextTick()

  /** Swallows the Boolean returned by `trySuccess` / `tryFailure` so callbacks evaluate to `Unit`. */
  def noop(b: Boolean): Unit = {
    //no operation
  }

  def receive: Receive = {
    case save: SaveChannel =>
      accepts[SaveChannel](save)
    case load: LoadChannel =>
      accepts[LoadChannel](load)
    case delete: DeleteChannel =>
      accepts[DeleteChannel](delete)
    case TakeAction =>
      //must exhaust the actions
      val count = pending
      var iterations = 0
      while (!isEmpty) {
        val now = System.nanoTime
        // Each pass takes the leading run of every kind in turn; since every buffered action is
        // one of these three kinds, each pass removes at least the head of the buffer.
        flushSaves(combine[SaveChannel], now)
        flushDeletes(combine[DeleteChannel])
        flushLoads(combine[LoadChannel])
        iterations += 1
      }
      //don't log too much junk
      if (count > 0) {
        logger.info("[channelsmanager] took: {} actions and finished with:{} iterations to exhaust", count.toString, iterations.toString)
      }
      scheduleNextTick()
  }

  /** Schedules a single `TakeAction` to this actor 10 ms from now. */
  private def scheduleNextTick(): Unit = {
    import context.dispatcher
    context.system.scheduler.scheduleOnce(10.millis, self, TakeAction)
  }

  /**
   * Sends a run of saves to the persistence layer.
   *
   * @param saves the run taken from the head of the buffer; may be empty
   * @param now   `System.nanoTime` captured at the start of the current drain pass, used only for
   *              the elapsed-time debug log
   */
  private def flushSaves(saves: Seq[SaveChannel], now: Long): Unit = saves match {
    case Seq() =>
      //nothing to do
    case Seq(SaveChannel(entity, onComplete)) =>
      // Lone save: only write when the path is not there yet, otherwise report `false`.
      logger.info("[channelsmanager] saving channels:{} in a batch", saves.map(_.path))
      PersistenceService(persistenceClient).existsAsync(entity.path(), {exists => if(exists) onComplete(false) else {
        PersistenceService(persistenceClient).setDataAsync(entity.path(), entity, new Runnable {
          override def run(): Unit = {
            onComplete(true)
            logger.debug("[channelsmanager] saving single channel:{} using:{}ns", entity, (System.nanoTime - now).toString)
          }
        })
      }})
    case _ =>
      // Two or more saves: written as one batch, one Runnable per entity.
      logger.info("[channelsmanager] saving channels:{} in a batch", saves.map(_.path))
      PersistenceService(persistenceClient).setDataAsync(saves.map{case SaveChannel(entity, onComplete) =>
        (entity.path(), entity, new Runnable {
          override def run(): Unit = {
            onComplete(true)
            logger.debug("[channelsmanager] saving multiple channels:{} in batch using:{}ns", saves.map(_.channelEntity), (System.nanoTime - now).toString)
          }
        })
      })
  }

  /** Sends a run of deletes to the persistence layer as (path, callback) pairs; no-op when empty. */
  private def flushDeletes(deletes: Seq[DeleteChannel]): Unit =
    if (deletes.nonEmpty) {
      logger.info("[channelsmanager] deleting channels:{} in a batch", deletes.map(_.path))
      PersistenceService(persistenceClient).dropDataAsync[ChannelEntity](deletes.map {
        case DeleteChannel(path, onComplete) => (path, onComplete)
      })
    }

  /**
   * Sends a run of loads to the persistence layer; no-op when empty. Each request's promise is
   * completed with the entity when one is returned and failed with `NoSuchElementException`
   * when the lookup yields `None`.
   */
  private def flushLoads(loads: Seq[LoadChannel]): Unit =
    if (loads.nonEmpty) {
      logger.info("[channelsmanager] loading channels:{} in a batch", loads.map(_.path))
      PersistenceService(persistenceClient).getDataAsync[ChannelEntity](loads.map { r => (r.path, { channelOption: Option[ChannelEntity] => channelOption match {
        case Some(channelEntity) =>
          noop(r.promised.trySuccess(channelEntity))
        case None =>
          noop(r.promised.tryFailure(new NoSuchElementException))
      }})})
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment