Skip to content

Instantly share code, notes, and snippets.

Created July 20, 2017 06:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/a7c9191186d97303b737d9f8ac52a56c to your computer and use it in GitHub Desktop.
Save anonymous/a7c9191186d97303b737d9f8ac52a56c to your computer and use it in GitHub Desktop.
the description for this gist
/**
 * Selects the work queue that `command` should be handed to.
 *
 * The runnable is identified by its `hashCode()`. If `fairDistributionThreshold` is 0, or the
 * shared hash-to-queue map already holds more than `fairDistributionThreshold` entries, the
 * queue index is derived from a scrambled version of that hash. Otherwise the hash is
 * registered (if absent) in `runnableToWorkerQueueIndex` with the next counter-based index,
 * and the registered index is used.
 *
 * @param command the runnable about to be scheduled
 * @return the element of `workQueues` at the selected index
 */
private def getQueueForRunnable(command: Runnable) = {
  val runnableHash = command.hashCode()

  // Clears the sign bit. `Math.abs` is not usable here: Math.abs(Int.MinValue) is still
  // negative, which would yield a negative remainder and an out-of-range queue index.
  def nonNegative(i: Int) = i & Int.MaxValue

  // Hash scrambler: multiply, reverse the bytes, multiply again.
  def sbhash(i: Int) = reverseBytes(i * 0x9e3775cd) * 0x9e3775cd

  // Next counter-based index; masked so it stays in range after the counter wraps around.
  def getNext = nonNegative(executionCounter.incrementAndGet()) % parallelism

  // Installs `queueIndex` for `runnableHash` via a compare-and-set retry loop and returns the
  // index stored in the map that was successfully installed.
  // NOTE(review): `queueIndex` is by-name, so it may be evaluated again on every retry;
  // whether it is evaluated when the key already exists depends on
  // `ImmutableIntMap.updateIfAbsent` - confirm.
  def updateIfAbsentAndGetQueueIndex(
    workerQueueIndex: AtomicReference[ImmutableIntMap],
    runnableHash: Int, queueIndex: ⇒ Int): Int = {
    @tailrec
    def updateIndex(): Int = {
      val prev = workerQueueIndex.get()
      val next = prev.updateIfAbsent(runnableHash, queueIndex)
      // Operate on the reference that was passed in rather than reaching for the outer field.
      if (workerQueueIndex.compareAndSet(prev, next))
        next.get(runnableHash) // presumably always present after updateIfAbsent - confirm
      else
        updateIndex()
    }
    updateIndex()
  }

  val workQueueIndex =
    if (fairDistributionThreshold == 0
      || runnableToWorkerQueueIndex.get().size > fairDistributionThreshold)
      nonNegative(sbhash(runnableHash)) % parallelism
    else
      updateIfAbsentAndGetQueueIndex(runnableToWorkerQueueIndex, runnableHash, getNext)

  workQueues(workQueueIndex)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment