Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A StripedExecutor in Scala using only JDK primitives, to serialize execution per stripe
// ©2012 Viktor Klang
// 4,192 bytes jarred
package java.klang
import java.util.concurrent. { Executor, ConcurrentHashMap, ConcurrentLinkedQueue }
import java.util.concurrent.atomic. { AtomicBoolean }
import scala.annotation.tailrec
trait Striped {
self: Runnable =>
def stripeId: AnyRef
}
class StripedExecutor(val parent: Executor) extends Executor {
private val stripes = new ConcurrentHashMap[AnyRef, AnyRef]
override final def execute(runnable: Runnable): Unit = {
runnable match {
case r: Striped with Runnable => executeStripe(r, r.stripeId)
case other => parent execute other
}
}
private def executeStripe(runnable: Runnable, stripe: AnyRef): Unit = (stripe match {
case null => parent
case stripe =>
(stripes get stripe) match {
case e: ExecutorStripe => e
case null =>
val newExecutorStripe = new ExecutorStripe(parent)
stripes.putIfAbsent(stripe, newExecutorStripe) match {
case null => newExecutorStripe
case existing: ExecutorStripe => existing
}
}
}) execute runnable
}
class ExecutorStripe(private val parent: Executor) extends ConcurrentLinkedQueue[Runnable] with Runnable with Executor {
private val scheduled = new AtomicBoolean(false)
final override def execute(runnable: Runnable): Unit = {
assert(offer(runnable))
trySchedule()
}
final override def run = try {
@tailrec def runNext(): Unit = poll match {
case null =>
()
case some =>
some.run()
runNext()
}
runNext()
} finally {
scheduled.set(false)
trySchedule()
}
private final def trySchedule(): Unit = {
if (!isEmpty && scheduled.compareAndSet(false, true)) {
try {
parent execute this
} catch {
case e => scheduled.set(false); throw e
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.