Skip to content

Instantly share code, notes, and snippets.

@nitsanw
Created October 21, 2016 10:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nitsanw/a0130d5113fb04064d2d41ffc9bec71e to your computer and use it in GitHub Desktop.
Save nitsanw/a0130d5113fb04064d2d41ffc9bec71e to your computer and use it in GitHub Desktop.
Unbounded SPSC LAQ cold path
/**
 * Cold (slow) path of an offer on the unbounded SPSC queue: decides whether the
 * element can still be written into the current buffer or whether a new buffer
 * must be linked in, and performs the corresponding write.
 *
 * @param buffer the buffer the producer is currently writing into
 * @param mask   index mask of {@code buffer}; {@code mask + 1} is treated as its capacity
 * @param e      the element to enqueue
 * @param pIndex the producer index at which {@code e} is to be written
 * @param offset the slot offset for {@code pIndex}; it is used unchanged for the
 *               new buffer as well, which is allocated with the same length
 *               (presumably computed by the caller from {@code pIndex} and
 *               {@code mask} - confirm against the hot path)
 * @return always {@code true}
 */
@Override
protected boolean offerColdPath(E[] buffer, long mask, E e, long pIndex, long offset) {
    // Probe a fixed distance ahead of the producer: a quarter of the buffer capacity.
    final long capacity = mask + 1;
    final long probeIndex = pIndex + capacity / 4;

    // Case 1: the slot at the probe position is empty, so every slot before it
    // may be used. Publish the new limit, then write the element.
    if (lvElement(buffer, calcElementOffset(probeIndex, mask)) == null) {
        producerBufferLimit = probeIndex - 1;
        writeToQueue(buffer, e, pIndex, offset);
        return true;
    }

    // Case 2: the probe position is occupied, but the slot right after the
    // current one is still null, so the current slot can take the element.
    if (lvElement(buffer, calcElementOffset(pIndex + 1, mask)) == null) {
        writeToQueue(buffer, e, pIndex, offset);
        return true;
    }

    // Case 3: only one slot is left in this buffer => link a new buffer of the
    // same length and continue producing into it.
    final E[] nextBuffer = (E[]) new Object[buffer.length];
    producerBuffer = nextBuffer;
    producerBufferLimit = pIndex + mask - 1;
    linkOldToNew(pIndex, buffer, offset, nextBuffer, offset, e);
    return true;
}
/**
 * Writes {@code e} into {@code newBuffer}, links {@code oldBuffer} to
 * {@code newBuffer}, marks the old slot with {@code JUMP} and finally advances
 * the producer index past {@code currIndex}.
 *
 * <p>All four writes are ordered stores and are issued in exactly this sequence.
 * NOTE(review): the order presumably guarantees that a consumer which observes
 * {@code JUMP} in the old buffer already sees both the link to the new buffer
 * and the element stored in it - confirm against the consumer side.
 *
 * @param currIndex   producer index of the element being written; the producer
 *                    index is set to {@code currIndex + 1} at the end
 * @param oldBuffer   buffer that has run out of room
 * @param offsetInOld slot offset in {@code oldBuffer} that receives the {@code JUMP} marker
 * @param newBuffer   freshly allocated buffer that receives the element
 * @param offsetInNew slot offset in {@code newBuffer} at which {@code e} is stored
 * @param e           the element to enqueue
 */
void linkOldToNew(long currIndex,
E[] oldBuffer,
long offsetInOld,
E[] newBuffer,
long offsetInNew,
E e) {
// 1. store the element in the new buffer first
soElement(newBuffer, offsetInNew, e); // ordered: newBuffer[index & newMask] = e;
// 2. link the old buffer to the new one through its last slot
soNext(oldBuffer, newBuffer); // ordered: oldBuffer[oldBuffer.length - 1] = newBuffer;
// 3. mark the old slot with JUMP instead of an element
soElement(oldBuffer, offsetInOld, JUMP); // ordered: oldBuffer[index & oldMask] = JUMP;
// 4. advance the producer index only after all of the stores above
soProducerIndex(currIndex + 1);// ordered: producerIndex = currIndex + 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment