Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created May 19, 2014 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/0fb508e772cc405a467d to your computer and use it in GitHub Desktop.
Save akarnokd/0fb508e772cc405a467d to your computer and use it in GitHub Desktop.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx;
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;
import rx.functions.Action1;
import rx.functions.Func0;
/**
 * Serializes values produced by many threads into calls to a single consumer
 * callback, using a fixed-size ring of pre-allocated holder objects.
 *
 * <p>Usage, as implied by the public methods: claim a slot with {@link #next()},
 * obtain the holder for that slot with {@link #get(long)}, fill it, then hand it
 * over with {@link #publish(long)}. Whichever thread manages to switch
 * {@code readerState} from idle to "reader" drains the ring and invokes the
 * consumer; other publishers only flag that more work is available.
 *
 * <p>When the ring is full and nobody is currently draining it, {@link #next()}
 * returns the sentinel {@code USE_SIDE_BUFFER}: the caller then holds the reader
 * role and its value travels through {@code sideBuffer} instead of the ring.
 * The side buffer is a plain {@link ArrayDeque} and is only touched on the
 * {@code USE_SIDE_BUFFER} paths of {@link #get(long)} and {@link #publish(long)}.
 *
 * @param <E> the type of the holder objects stored in the ring
 */
public final class RingSerializer<E> {
    /** Initial capacity hint for the side buffer. */
    static final int INITIAL_SIDEBUFFER_SIZE = 32;
    /**
     * Sentinel "sequence" returned by {@link #next()} when the caller should use
     * the side buffer. Real sequences start at 0, so -2 can never collide.
     */
    static final long USE_SIDE_BUFFER = -2L;

    final int capacity;
    /** {@code capacity - 1}; maps a sequence to a slot of {@code array}. */
    final int mask;
    final AtomicReferenceArray<E> array;
    final Func0<? extends E> valueFactory;
    /** Overflow holders queued by a thread that was told to use the side buffer. */
    final ArrayDeque<E> sideBuffer;
    /** Highest sequence claimed by any producer; starts at -1. */
    final Sequence cursor;
    /** Producer-side cache of the last observed consumer position. */
    final Sequence gatingSequenceCache;
    final Action1<? super E> consumer;
    /** Per-slot "lap" flag; a slot is published when it holds {@code flag(sequence)}. */
    final AtomicIntegerArray available;
    final int indexMask;
    final int indexShift;
    /** Highest sequence already handed to the consumer; starts at -1. */
    final Sequence readSequence;
    final AtomicReference<ConsumerState> readerState;

    /**
     * Creates the serializer and pre-allocates one holder per ring slot.
     *
     * @param valueFactory creates holder objects; called {@code capacity} times here
     *                     and once per side-buffer request later
     * @param capacity     number of ring slots; must be a positive power of two
     * @param consumer     callback that receives each published holder
     * @throws IllegalArgumentException if {@code capacity} is not a positive power of two
     * @throws NullPointerException     if {@code valueFactory} or {@code consumer} is null
     */
    public RingSerializer(Func0<? extends E> valueFactory, int capacity, Action1<? super E> consumer) {
        // The bit trick alone also accepts 0 and Integer.MIN_VALUE, so test the sign explicitly.
        if (capacity <= 0 || (capacity & (capacity - 1)) != 0) {
            throw new IllegalArgumentException("Capacity needs to be power of two!");
        }
        if (valueFactory == null) {
            throw new NullPointerException("valueFactory");
        }
        if (consumer == null) {
            throw new NullPointerException("consumer");
        }
        this.capacity = capacity;
        this.mask = capacity - 1;
        this.array = new AtomicReferenceArray<>(capacity);
        this.valueFactory = valueFactory;
        this.sideBuffer = new ArrayDeque<>(INITIAL_SIDEBUFFER_SIZE);
        this.consumer = consumer;
        this.cursor = new Sequence();
        this.gatingSequenceCache = new Sequence();
        this.available = new AtomicIntegerArray(capacity);
        this.indexMask = capacity - 1;
        this.indexShift = Integer.numberOfTrailingZeros(capacity);
        this.readSequence = new Sequence();
        this.readerState = new AtomicReference<>(ConsumerState.EMPTY_IDLE);
        fill();
    }

    /**
     * Populates every ring slot with a fresh holder and marks it as not yet published.
     * The side buffer is intentionally left empty: {@link #publish(long)} forwards
     * everything it finds there to the consumer, so blank placeholders must not be queued.
     */
    private void fill() {
        for (int i = 0; i < capacity; i++) {
            array.lazySet(i, valueFactory.call());
            // -1 differs from flag(sequence) of the first lap, which is 0.
            available.lazySet(i, -1);
        }
    }

    /**
     * Returns the holder belonging to a sequence obtained from {@link #next()}.
     *
     * @param sequence a claimed sequence, or {@code USE_SIDE_BUFFER}
     * @return the ring holder for that sequence, or, for {@code USE_SIDE_BUFFER},
     *         a newly created holder that has also been queued in the side buffer
     */
    public E get(long sequence) {
        if (sequence == USE_SIDE_BUFFER) {
            E holder = valueFactory.call();
            sideBuffer.offer(holder);
            return holder;
        }
        return getDirect(sequence);
    }

    /** Reads the ring slot that {@code sequence} maps to. */
    private E getDirect(long sequence) {
        return array.get((int) sequence & mask);
    }

    /**
     * Claims the next sequence for the calling producer.
     *
     * <p>A claim is only granted while it would not lap the consumer, i.e. while
     * {@code next - capacity} does not exceed {@code readSequence}. If the ring is
     * full, the caller either becomes the reader (when no reader is active) and is
     * told to use the side buffer, or parks briefly and retries.
     *
     * @return the claimed sequence, or {@code USE_SIDE_BUFFER}
     */
    public long next() {
        for (;;) {
            long current = cursor.get();
            long next = current + 1;
            long wrapPoint = next - capacity;
            long cachedGatingSequence = gatingSequenceCache.get();
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // The cache is stale or too low: consult the consumer's real position.
                long gatingSequence = readSequence.get();
                if (wrapPoint > gatingSequence) {
                    // Ring is full: wait or sidestep.
                    ConsumerState cs = readerState.get();
                    if (cs.isIdle()) {
                        // No active reader: try to take the reader role and let the
                        // caller enqueue its value through the side buffer.
                        ConsumerState ns = cs.becomeReader(Thread.currentThread().getId());
                        if (readerState.compareAndSet(cs, ns)) {
                            return USE_SIDE_BUFFER;
                        }
                    }
                    // Otherwise wait until the reader frees a slot.
                    LockSupport.parkNanos(1L); // use wait strategy instead
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** @return the highest sequence claimed so far, or -1 if none was claimed */
    public long cursor() {
        return cursor.get();
    }

    /** Slot index inside {@code available} for a sequence. */
    private int index(long sequence) {
        return (int) sequence & indexMask;
    }

    /** Lap marker stored in {@code available} once a sequence is published. */
    private int flag(long sequence) {
        return ((int) sequence) >>> indexShift;
    }

    private void setAvailable(long sequence) {
        available.lazySet(index(sequence), flag(sequence));
    }

    /**
     * @param sequence the sequence to test
     * @return {@code true} if the slot of {@code sequence} currently carries that
     *         sequence's lap flag, i.e. it has been published
     */
    public boolean isAvailable(long sequence) {
        return available.get(index(sequence)) == flag(sequence);
    }

    /**
     * Hands a filled holder over to the consumer side.
     *
     * <p>For {@code USE_SIDE_BUFFER} the caller already owns the reader role (granted
     * by {@link #next()}): the ring is drained once, then the side buffer, then the
     * regular consume loop runs until no more work is flagged.
     *
     * <p>For a ring sequence the slot is marked published; the caller then either
     * becomes the reader and drains the ring itself, or flags more work for the
     * currently active reader and returns.
     *
     * @param sequence the value previously returned by {@link #next()}
     */
    public void publish(long sequence) {
        if (sequence == USE_SIDE_BUFFER) {
            consumeRing();
            E holder;
            while ((holder = sideBuffer.poll()) != null) {
                consumer.call(holder);
            }
            consumeLoop();
            return;
        }
        setAvailable(sequence);
        // Try to process the ring on this thread, or signal more work to the reader.
        long tid = Thread.currentThread().getId();
        for (;;) {
            ConsumerState cs = readerState.get();
            if (cs.isIdle()) {
                if (readerState.compareAndSet(cs, cs.becomeReader(tid))) {
                    consumeLoop();
                    return;
                }
            } else if (readerState.compareAndSet(cs, cs.addMoreWork())) {
                return;
            }
        }
    }

    /**
     * Drains the ring repeatedly until the reader role can be released.
     *
     * <p>NOTE: there is no try/finally here, so if the consumer callback throws,
     * {@code readerState} is left in the non-idle state.
     */
    private void consumeLoop() {
        while (true) {
            consumeRing();
            ConsumerState cs = readerState.get();
            if (!cs.isAvailable()) {
                // Try to end processing; a failed CAS means more work was flagged
                // in the meantime, so go around and consume the ring again.
                if (readerState.compareAndSet(cs, cs.endProcessing())) {
                    return;
                }
                continue;
            }
            // Clear the "more work" flag; a failed CAS is simply retried next round.
            readerState.compareAndSet(cs, cs.processWork());
        }
    }

    /**
     * Delivers every contiguous published holder after {@code readSequence} to the
     * consumer and advances {@code readSequence} to the last one delivered, even if
     * the consumer throws part-way through.
     */
    private void consumeRing() {
        long current = readSequence.get();
        long next = current + 1;
        long avail = highestAvailable(next, cursor.get());
        if (next <= avail) {
            long processed = current;
            try {
                do {
                    consumer.call(getDirect(next));
                    processed = next;
                    next++;
                } while (next <= avail);
            } finally {
                readSequence.set(processed);
            }
        }
    }

    /**
     * Finds the end of the contiguous published run starting at {@code start}.
     *
     * @param start      first sequence to inspect
     * @param upperBound last sequence to inspect (the claimed cursor)
     * @return the last sequence in {@code [start, upperBound]} before the first
     *         unpublished one; {@code start - 1} if {@code start} itself is unpublished
     */
    private long highestAvailable(long start, long upperBound) {
        for (long i = start; i <= upperBound; i++) {
            if (!isAvailable(i)) {
                return i - 1;
            }
        }
        return upperBound;
    }
}
/**
 * Immutable snapshot of who is draining the ring and whether extra work has been
 * flagged for that reader. A {@code readerThreadId} of 0 means "no reader".
 */
final class ConsumerState {
    /** Shared instance for the state with no reader and no flagged work. */
    static final ConsumerState EMPTY_IDLE = new ConsumerState();

    /** Id of the thread holding the reader role, or 0 when nobody holds it. */
    final long readerThreadId;
    /** Raw "more work was signalled" flag; see {@link #isAvailable()}. */
    final boolean available;

    private ConsumerState() {
        this(0L, false);
    }

    private ConsumerState(long tid, boolean available) {
        this.readerThreadId = tid;
        this.available = available;
    }

    /**
     * @param tid id of the thread taking the reader role
     * @return a state owned by {@code tid} with no work flagged
     */
    public ConsumerState becomeReader(long tid) {
        return new ConsumerState(tid, false);
    }

    /** @return the shared idle state */
    public ConsumerState endProcessing() {
        return EMPTY_IDLE;
    }

    /** @return a state with the same reader and the work flag raised */
    public ConsumerState addMoreWork() {
        return new ConsumerState(this.readerThreadId, true);
    }

    /** @return a state with the same reader and the work flag cleared */
    public ConsumerState processWork() {
        return new ConsumerState(this.readerThreadId, false);
    }

    /** @return {@code true} when no thread holds the reader role */
    public boolean isIdle() {
        return 0L == readerThreadId;
    }

    /** @return {@code true} when a reader exists and more work has been flagged for it */
    public boolean isAvailable() {
        return available && !isIdle();
    }
}
/**
 * Base class contributing seven {@code long} fields that precede the {@code value}
 * field declared by its subclass. Presumably this is cache-line padding (the class
 * name suggests so); only {@code p7} is given a non-default initial value.
 */
class LeftPadding {
    protected long p1;
    protected long p2;
    protected long p3;
    protected long p4;
    protected long p5;
    protected long p6;
    protected long p7 = 7;
}
/**
 * Middle layer of the padded hierarchy: holds the single {@code volatile long}
 * that {@code Sequence} reads directly and updates through its field updater.
 */
class Value extends LeftPadding {
    /** The current value; volatile so it can be targeted by an atomic field updater. */
    protected volatile long value;
}
/**
 * Adds seven {@code long} fields declared after the inherited {@code value} field.
 * Presumably this is trailing cache-line padding (the class name suggests so);
 * only {@code p15} is given a non-default initial value.
 */
class RightPadding extends Value {
    protected long p9;
    protected long p10;
    protected long p11;
    protected long p12;
    protected long p13;
    protected long p14;
    protected long p15 = 16;
}
/**
 * A {@code long} counter stored in the inherited {@code value} field and updated
 * through an {@link AtomicLongFieldUpdater}.
 */
final class Sequence extends RightPadding {
    /** Value a sequence holds when created with the no-argument constructor. */
    public static final long INITIAL_VALUE = -1L;

    /** Updater bound to the {@code value} field declared in {@code Value}. */
    static final AtomicLongFieldUpdater<Value> UPDATER =
            AtomicLongFieldUpdater.newUpdater(Value.class, "value");

    /** Creates a sequence starting at {@link #INITIAL_VALUE}. */
    public Sequence() {
        this(INITIAL_VALUE);
    }

    /**
     * Creates a sequence with the given starting value, written via {@code lazySet}.
     *
     * @param value the starting value
     */
    public Sequence(final long value) {
        UPDATER.lazySet(this, value);
    }

    /** @return the current value (volatile read) */
    public long get() {
        return this.value;
    }

    /**
     * Stores a new value using the updater's {@code lazySet}.
     *
     * @param value the value to store
     */
    public void set(final long value) {
        UPDATER.lazySet(this, value);
    }

    /**
     * Stores a new value with a plain volatile write.
     *
     * @param value the value to store
     */
    public void setVolatile(final long value) {
        this.value = value;
    }

    /**
     * Atomically replaces {@code expected} with {@code newValue}.
     *
     * @param expected the value the field must currently hold
     * @param newValue the replacement value
     * @return {@code true} if the swap happened
     */
    public boolean compareAndSet(long expected, long newValue) {
        return UPDATER.compareAndSet(this, expected, newValue);
    }

    /** @return the value after atomically adding one */
    public long incrementAndGet() {
        return UPDATER.addAndGet(this, 1L);
    }

    /**
     * @param add the amount to add
     * @return the value after atomically adding {@code add}
     */
    public long addAndGet(long add) {
        return UPDATER.addAndGet(this, add);
    }

    /** @return the current value in decimal form */
    @Override
    public String toString() {
        return String.valueOf(get());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment