Created
May 19, 2014 15:03
-
-
Save akarnokd/0fb508e772cc405a467d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package rx; | |
import java.util.ArrayDeque; | |
import java.util.concurrent.atomic.AtomicIntegerArray; | |
import java.util.concurrent.atomic.AtomicLongFieldUpdater; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.concurrent.atomic.AtomicReferenceArray; | |
import java.util.concurrent.locks.LockSupport; | |
import rx.functions.Action1; | |
import rx.functions.Func0; | |
public final class RingSerializer<E> { | |
final int capacity; | |
final int mask; | |
final AtomicReferenceArray<E> array; | |
final Func0<? extends E> valueFactory; | |
final ArrayDeque<E> sideBuffer; | |
static final int INITIAL_SIDEBUFFER_SIZE = 32; | |
long USE_SIDE_BUFFER = -2; | |
final Sequence cursor; | |
final Sequence gatingSequenceCache; | |
final Action1<? super E> consumer; | |
final AtomicIntegerArray available; | |
final int indexMask; | |
final int indexShift; | |
final Sequence readSequence; | |
final AtomicReference<ConsumerState> readerState; | |
public RingSerializer(Func0<? extends E> valueFactory, int capacity, Action1<? super E> consumer) { | |
if ((capacity & (capacity - 1)) != 0) { | |
throw new IllegalArgumentException("Capacity needs to be power of two!"); | |
} | |
this.capacity = nextPowerOf2(capacity); | |
this.mask = this.capacity - 1; | |
this.array = new AtomicReferenceArray<>(this.capacity); | |
this.valueFactory = valueFactory; | |
this.sideBuffer = new ArrayDeque<>(); | |
this.consumer = consumer; | |
this.cursor = new Sequence(); | |
this.gatingSequenceCache = new Sequence(); | |
this.available = new AtomicIntegerArray(this.capacity); | |
this.indexMask = this.capacity - 1; | |
this.indexShift = Integer.numberOfTrailingZeros(this.capacity); | |
this.readSequence = new Sequence(); | |
this.readerState = new AtomicReference<>(ConsumerState.EMPTY_IDLE); | |
fill(); | |
} | |
private void fill() { | |
int c = capacity; | |
for (int i = 0; i < c; i++) { | |
array.lazySet(i, valueFactory.call()); | |
} | |
for (int i = 0; i < INITIAL_SIDEBUFFER_SIZE; i++) { | |
sideBuffer.offer(valueFactory.call()); | |
} | |
for (int i = 0; i < capacity; i++) { | |
available.lazySet(i, -1); | |
} | |
} | |
public E get(long sequence) { | |
if (sequence == USE_SIDE_BUFFER) { | |
E holder = valueFactory.call(); | |
sideBuffer.offer(holder); | |
return holder; | |
} | |
return getDirect(sequence); | |
} | |
private E getDirect(long sequence) { | |
return array.get((int)sequence & mask); | |
} | |
public long next() { | |
do { | |
long current = cursor.get(); | |
long next = current + 1; | |
long wrapPoint = next - capacity; | |
long cachedGatingSequence = gatingSequenceCache.get(); | |
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { | |
if (wrapPoint > current) { | |
// wait or sidestep | |
long tid = Thread.currentThread().getId(); | |
ConsumerState cs = readerState.get(); | |
// if there is no active reader | |
if (cs.isIdle()) { | |
ConsumerState ns = cs.becomeReader(tid); | |
// try becoming the reader and report to use the side buffer to enqueue extra work | |
if (readerState.compareAndSet(cs, ns)) { | |
return USE_SIDE_BUFFER; | |
} | |
} | |
// otherwise, wait until the buffer gets read | |
LockSupport.parkNanos(1L); // use wait strategy instead | |
continue; | |
} | |
gatingSequenceCache.set(current); | |
} else | |
if (cursor.compareAndSet(current, next)) { | |
return next; | |
} | |
} while (true); | |
} | |
public long cursor() { | |
return cursor.get(); | |
} | |
private int index(long sequence) { | |
return (int)sequence & indexMask; | |
} | |
private int flag(long sequence) { | |
return ((int)sequence) >>> indexShift; | |
} | |
private void setAvailable(long sequence) { | |
available.lazySet(index(sequence), flag(sequence)); | |
} | |
public boolean isAvailable(long sequence) { | |
return available.get(index(sequence)) == flag(sequence); | |
} | |
public void publish(long sequence) { | |
if (sequence == USE_SIDE_BUFFER) { | |
consumeRing(); | |
E holder; | |
while ((holder = sideBuffer.poll()) != null) { | |
consumer.call(holder); | |
} | |
consumeLoop(); | |
return; | |
} | |
setAvailable(sequence); | |
// try to process ring by this thread or signal more work for the other | |
long tid = Thread.currentThread().getId(); | |
do { | |
ConsumerState cs = readerState.get(); | |
if (cs.isIdle()) { | |
ConsumerState ns = cs.becomeReader(tid); | |
if (readerState.compareAndSet(cs, ns)) { | |
consumeLoop(); | |
return; | |
} | |
} else { | |
ConsumerState ns = cs.addMoreWork(); | |
if (readerState.compareAndSet(cs, ns)) { | |
return; | |
} | |
} | |
} while (true); | |
} | |
private void consumeLoop() { | |
while (true) { | |
consumeRing(); | |
// process ring as long as possible | |
ConsumerState cs = readerState.get(); | |
if (!cs.isAvailable()) { | |
ConsumerState ns = cs.endProcessing(); | |
// try to end processing | |
if (readerState.compareAndSet(cs, ns)) { | |
return; | |
} | |
// if fails, it means we received more work in the meantime | |
// so continue consuming the ring | |
continue; | |
} | |
ConsumerState ns = cs.processWork(); | |
readerState.compareAndSet(cs, ns); | |
} | |
} | |
private void consumeRing() { | |
long current = readSequence.get(); | |
long next = current + 1; | |
long avail = highestAvailable(next, cursor.get()); | |
if (next <= avail) { | |
long processed = current; | |
try { | |
do { | |
E e = getDirect(next); | |
consumer.call(e); | |
processed = next; | |
next++; | |
} while (next <= avail); | |
} finally { | |
readSequence.set(processed); | |
} | |
} | |
} | |
private long highestAvailable(long start, long available) { | |
for (long i = start; i <= available; i++) { | |
if (!isAvailable(i)) { | |
return i - 1; | |
} | |
} | |
return available; | |
} | |
private static int nextPowerOf2(int x) { | |
if ((x & (x - 1)) == 0) { | |
return x; | |
} | |
x = x | (x >> 1); | |
x = x | (x >> 2); | |
x = x | (x >> 4); | |
x = x | (x >> 8); | |
x = x | (x >> 16); | |
x = x | (x >> 24); | |
return x - (x >> 1); | |
} | |
} | |
/**
 * Immutable value describing who (if anyone) is currently reading and whether
 * more work has been flagged for that reader. Every transition method returns
 * a state object rather than mutating this one, which lets callers swap states
 * with a reference compare-and-set.
 */
