Last active
August 29, 2015 14:10
-
-
Save akarnokd/d561a92481062c94d792 to your computer and use it in GitHub Desktop.
Resizable concurrent Array Queue implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package rx.internal.util.unsafe; | |
import java.util.*; | |
/** | |
* A single-producer single-consumer circular array with resize capability. | |
* The methods size(), isEmpty(), peek(), poll() need to be called from the consumer thread whereas | |
* offer() needs to be called from the producer thread only. | |
*/ | |
public class AtomicArrayQueueUnsafe extends AbstractQueue<Object> { | |
static final Object TOMBSTONE = new Object(); | |
static final long P_BUFFER; | |
static final long ARRAY_OFFSET; | |
static final int ARRAY_SCALE; | |
static final int ARRAY_INDEX_SIZE; | |
static { | |
try { | |
P_BUFFER = UnsafeAccess.UNSAFE.objectFieldOffset(AtomicArrayQueueUnsafe.class.getDeclaredField("buffer")); | |
} catch (NoSuchFieldException ex) { | |
throw new RuntimeException(); | |
} | |
int is = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class); | |
int as; | |
if (is == 4) { | |
as = 2; | |
} else | |
if (is == 8) { | |
as = 3; | |
} else { | |
throw new RuntimeException("Unsupported array index scale: " + is); | |
} | |
ARRAY_SCALE = as; | |
ARRAY_INDEX_SIZE = is; | |
ARRAY_OFFSET = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class); | |
} | |
Object[] buffer; | |
long readerIndex; | |
long writerIndex; | |
final int maxCapacity; | |
private Object[] lpBuffer() { | |
return (Object[])UnsafeAccess.UNSAFE.getObject(this, P_BUFFER); | |
} | |
private Object[] lvBuffer() { | |
return (Object[])UnsafeAccess.UNSAFE.getObjectVolatile(this, P_BUFFER); | |
} | |
private void soBuffer(Object[] buffer) { | |
UnsafeAccess.UNSAFE.putOrderedObject(this, P_BUFFER, buffer); | |
} | |
private Object lvElement(Object[] buffer, long offset) { | |
return UnsafeAccess.UNSAFE.getObjectVolatile(buffer, offset); | |
} | |
private void soElement(Object[] buffer, long offset, Object value) { | |
UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, value); | |
} | |
private boolean casElement(Object[] buffer, long offset, Object expected, Object value) { | |
return UnsafeAccess.UNSAFE.compareAndSwapObject(buffer, offset, expected, value); | |
} | |
private long calcOffset(long index, int mask) { | |
return ARRAY_OFFSET + ((index & mask) << ARRAY_SCALE); | |
} | |
static int roundToPowerOfTwo(final int value) { | |
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); | |
} | |
public AtomicArrayQueueUnsafe(int initialCapacity, int maxCapacity) { | |
int c0; | |
if (initialCapacity >= 1 << 30) { | |
c0 = 1 << 30; | |
} else { | |
c0 = roundToPowerOfTwo(initialCapacity); | |
} | |
int cm; | |
if (maxCapacity >= 1 << 30) { | |
cm = 1 << 30; | |
} else { | |
cm = roundToPowerOfTwo(maxCapacity); | |
} | |
this.maxCapacity = cm; | |
soBuffer(new Object[c0]);; | |
} | |
Object[] grow(Object[] b, long wo, int n) { | |
int arrayScale = ARRAY_SCALE; | |
int arrayIndexSize = ARRAY_INDEX_SIZE; | |
long arrayOffset = ARRAY_OFFSET; | |
int n2 = n * 2 + 2; | |
Object[] b2 = new Object[n2]; | |
// move the front to the back | |
boolean caughtUp = false; | |
for (long i = wo - arrayIndexSize, j = wo + (n << arrayScale); i >= arrayOffset; i -= arrayIndexSize, j -= arrayIndexSize) { | |
Object o = lvElement(b, i); | |
if (o == null || !casElement(b, i, o, TOMBSTONE)) { | |
caughtUp = true; | |
break; | |
} else | |
soElement(b2, j, o); | |
} | |
// leave everything beyont the wrap point at its location | |
if (!caughtUp) { | |
long no = arrayOffset + (n << arrayScale); | |
for (long i = no; i >= wo; i -= arrayIndexSize) { | |
Object o = lvElement(b, i); | |
if (o == null || !casElement(b, i, o, TOMBSTONE)) { | |
break; | |
} | |
soElement(b2, i, o); | |
} | |
} | |
soBuffer(b2); | |
return b2; | |
} | |
@Override | |
public boolean offer(Object o) { | |
long wi = writerIndex; | |
Object[] b = lpBuffer(); | |
int n = b.length - 1; | |
long wo = calcOffset(wi, n); | |
if (lvElement(b, wo) != null) { | |
if (n + 1 == maxCapacity) { | |
return false; | |
} | |
// grow | |
b = grow(b, wo, n); | |
wo = calcOffset(wi, n * 2 + 1); | |
} | |
soElement(b, wo, o); | |
writerIndex = wi + 1; | |
return true; | |
} | |
@Override | |
public Object poll() { | |
long ri = readerIndex; | |
Object[] b = lpBuffer(); | |
for (;;) { | |
int mask = b.length - 1; | |
long ro = calcOffset(ri, mask); | |
Object o = lvElement(b, ro); | |
if (o == null) { | |
return null; | |
} | |
if (o == TOMBSTONE || !casElement(b, ro, o, null)) { | |
b = lvBuffer(); | |
continue; | |
} | |
readerIndex = ri + 1; | |
return o; | |
} | |
} | |
@Override | |
public Object peek() { | |
long ri = readerIndex; | |
Object[] b = lpBuffer(); | |
for (;;) { | |
int mask = b.length - 1; | |
long ro = calcOffset(ri, mask); | |
Object o = lvElement(b, ro); | |
if (o != TOMBSTONE) { | |
return o; | |
} | |
b = lvBuffer(); | |
} | |
} | |
@Override | |
public boolean isEmpty() { | |
return peek() == null; | |
} | |
@Override | |
public int size() { | |
int size = 0; | |
long ri = readerIndex; | |
Object[] b = lpBuffer(); | |
for (;;) { | |
int mask = b.length - 1; | |
long ro = calcOffset(ri, mask); | |
Object o = lvElement(b, ro); | |
if (o == null) { | |
return size; | |
} else | |
if (o == TOMBSTONE) { | |
b = lvBuffer(); | |
continue; | |
} | |
size++; | |
ri++; | |
} | |
} | |
@Override | |
public Iterator<Object> iterator() { | |
throw new UnsupportedOperationException(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package rx.internal; | |
import java.util.ArrayDeque; | |
import java.util.concurrent.*; | |
import org.openjdk.jmh.annotations.*; | |
import org.openjdk.jmh.infra.Blackhole; | |
import rx.internal.util.*; | |
import rx.internal.util.unsafe.*; | |
/** | |
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*QP.*" | |
*/ | |
@BenchmarkMode(Mode.Throughput) | |
@OutputTimeUnit(TimeUnit.SECONDS) | |
public class QP { | |
@State(Scope.Thread) | |
public static class Times { | |
@Param({ "1", "1000", "1000000" }) | |
public int times; | |
} | |
@State(Scope.Thread) | |
public static class SyncArrayQueueState { | |
public final SyncArrayQueue queue = new SyncArrayQueue(); | |
} | |
@State(Scope.Thread) | |
public static class ArrayDequeState { | |
public final ArrayDeque<Object> queue = new ArrayDeque<Object>(); | |
} | |
@State(Scope.Thread) | |
public static class ArrayBlockingQueueState { | |
public final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1024 * 1024); | |
} | |
@State(Scope.Thread) | |
public static class SpscArrayQueueState { | |
public final SpscArrayQueue<Object> queue = new SpscArrayQueue<Object>(1024 * 1024); | |
} | |
@State(Scope.Thread) | |
public static class AtomicArrayQueueState { | |
public final AtomicArrayQueue queue = new AtomicArrayQueue(8, 1024 * 1024); | |
} | |
@State(Scope.Thread) | |
public static class AtomicArrayQueueUnsafeState { | |
public final AtomicArrayQueueUnsafe queue = new AtomicArrayQueueUnsafe(8, 1024 * 1024); | |
} | |
@State(Scope.Thread) | |
public static class ConcurrentLinkedQueueState { | |
public final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>(); | |
} | |
@State(Scope.Thread) | |
public static class SpscLinkedQueueState { | |
public final SpscLinkedQueue<Object> queue = new SpscLinkedQueue<Object>(); | |
} | |
@Benchmark | |
public void syncArrayQueueAR1(SyncArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void syncArrayQueueARN(SyncArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void arrayDequeAR1(ArrayDequeState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void arrayDequeARN(ArrayDequeState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void arrayBlockingQueueAR1(ArrayBlockingQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void arrayBlockingQueueARN(ArrayBlockingQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void spscArrayQueueAR1(SpscArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void spscArrayQueueARN(SpscArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void atomicArrayQueueAR1(AtomicArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void atomicArrayQueueARN(AtomicArrayQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void atomicArrayQueueUnsafeAR1(AtomicArrayQueueUnsafeState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void atomicArrayQueueUnsafeARN(AtomicArrayQueueUnsafeState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void concurrentLinkedQueueAR1(ConcurrentLinkedQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void concurrentLinkedQueueARN(ConcurrentLinkedQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void spscLinkedQueueAR1(SpscLinkedQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
bh.consume(state.queue.poll()); | |
} | |
} | |
@Benchmark | |
public void spscLinkedQueueARN(SpscLinkedQueueState state, Times times, Blackhole bh) { | |
for (int i = 0; i < times.times; i++) { | |
state.queue.offer(0); | |
} | |
for (int i = 0; i < times.times; i++) { | |
bh.consume(state.queue.poll()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Single threaded offer/poll comparison with other queues: