Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 29, 2015 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/d561a92481062c94d792 to your computer and use it in GitHub Desktop.
Save akarnokd/d561a92481062c94d792 to your computer and use it in GitHub Desktop.
Resizable concurrent Array Queue implementation
package rx.internal.util.unsafe;
import java.util.*;
/**
 * A single-producer single-consumer circular array with resize capability.
 * <p>
 * The methods {@code size()}, {@code isEmpty()}, {@code peek()} and {@code poll()} need to be
 * called from the consumer thread whereas {@code offer()} needs to be called from the producer
 * thread only.
 * <p>
 * A {@code null} slot marks an empty position, therefore {@code null} elements are rejected.
 * While the producer grows the array it replaces every element it relocates with
 * {@code TOMBSTONE}; a consumer that reads a tombstone re-reads the buffer reference and retries.
 */
public class AtomicArrayQueueUnsafe extends AbstractQueue<Object> {
    /** Marker written into a slot of the old array once grow() has relocated its element. */
    static final Object TOMBSTONE = new Object();
    /** Unsafe field offset of {@link #buffer}. */
    static final long P_BUFFER;
    /** Unsafe base offset of the first element of an {@code Object[]}. */
    static final long ARRAY_OFFSET;
    /** log2 of the element size of an {@code Object[]}: 2 for 4 bytes, 3 for 8 bytes. */
    static final int ARRAY_SCALE;
    /** Element size of an {@code Object[]} in bytes, as reported by Unsafe. */
    static final int ARRAY_INDEX_SIZE;

    static {
        try {
            P_BUFFER = UnsafeAccess.UNSAFE.objectFieldOffset(AtomicArrayQueueUnsafe.class.getDeclaredField("buffer"));
        } catch (NoSuchFieldException ex) {
            // keep the cause so a failed field lookup stays diagnosable
            throw new RuntimeException(ex);
        }
        int is = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
        int as;
        if (is == 4) {
            as = 2;
        } else if (is == 8) {
            as = 3;
        } else {
            throw new RuntimeException("Unsupported array index scale: " + is);
        }
        ARRAY_SCALE = as;
        ARRAY_INDEX_SIZE = is;
        ARRAY_OFFSET = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
    }

    /** The current element array; only accessed through the lp/lv/so helpers below. */
    Object[] buffer;
    /** Logical index of the next element to take; written by poll(), read by peek() and size(). */
    long readerIndex;
    /** Logical index of the next slot to fill; read and written by offer(), read by grow(). */
    long writerIndex;
    /** Array length (a power of two) at which offer() stops growing and reports failure. */
    final int maxCapacity;

    /** Reads the buffer reference with a plain load. */
    private Object[] lpBuffer() {
        return (Object[])UnsafeAccess.UNSAFE.getObject(this, P_BUFFER);
    }

    /** Reads the buffer reference with a volatile load. */
    private Object[] lvBuffer() {
        return (Object[])UnsafeAccess.UNSAFE.getObjectVolatile(this, P_BUFFER);
    }

    /** Publishes a new buffer reference with an ordered store. */
    private void soBuffer(Object[] buffer) {
        UnsafeAccess.UNSAFE.putOrderedObject(this, P_BUFFER, buffer);
    }

    /** Reads the array slot at the given raw offset with a volatile load. */
    private Object lvElement(Object[] buffer, long offset) {
        return UnsafeAccess.UNSAFE.getObjectVolatile(buffer, offset);
    }

    /** Writes the array slot at the given raw offset with an ordered store. */
    private void soElement(Object[] buffer, long offset, Object value) {
        UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, value);
    }

    /** Atomically replaces {@code expected} with {@code value} in the slot at the given raw offset. */
    private boolean casElement(Object[] buffer, long offset, Object expected, Object value) {
        return UnsafeAccess.UNSAFE.compareAndSwapObject(buffer, offset, expected, value);
    }

    /**
     * Converts a logical index into the raw Unsafe offset of its slot.
     *
     * @param index the logical (ever increasing) reader or writer index
     * @param mask the array length minus one; the length must be a power of two
     * @return the raw offset of slot {@code index & mask}
     */
    private long calcOffset(long index, int mask) {
        return ARRAY_OFFSET + ((index & mask) << ARRAY_SCALE);
    }

    /**
     * Rounds up to a power of two.
     *
     * @param value the requested size; the result is the smallest power of two not less than
     *              it for {@code 1 <= value <= 2^30}
     * @return the rounded value
     */
    static int roundToPowerOfTwo(final int value) {
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    /**
     * Creates a queue whose array starts at {@code initialCapacity} and may double up to
     * {@code maxCapacity}. Both values are rounded up to a power of two and capped at 2^30.
     *
     * @param initialCapacity the requested initial array length
     * @param maxCapacity the requested upper bound of the array length; raised to the
     *                    (rounded) initial capacity if it is smaller
     */
    public AtomicArrayQueueUnsafe(int initialCapacity, int maxCapacity) {
        int c0;
        if (initialCapacity >= 1 << 30) {
            c0 = 1 << 30;
        } else {
            c0 = roundToPowerOfTwo(initialCapacity);
        }
        int cm;
        if (maxCapacity >= 1 << 30) {
            cm = 1 << 30;
        } else {
            cm = roundToPowerOfTwo(maxCapacity);
        }
        // offer() compares the array length to the limit for equality; a limit below the
        // initial length would never match and the array would keep doubling
        this.maxCapacity = Math.max(cm, c0);
        soBuffer(new Object[c0]);
    }

    /**
     * Replaces the full array {@code b} with one of twice the length and relocates the
     * elements into it. Must be called from the producer thread before {@code writerIndex}
     * is advanced.
     * <p>
     * Elements are visited from the newest ({@code writerIndex - 1}) to the oldest
     * ({@code writerIndex - oldLength}). Each one is claimed by swapping it with
     * {@code TOMBSTONE} and then stored in the new array at the slot its logical index maps
     * to under the new mask, so reader and writer indices stay valid without adjustment.
     * The walk stops at the first slot that is {@code null} or whose swap fails, because
     * the consumer has taken that element and every older one.
     *
     * @param b the current, full array
     * @param wo the raw offset of the writer's slot in {@code b}; not used by the relocation,
     *           retained so the signature stays unchanged
     * @param n the mask of {@code b}, i.e. {@code b.length - 1}
     * @return the new array, already published through the buffer field
     */
    Object[] grow(Object[] b, long wo, int n) {
        final long wi = writerIndex;
        final int oldLength = n + 1;
        final int newMask = n * 2 + 1;
        Object[] b2 = new Object[newMask + 1];
        for (long index = wi - 1, oldest = wi - oldLength; index >= oldest; index--) {
            long from = calcOffset(index, n);
            Object o = lvElement(b, from);
            if (o == null || !casElement(b, from, o, TOMBSTONE)) {
                // the consumer caught up; nothing older is left to move
                break;
            }
            soElement(b2, calcOffset(index, newMask), o);
        }
        soBuffer(b2);
        return b2;
    }

    /**
     * Appends an element, doubling the array if it is full and below the maximum capacity.
     *
     * @param o the element to add, not {@code null}
     * @return {@code true} if the element was stored, {@code false} if the array is full
     *         and already at its maximum capacity
     * @throws NullPointerException if {@code o} is {@code null}
     */
    @Override
    public boolean offer(Object o) {
        if (o == null) {
            // null is the empty-slot marker; storing it would stall the consumer
            throw new NullPointerException("Null elements are not supported");
        }
        long wi = writerIndex;
        Object[] b = lpBuffer();
        int n = b.length - 1;
        long wo = calcOffset(wi, n);
        if (lvElement(b, wo) != null) {
            // the slot to write is still occupied: the array is full
            if (n + 1 == maxCapacity) {
                return false;
            }
            // grow
            b = grow(b, wo, n);
            wo = calcOffset(wi, n * 2 + 1);
        }
        soElement(b, wo, o);
        writerIndex = wi + 1;
        return true;
    }

    /**
     * Removes and returns the oldest element.
     *
     * @return the element, or {@code null} if the slot at the reader index is empty
     */
    @Override
    public Object poll() {
        long ri = readerIndex;
        Object[] b = lpBuffer();
        for (;;) {
            int mask = b.length - 1;
            long ro = calcOffset(ri, mask);
            Object o = lvElement(b, ro);
            if (o == null) {
                return null;
            }
            if (o == TOMBSTONE || !casElement(b, ro, o, null)) {
                // the producer relocated this slot; re-read the buffer and retry
                b = lvBuffer();
                continue;
            }
            readerIndex = ri + 1;
            return o;
        }
    }

    /**
     * Returns the oldest element without removing it.
     *
     * @return the element, or {@code null} if the slot at the reader index is empty
     */
    @Override
    public Object peek() {
        long ri = readerIndex;
        Object[] b = lpBuffer();
        for (;;) {
            int mask = b.length - 1;
            long ro = calcOffset(ri, mask);
            Object o = lvElement(b, ro);
            if (o != TOMBSTONE) {
                return o;
            }
            // the slot was relocated by a grow; re-read the buffer and retry
            b = lvBuffer();
        }
    }

    /**
     * Checks whether the slot at the reader index is empty.
     *
     * @return {@code true} if {@code peek()} returns {@code null}
     */
    @Override
    public boolean isEmpty() {
        return peek() == null;
    }

    /**
     * Counts the elements by walking the slots from the reader index until an empty slot is
     * found or every slot of the array has been counted.
     *
     * @return the number of occupied slots seen during the walk
     */
    @Override
    public int size() {
        int size = 0;
        long ri = readerIndex;
        Object[] b = lpBuffer();
        for (;;) {
            int mask = b.length - 1;
            if (size > mask) {
                // every slot is occupied; without this bound the walk would wrap forever
                return size;
            }
            long ro = calcOffset(ri, mask);
            Object o = lvElement(b, ro);
            if (o == null) {
                return size;
            }
            if (o == TOMBSTONE) {
                // the slot was relocated by a grow; continue in the current buffer
                b = lvBuffer();
                continue;
            }
            size++;
            ri++;
        }
    }

    /**
     * Iteration is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Iterator<Object> iterator() {
        throw new UnsupportedOperationException();
    }
}
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal;
import java.util.ArrayDeque;
import java.util.concurrent.*;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import rx.internal.util.*;
import rx.internal.util.unsafe.*;
/**
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*QP.*"
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class QP {
@State(Scope.Thread)
public static class Times {
@Param({ "1", "1000", "1000000" })
public int times;
}
@State(Scope.Thread)
public static class SyncArrayQueueState {
public final SyncArrayQueue queue = new SyncArrayQueue();
}
@State(Scope.Thread)
public static class ArrayDequeState {
public final ArrayDeque<Object> queue = new ArrayDeque<Object>();
}
@State(Scope.Thread)
public static class ArrayBlockingQueueState {
public final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1024 * 1024);
}
@State(Scope.Thread)
public static class SpscArrayQueueState {
public final SpscArrayQueue<Object> queue = new SpscArrayQueue<Object>(1024 * 1024);
}
@State(Scope.Thread)
public static class AtomicArrayQueueState {
public final AtomicArrayQueue queue = new AtomicArrayQueue(8, 1024 * 1024);
}
@State(Scope.Thread)
public static class AtomicArrayQueueUnsafeState {
public final AtomicArrayQueueUnsafe queue = new AtomicArrayQueueUnsafe(8, 1024 * 1024);
}
@State(Scope.Thread)
public static class ConcurrentLinkedQueueState {
public final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
}
@State(Scope.Thread)
public static class SpscLinkedQueueState {
public final SpscLinkedQueue<Object> queue = new SpscLinkedQueue<Object>();
}
@Benchmark
public void syncArrayQueueAR1(SyncArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void syncArrayQueueARN(SyncArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void arrayDequeAR1(ArrayDequeState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void arrayDequeARN(ArrayDequeState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void arrayBlockingQueueAR1(ArrayBlockingQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void arrayBlockingQueueARN(ArrayBlockingQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void spscArrayQueueAR1(SpscArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void spscArrayQueueARN(SpscArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void atomicArrayQueueAR1(AtomicArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void atomicArrayQueueARN(AtomicArrayQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void atomicArrayQueueUnsafeAR1(AtomicArrayQueueUnsafeState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void atomicArrayQueueUnsafeARN(AtomicArrayQueueUnsafeState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void concurrentLinkedQueueAR1(ConcurrentLinkedQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void concurrentLinkedQueueARN(ConcurrentLinkedQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
@Benchmark
public void spscLinkedQueueAR1(SpscLinkedQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
bh.consume(state.queue.poll());
}
}
@Benchmark
public void spscLinkedQueueARN(SpscLinkedQueueState state, Times times, Blackhole bh) {
for (int i = 0; i < times.times; i++) {
state.queue.offer(0);
}
for (int i = 0; i < times.times; i++) {
bh.consume(state.queue.poll());
}
}
}
@akarnokd
Copy link
Author

akarnokd commented Dec 3, 2014

Single-threaded offer/poll comparison with other queues:

Benchmark                          (times)   Mode   Samples         Score   Score error    Units
r.i.QP.arrayBlockingQueueAR1             1  thrpt         5  22373895,746   1230991,174    ops/s
r.i.QP.arrayBlockingQueueAR1          1000  thrpt         5     23379,555       375,320    ops/s
r.i.QP.arrayBlockingQueueAR1       1000000  thrpt         5        22,586         0,265    ops/s
r.i.QP.arrayBlockingQueueARN             1  thrpt         5  21837970,655    438243,420    ops/s
r.i.QP.arrayBlockingQueueARN          1000  thrpt         5     22727,369       505,464    ops/s
r.i.QP.arrayBlockingQueueARN       1000000  thrpt         5        17,342         0,389    ops/s
r.i.QP.arrayDequeAR1                     1  thrpt         5 125317599,067   2022013,336    ops/s
r.i.QP.arrayDequeAR1                  1000  thrpt         5    154471,662      1582,445    ops/s
r.i.QP.arrayDequeAR1               1000000  thrpt         5       153,553         6,961    ops/s
r.i.QP.arrayDequeARN                     1  thrpt         5 105060885,976   1282639,031    ops/s
r.i.QP.arrayDequeARN                  1000  thrpt         5    115148,045     11655,882    ops/s
r.i.QP.arrayDequeARN               1000000  thrpt         5        69,936         3,212    ops/s
r.i.QP.atomicArrayQueueAR1               1  thrpt         5  60420124,114    619290,433    ops/s
r.i.QP.atomicArrayQueueAR1            1000  thrpt         5     63309,420      1373,954    ops/s
r.i.QP.atomicArrayQueueAR1         1000000  thrpt         5        64,106         2,668    ops/s
r.i.QP.atomicArrayQueueARN               1  thrpt         5  52625792,849   1157675,651    ops/s
r.i.QP.atomicArrayQueueARN            1000  thrpt         5     49986,261      3487,739    ops/s
r.i.QP.atomicArrayQueueARN         1000000  thrpt         5        46,151         1,947    ops/s
r.i.QP.atomicArrayQueueUnsafeAR1         1  thrpt         5  67312005,247    734419,790    ops/s
r.i.QP.atomicArrayQueueUnsafeAR1      1000  thrpt         5     72738,961      1805,610    ops/s
r.i.QP.atomicArrayQueueUnsafeAR1   1000000  thrpt         5        93,918         1,768    ops/s
r.i.QP.atomicArrayQueueUnsafeARN         1  thrpt         5  70642145,057   2232842,170    ops/s
r.i.QP.atomicArrayQueueUnsafeARN      1000  thrpt         5     70726,547      1216,393    ops/s
r.i.QP.atomicArrayQueueUnsafeARN   1000000  thrpt         5        56,179         1,137    ops/s
r.i.QP.concurrentLinkedQueueAR1          1  thrpt         5  22974979,962    725316,960    ops/s
r.i.QP.concurrentLinkedQueueAR1       1000  thrpt         5     24445,612       559,910    ops/s
r.i.QP.concurrentLinkedQueueAR1    1000000  thrpt         5        24,475         0,388    ops/s
r.i.QP.concurrentLinkedQueueARN          1  thrpt         5  23774693,359    383443,465    ops/s
r.i.QP.concurrentLinkedQueueARN       1000  thrpt         5     25512,034      1520,667    ops/s
r.i.QP.concurrentLinkedQueueARN    1000000  thrpt         5        22,351         7,010    ops/s
r.i.QP.spscArrayQueueAR1                 1  thrpt         5  96045983,361   4563624,564    ops/s
r.i.QP.spscArrayQueueAR1              1000  thrpt         5    130445,669      2383,828    ops/s
r.i.QP.spscArrayQueueAR1           1000000  thrpt         5       146,357         4,085    ops/s
r.i.QP.spscArrayQueueARN                 1  thrpt         5 101373459,009   4504311,642    ops/s
r.i.QP.spscArrayQueueARN              1000  thrpt         5    109070,727       991,592    ops/s
r.i.QP.spscArrayQueueARN           1000000  thrpt         5        76,254         5,429    ops/s
r.i.QP.spscLinkedQueueAR1                1  thrpt         5 111388498,047   1212569,502    ops/s
r.i.QP.spscLinkedQueueAR1             1000  thrpt         5    126272,805      5525,450    ops/s
r.i.QP.spscLinkedQueueAR1          1000000  thrpt         5       125,039         6,136    ops/s
r.i.QP.spscLinkedQueueARN                1  thrpt         5  93573159,095   2235942,972    ops/s
r.i.QP.spscLinkedQueueARN             1000  thrpt         5    102024,810       906,578    ops/s
r.i.QP.spscLinkedQueueARN          1000000  thrpt         5         6,415         2,215    ops/s
r.i.QP.syncArrayQueueAR1                 1  thrpt         5  18457898,710    351327,682    ops/s
r.i.QP.syncArrayQueueAR1              1000  thrpt         5     18648,233       183,269    ops/s
r.i.QP.syncArrayQueueAR1           1000000  thrpt         5        18,515         0,808    ops/s
r.i.QP.syncArrayQueueARN                 1  thrpt         5  18631259,344    234332,585    ops/s
r.i.QP.syncArrayQueueARN              1000  thrpt         5     18747,036       364,578    ops/s
r.i.QP.syncArrayQueueARN           1000000  thrpt         5        18,221         1,469    ops/s

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment