Skip to content

Instantly share code, notes, and snippets.

/post.scala
Created Jul 20, 2017

Embed
What would you like to do?
the description for this gist
/**
 * Selects the work queue that `command` should be placed on.
 *
 * The choice is keyed on `command.hashCode()`:
 *  - when `fairDistributionThreshold` is 0, or the index map already holds more
 *    than `fairDistributionThreshold` entries, the queue is derived directly
 *    from a scrambled version of the hash;
 *  - otherwise the hash is looked up in (and, if missing, added to)
 *    `runnableToWorkerQueueIndex`, using the next value of `executionCounter`
 *    modulo `parallelism` as the candidate queue for a new entry.
 *
 * Both index computations take the remainder first and the absolute value
 * second, so the result always lies in `[0, parallelism)`, including when the
 * scrambled hash is `Int.MinValue` or the counter has wrapped to a negative value.
 *
 * @param command the runnable about to be scheduled
 * @return the element of `workQueues` chosen for `command`
 */
private def getQueueForRunnable(command: Runnable) = {
  val runnableHash = command.hashCode()

  // Hash scrambling: multiply, reverse the byte order, multiply again.
  def sbhash(i: Int) = reverseBytes(i * 0x9e3775cd) * 0x9e3775cd

  // Remainder first, abs second: the remainder's magnitude is below `parallelism`,
  // so abs cannot overflow and the index stays non-negative after counter wrap-around.
  def getNext = Math.abs(executionCounter.incrementAndGet() % parallelism)

  // Registers `queueIndex` for `hash` unless a mapping already exists, then
  // returns whatever index the map holds for `hash`.
  def updateIfAbsentAndGetQueueIndex(
    workerQueueIndex: AtomicReference[ImmutableIntMap],
    hash: Int, queueIndex: Int): Int = {
    // CAS retry loop; reads and writes go through the same reference so the
    // snapshot and the compare-and-set can never refer to different maps.
    @tailrec
    def updateIndex(): Unit = {
      val prev = workerQueueIndex.get()
      if (!workerQueueIndex.compareAndSet(prev, prev.updateIfAbsent(hash, queueIndex))) {
        updateIndex()
      }
    }
    updateIndex()
    // The CAS above succeeded with a map produced by updateIfAbsent(hash, ...),
    // so `hash` is expected to be present here (assumes entries are never
    // removed concurrently - TODO confirm against ImmutableIntMap usage).
    workerQueueIndex.get().get(hash)
  }

  val workQueueIndex =
    if (fairDistributionThreshold == 0
      || runnableToWorkerQueueIndex.get().size > fairDistributionThreshold)
      // Math.abs(Int.MinValue) is still negative, so reduce modulo parallelism
      // before taking the absolute value.
      Math.abs(sbhash(runnableHash) % parallelism)
    else
      updateIfAbsentAndGetQueueIndex(runnableToWorkerQueueIndex, runnableHash, getNext)

  workQueues(workQueueIndex)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.