Created
October 21, 2016 19:44
-
-
Save nitsanw/ca0b4dbed6a41947ad1c14e4f76f1320 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
protected final boolean offerColdPath(final E[] buffer, final long mask, final E e, final long index, | |
final long offset) { | |
final long lookAheadStep = this.lookAheadStep; | |
// normal case, go around the buffer or resize if full (unless we hit max capacity) | |
if (lookAheadStep > 0) { | |
long lookAheadElementOffset = calcElementOffset(index + lookAheadStep, mask); | |
// Try and look ahead a number of elements so we don't have to do this all the time | |
if (null == lvElement(buffer, lookAheadElementOffset)) { | |
producerBufferLimit = index + lookAheadStep - 1; // joy, there's plenty of room | |
writeToQueue(buffer, e, index, offset); | |
return true; | |
} | |
// we're at max capacity, can use up last element | |
final int maxCapacity = maxQueueCapacity; | |
if (mask + 1 == maxCapacity) { | |
if (null == lvElement(buffer, offset)) { | |
writeToQueue(buffer, e, index, offset); | |
return true; | |
} | |
// we're full and can't grow | |
return false; | |
} | |
// not at max capacity, so must allow extra slot for JUMP | |
if (null == lvElement(buffer, calcElementOffset(index + 1, mask))) { // buffer is not full | |
writeToQueue(buffer, e, index, offset); | |
} else { | |
// allocate new buffer of same length | |
final E[] newBuffer = allocate((int) (2*(mask +1) + 1)); | |
producerBuffer = newBuffer; | |
producerMask = newBuffer.length - 2; | |
final long offsetInNew = calcElementOffset(index, producerMask); | |
linkOldToNew(index, buffer, offset, newBuffer, offsetInNew, e); | |
int newCapacity = (int) (producerMask + 1); | |
if (newCapacity == maxCapacity) { | |
long currConsumerIndex = lvConsumerIndex(); | |
// use lookAheadStep to store the consumer distance from final buffer | |
this.lookAheadStep = -(index - currConsumerIndex); | |
producerBufferLimit = currConsumerIndex + maxCapacity - 1; | |
} else { | |
producerBufferLimit = index + producerMask - 1; | |
adjustLookAheadStep(newCapacity); | |
} | |
} | |
return true; | |
} | |
// the step is negative (or zero) in the period between allocating the max sized buffer and the | |
// consumer starting on it | |
else { | |
final long prevElementsInOtherBuffers = -lookAheadStep; | |
// until the consumer starts using the current buffer we need to check consumer index to | |
// verify size | |
long currConsumerIndex = lvConsumerIndex(); | |
int size = (int) (index - currConsumerIndex); | |
int maxCapacity = (int) mask+1; // we're on max capacity or we wouldn't be here | |
if (size == maxCapacity) { | |
// consumer index has not changed since adjusting the lookAhead index, we're full | |
return false; | |
} | |
// if consumerIndex progressed enough so that current size indicates it is on same buffer | |
long firstIndexInCurrentBuffer = producerBufferLimit - maxCapacity + prevElementsInOtherBuffers; | |
if (currConsumerIndex >= firstIndexInCurrentBuffer) { | |
// job done, we've now settled into our final state | |
adjustLookAheadStep(maxCapacity); | |
} | |
// consumer is still on some other buffer | |
else { | |
// how many elements out of buffer? | |
this.lookAheadStep = (int) (currConsumerIndex - firstIndexInCurrentBuffer); | |
} | |
producerBufferLimit = currConsumerIndex + maxCapacity; | |
writeToQueue(buffer, e, index, offset); | |
return true; | |
} | |
} | |
private void adjustLookAheadStep(int capacity) { | |
lookAheadStep = Math.min(capacity / 4, SpscArrayQueue.MAX_LOOK_AHEAD_STEP); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment