// Created: October 21, 2014 23:17
// Gist: inexplicable/9e23a7705b4bdb4e5f3b
// File: cassandra-throttle.scala
// Note (from the hosting page): this file contains bidirectional Unicode text that may be
// interpreted or compiled differently than what appears below. To review, open the file in an
// editor that reveals hidden Unicode characters.
/**
 * Tick message. `ChannelsManager` schedules it to itself and, on receipt, drains every
 * buffered channel action before scheduling the next tick.
 */
case object TakeAction
/** A pending persistence operation on a single channel. */
sealed trait ChannelAction {

  /** Path of the targeted channel; the path is the primary key. */
  def path: String
}
/**
 * Request to persist a channel.
 *
 * @param channelEntity the channel to store; its path is used as this action's key
 * @param onComplete    callback receiving the outcome flag; `ChannelsManager` calls it with
 *                      `true` once the data has been written and with `false` when a single
 *                      save finds that the path already exists
 */
case class SaveChannel(channelEntity: ChannelEntity, onComplete: Boolean => Unit) extends ChannelAction {
  def path: String = channelEntity.path
}
/**
 * Request to load the channel stored under `path`.
 *
 * @param path     primary key of the channel to load
 * @param promised promise that `ChannelsManager` completes with the loaded entity, or fails
 *                 with a `NoSuchElementException` when the lookup yields nothing
 */
case class LoadChannel(path: String, promised: Promise[ChannelEntity]) extends ChannelAction
/**
 * Request to delete the channel stored under `path`.
 *
 * @param path       primary key of the channel to delete
 * @param onComplete callback handed to the persistence layer together with the path
 */
case class DeleteChannel(path: String, onComplete: () => Unit) extends ChannelAction
/**
 * Ordered buffer of pending [[ChannelAction]]s.
 *
 * New actions are deliberately reordered so that actions of the same kind end up next to each
 * other, which maximizes the size of the `save`, `delete` or `load` batches taken by `combine`.
 * Reordering is only done across actions whose primary keys (paths) differ from the new
 * action's path, so the relative order of operations on any single path is preserved.
 */
trait ActionsBuffer {

  // Backed by a Vector so the append and split operations below stay cheap; the declared type
  // remains Seq.
  private[cube] var actions: Seq[ChannelAction] = Vector.empty[ChannelAction]

  /** True when no action is waiting. */
  def isEmpty: Boolean = actions.isEmpty

  /** Number of actions currently waiting. */
  def pending: Int = actions.size

  /**
   * Queues `action`, placing it directly after the most recent queued action of the same kind
   * when nothing in between blocks that move; otherwise appends it to the end.
   *
   * @param action   the action to enqueue
   * @param clazzTag runtime class used to decide which queued actions are "of the same kind";
   *                 the comparison is on the exact runtime class
   */
  def accepts[T <: ChannelAction](action: T)(implicit clazzTag: ClassTag[T]): Unit = {
    val kind = clazzTag.runtimeClass
    // Scanning from the tail, the first element that is either of the same kind or touches the
    // same path is where the search stops.
    val barrier = actions.lastIndexWhere(queued => queued.getClass == kind || queued.path == action.path)
    val insertAt =
      if (barrier >= 0 && actions(barrier).getClass == kind) barrier + 1 // join the existing group
      else actions.size // blocked by a path conflict, or no group yet: append to the end
    val (ahead, behind) = actions.splitAt(insertAt)
    //the new action is put in between
    actions = (ahead :+ action) ++ behind
  }

  /**
   * Removes and returns the leading run of actions whose runtime class is exactly `T`.
   *
   * @return the removed actions in queue order; empty when the head of the buffer is of another
   *         kind or the buffer is empty
   */
  def combine[T <: ChannelAction](implicit clazzTag: ClassTag[T]): Seq[T] = {
    val (batch, remaining) = actions.span(_.getClass == clazzTag.runtimeClass)
    actions = remaining
    // Every element of `batch` has runtime class T, so the ClassTag-backed pattern keeps them all.
    batch.collect { case typed: T => typed }
  }
}
/**
 * Actor that buffers channel save/load/delete requests and, on every [[TakeAction]] tick,
 * drains the buffer by handing same-kind batches to the persistence service. After each drain
 * it schedules the next tick 10 milliseconds later.
 */
class ChannelsManager extends Actor with ActionsBuffer with Logging {

  // NOTE(review): JavaConversions is deprecated in newer Scala versions. It is kept because the
  // persistence calls below may rely on its implicit conversions — confirm before replacing.
  import scala.collection.JavaConversions._
  import scala.concurrent.duration._

  private[this] implicit val persistenceClient = CategoriesManagement(context.system).persistenceClient

  /** Schedules the first tick when the actor starts. */
  override def preStart(): Unit = scheduleNextTick()

  /** Discards a Boolean result so that a callback can be typed as returning Unit. */
  def noop(b: Boolean): Unit = {
    //no operation
  }

  def receive = {
    case save: SaveChannel =>
      accepts[SaveChannel](save)
    case load: LoadChannel =>
      accepts[LoadChannel](load)
    case delete: DeleteChannel =>
      accepts[DeleteChannel](delete)
    case TakeAction =>
      //must exhaust the actions
      val count = pending
      var iterations = 0
      while (!isEmpty) {
        // Each pass takes the leading run of saves, then of deletes, then of loads.
        val now = System.nanoTime
        flushSaves(combine[SaveChannel], now)
        flushDeletes(combine[DeleteChannel])
        flushLoads(combine[LoadChannel])
        iterations += 1
      }
      //don't log too much junk
      if (count > 0) {
        logger.info("[channelsmanager] took: {} actions and finished with:{} iterations to exhaust", count.toString, iterations.toString)
      }
      scheduleNextTick()
  }

  /** Sends [[TakeAction]] to this actor once, 10 milliseconds from now. */
  private def scheduleNextTick(): Unit = {
    import context.dispatcher
    context.system.scheduler.scheduleOnce(10.millis, self, TakeAction)
  }

  /**
   * Issues the given saves. A single save first checks whether the path exists and reports
   * `false` if it does; two or more saves are written in one batch call without that check.
   *
   * @param saves the batch taken from the buffer (possibly empty)
   * @param now   `System.nanoTime` taken at the start of the current pass, used for timing logs
   */
  private def flushSaves(saves: Seq[SaveChannel], now: Long): Unit = saves match {
    case Seq() =>
      //nothing to do
      ()
    case Seq(SaveChannel(entity, onComplete)) =>
      logger.info("[channelsmanager] saving channels:{} in a batch", saves.map(_.path))
      PersistenceService(persistenceClient).existsAsync(entity.path(), { exists => if (exists) onComplete(false) else {
        PersistenceService(persistenceClient).setDataAsync(entity.path(), entity, new Runnable {
          override def run() = {
            onComplete(true)
            logger.debug("[channelsmanager] saving single channel:{} using:{}ns", entity, (System.nanoTime - now).toString)
          }
        })
      }})
    case _ =>
      logger.info("[channelsmanager] saving channels:{} in a batch", saves.map(_.path))
      PersistenceService(persistenceClient).setDataAsync(saves.map { case SaveChannel(entity, onComplete) =>
        (entity.path(), entity, new Runnable {
          override def run() = {
            onComplete(true)
            logger.debug("[channelsmanager] saving multiple channels:{} in batch using:{}ns", saves.map(_.channelEntity), (System.nanoTime - now).toString)
          }
        })
      })
  }

  /** Issues the given deletes as one batch call; does nothing for an empty batch. */
  private def flushDeletes(deletes: Seq[DeleteChannel]): Unit =
    if (deletes.nonEmpty) {
      logger.info("[channelsmanager] deleting channels:{} in a batch", deletes.map(_.path))
      PersistenceService(persistenceClient).dropDataAsync[ChannelEntity](deletes.map {
        case DeleteChannel(path, onComplete) => (path, onComplete)
      })
    }

  /**
   * Issues the given loads as one batch call; does nothing for an empty batch. Each request's
   * promise is completed with the entity found, or failed with `NoSuchElementException`.
   */
  private def flushLoads(loads: Seq[LoadChannel]): Unit =
    if (loads.nonEmpty) {
      logger.info("[channelsmanager] loading channels:{} in a batch", loads.map(_.path))
      PersistenceService(persistenceClient).getDataAsync[ChannelEntity](loads.map { r => (r.path, { channelOption: Option[ChannelEntity] => channelOption match {
        case Some(channelEntity) =>
          noop(r.promised.trySuccess(channelEntity))
        case None =>
          noop(r.promised.tryFailure(new NoSuchElementException))
      }})})
    }
}
// (Hosting page footer: sign up or sign in on GitHub to comment on this gist.)