final class ConsumerState {
    /** The shared "nobody is reading, nothing flagged" state. */
    static final ConsumerState EMPTY_IDLE = new ConsumerState();

    /** Id of the reading thread; 0 means there is no reader. */
    final long readerThreadId;
    /** True when more work has been flagged via {@link #addMoreWork()}. */
    final boolean available;

    private ConsumerState() {
        this(0, false);
    }

    private ConsumerState(long tid, boolean available) {
        this.readerThreadId = tid;
        this.available = available;
    }

    /**
     * @param tid id of the thread taking the reader role
     * @return a new state owned by {@code tid} with no work flagged
     */
    public ConsumerState becomeReader(long tid) {
        return new ConsumerState(tid, false);
    }

    /** @return the shared idle state */
    public ConsumerState endProcessing() {
        return EMPTY_IDLE;
    }

    /** @return a new state with the same reader and the work flag set */
    public ConsumerState addMoreWork() {
        return new ConsumerState(readerThreadId, true);
    }

    /** @return a new state with the same reader and the work flag cleared */
    public ConsumerState processWork() {
        return new ConsumerState(readerThreadId, false);
    }

    /** @return true when no thread holds the reader role */
    public boolean isIdle() {
        return readerThreadId == 0;
    }

    /** @return true when a reader exists and work has been flagged for it */
    public boolean isAvailable() {
        if (isIdle()) {
            return false;
        }
        return available;
    }
}
/**
 * Base class contributing seven {@code long} fields declared ahead of the
 * subclass fields.
 * NOTE(review): presumably cache-line padding against false sharing; the JVM
 * does not guarantee field layout, so confirm this has the intended effect.
 */
class LeftPadding {
    protected long p1;
    protected long p2;
    protected long p3;
    protected long p4;
    protected long p5;
    protected long p6;
    // Only the last field carries a non-default value.
    protected long p7 = 7;
}
class Value extends LeftPadding { | |
protected volatile long value; | |
} | |
class RightPadding extends Value { | |
protected long p9, p10, p11, p12, p13, p14, p15 = 16; | |
} | |
final class Sequence extends RightPadding { | |
static final AtomicLongFieldUpdater<Value> UPDATER = AtomicLongFieldUpdater.newUpdater(Value.class, "value"); | |
public static final long INITIAL_VALUE = -1L; | |
public Sequence() { | |
this(INITIAL_VALUE); | |
} | |
public Sequence(final long value) { | |
set(value); | |
} | |
public long get() { | |
return value; | |
} | |
public void set(final long value) { | |
UPDATER.lazySet(this, value); | |
} | |
public void setVolatile(final long value) { | |
this.value = value; | |
} | |
public boolean compareAndSet(long expected, long newValue) { | |
return UPDATER.compareAndSet(this, expected, newValue); | |
} | |
public long incrementAndGet() { | |
return addAndGet(1); | |
} | |
public long addAndGet(long add) { | |
return UPDATER.addAndGet(this, add); | |
} | |
@Override | |
public String toString() { | |
return Long.toString(value); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment