Skip to content

Instantly share code, notes, and snippets.

@viktorklang
Created January 17, 2013 00:32
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save viktorklang/4552423 to your computer and use it in GitHub Desktop.
Save viktorklang/4552423 to your computer and use it in GitHub Desktop.
Wraps an ExecutionContext into a new ExecutionContext which will execute its tasks in sequence, always.
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.annotation.tailrec
/**
 * Factory for an [[scala.concurrent.ExecutionContext]] wrapper that funnels every submitted
 * task through a single queue and drains that queue on the underlying context.
 */
object SerializedExecutionContext {

  /**
   * Wraps `context` in a new `ExecutionContext` backed by a `ConcurrentLinkedQueue`.
   *
   * Submitted tasks are enqueued; the wrapper then submits itself (as a `Runnable`) to
   * `context`, guarded by an atomic flag so that at most one such submission is outstanding
   * at a time. Each time the wrapper runs it processes up to `batchSize` queued tasks and
   * then re-submits itself if tasks remain.
   *
   * @param batchSize maximum number of queued tasks processed per run of the wrapper; must be > 0
   * @param context   the underlying context that runs the wrapper and receives failure reports
   * @return the wrapping `ExecutionContext`
   * @throws IllegalArgumentException if `batchSize` is not greater than 0
   */
  def apply(batchSize: Int)(implicit context: ExecutionContext): ExecutionContext = {
    require(batchSize > 0, s"SerializedExecutionContext.batchSize must be greater than 0 but was $batchSize")

    new ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext {
      // 0 = not submitted to `context`, 1 = submitted (or currently draining).
      private final val scheduled = new AtomicInteger(0)

      /** Processes queued tasks until the queue is empty or `batchSize` tasks have been run. */
      @tailrec private final def run(done: Int): Unit =
        if (done < batchSize) {
          val next = poll()
          if (next ne null) {
            // Non-fatal task failures are reported to the underlying context and do not stop the batch.
            try next.run()
            catch { case NonFatal(error) => context.reportFailure(error) }
            run(done + 1)
          }
        }

      /** Enqueues `task`, then makes sure a drain is scheduled on the underlying context. */
      override def add(task: Runnable): Boolean = {
        val accepted = super.add(task)
        attach()
        accepted
      }

      /** Drains one batch, then clears the flag and re-schedules if tasks remain. */
      final def run(): Unit =
        try {
          if (scheduled.get == 1) run(0)
        } finally {
          // Reset before re-attaching so that the compareAndSet in attach() can succeed again.
          scheduled.set(0)
          attach()
        }

      /** Submits this wrapper to `context` when the queue is non-empty and no submission is outstanding. */
      final def attach(): Unit = {
        val shouldSchedule = !isEmpty && scheduled.compareAndSet(0, 1)
        if (shouldSchedule) {
          try context.execute(this)
          catch {
            // The submission did not go through: release the flag, then propagate the error.
            case t: Throwable =>
              scheduled.set(0)
              throw t
          }
        }
      }

      override final def execute(task: Runnable): Unit = {
        add(task)
        ()
      }

      override final def reportFailure(t: Throwable): Unit =
        context.reportFailure(t)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment