Last active
August 29, 2015 14:01
-
-
Save akarnokd/f8704115aa5d1a9e4bc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright 2014 Netflix, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | |
* use this file except in compliance with the License. You may obtain a copy of | |
* the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
* License for the specific language governing permissions and limitations under | |
* the License. | |
*/ | |
package rx; | |
import java.util.ArrayDeque; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; | |
import java.util.concurrent.locks.Lock; | |
import java.util.concurrent.locks.ReentrantLock; | |
import rx.functions.Action1; | |
/** | |
* Serialize value transfer by limiting emission to a single thread | |
* at a time. | |
* @param <T> the value type to transfer | |
*/ | |
public final class Serializer<T> implements Action1<T>{ | |
/** Selects which serialization algorithm the Serializer delegates to. */
public enum Strategy {
    /** No serialization: values go straight to the consumer (also the fallback). */
    DIRECT,
    /** SyncArrayQueue: emitter loop over a growable array, guarded by synchronized. */
    SYNCHRONIZED_ARRAY_QUEUE,
    /** LockArrayQueue: same emitter loop, guarded by a ReentrantLock. */
    LOCKED_ARRAY_QUEUE,
    /** AtomicCLQueue: ConcurrentLinkedQueue plus an atomic work-in-progress counter. */
    ATOMIC_CL_QUEUE,
    /** AtomicCLQueueFastPath: same, with a queue-free path when uncontended. */
    ATOMIC_CL_QUEUE_FAST_PATH,
    /** QueueBatchDrain: LinkedBlockingQueue drained in batches into a reusable buffer. */
    QUEUE_BATCH_DRAIN,
    /** MyMPSCQueue: hand-rolled MPSC linked queue plus a work counter. */
    MY_MPSC,
    /** MyMPSCQueueStride: per-thread striped hand-rolled MPSC queues. */
    MY_MPSC_STRIDE,
    /** DoubleSyncArrayDeque: two swapped ArrayDeques guarded by synchronized. */
    SYNC_DOUBLE_ARRAY_DEQUE,
    /** DoubleLockArrayDeque: two swapped ArrayDeques guarded by a ReentrantLock. */
    LOCK_DOUBLE_ARRAY_DEQUE,
    /** MPSCQueue: Rx-style MpscLinkedQueue plus a work counter. */
    MPSC_LINKED_QUEUE,
    /** MPSCQueueStride: per-thread striped MpscLinkedQueues. */
    MPSC_LINKED_QUEUE_STRIDE,
    /** MPSCQueueFastPath: MpscLinkedQueue with an uncontended fast path. */
    MPSC_LINKED_QUEUE_FAST_PATH,
    /** MPSCQueueStateBasedFastPath: three-state (idle/working/added) machine. */
    MPSC_LINKED_QUEUE_STATE_FAST_PATH,
    /** MPSCQueueStrideFastPath: striped queues with an uncontended fast path. */
    MPSC_LINKED_QUEUE_STRIDE_FAST_PATH,
}
/** The strategy-selected delegate that actually serializes the events. */
final Action1<? super T> impl;
/**
 * Creates a Serializer that funnels values to the given consumer through
 * the serialization algorithm identified by the strategy.
 * <p>Cases are listed in {@link Strategy} declaration order; any strategy
 * without a dedicated implementation (including {@code DIRECT}) forwards
 * to the consumer without serialization.
 * @param strategy the serialization algorithm to use
 * @param consumer the action invoked with each serialized value
 */
public Serializer(Strategy strategy, Action1<? super T> consumer) {
    Action1<? super T> target;
    switch (strategy) {
    case SYNCHRONIZED_ARRAY_QUEUE:
        target = new SyncArrayQueue<>(consumer);
        break;
    case LOCKED_ARRAY_QUEUE:
        target = new LockArrayQueue<>(consumer);
        break;
    case ATOMIC_CL_QUEUE:
        target = new AtomicCLQueue<>(consumer);
        break;
    case ATOMIC_CL_QUEUE_FAST_PATH:
        target = new AtomicCLQueueFastPath<>(consumer);
        break;
    case QUEUE_BATCH_DRAIN:
        target = new QueueBatchDrain<>(consumer);
        break;
    case MY_MPSC:
        target = new MyMPSCQueue<>(consumer);
        break;
    case MY_MPSC_STRIDE:
        target = new MyMPSCQueueStride<>(consumer);
        break;
    case SYNC_DOUBLE_ARRAY_DEQUE:
        target = new DoubleSyncArrayDeque<>(consumer);
        break;
    case LOCK_DOUBLE_ARRAY_DEQUE:
        target = new DoubleLockArrayDeque<>(consumer);
        break;
    case MPSC_LINKED_QUEUE:
        target = new MPSCQueue<>(consumer);
        break;
    case MPSC_LINKED_QUEUE_STRIDE:
        target = new MPSCQueueStride<>(consumer);
        break;
    case MPSC_LINKED_QUEUE_FAST_PATH:
        target = new MPSCQueueFastPath<>(consumer);
        break;
    case MPSC_LINKED_QUEUE_STATE_FAST_PATH:
        target = new MPSCQueueStateBasedFastPath<>(consumer);
        break;
    case MPSC_LINKED_QUEUE_STRIDE_FAST_PATH:
        target = new MPSCQueueStrideFastPath<>(consumer);
        break;
    default:
        // DIRECT and any future strategy: no serialization at all.
        target = consumer;
        break;
    }
    this.impl = target;
}
/**
 * Hands the value to the configured serialization delegate.
 * @param t1 the value to emit
 */
@Override
public void call(T t1) {
    impl.call(t1);
}
/**
 * Emitter-loop serializer backed by a growable object array.
 * The first thread to arrive becomes the emitter and calls the consumer
 * directly; concurrent callers park their value in the array under the
 * monitor and return immediately. The emitter keeps re-checking and
 * draining the array until nothing more was queued, then releases the
 * emitter role.
 */
static final class SyncArrayQueue<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Values queued while another thread is emitting; guarded by this. */
    Object[] queue;
    /** Number of occupied slots in queue; guarded by this. */
    int size;
    /** True while some thread holds the emitter role; guarded by this. */
    boolean emitting;
    public SyncArrayQueue(Action1<? super T> consumer) {
        this.consumer = consumer;
    }
    @Override
    @SuppressWarnings("unchecked")
    public void call(T t1) {
        Object[] localQueue;
        int localSize;
        synchronized (this) {
            if (emitting) {
                // Someone else is emitting: enqueue and let them deliver it.
                if (queue == null) {
                    queue = new Object[8];
                }
                if (size == queue.length) {
                    // Grow by 1.5x when full.
                    queue = Arrays.copyOf(queue, size + (size >> 1));
                }
                queue[size++] = t1;
                return;
            }
            // Become the emitter and take whatever was queued so far.
            localQueue = queue;
            localSize = size;
            queue = null;
            size = 0;
            emitting = true;
        }
        boolean once = true;
        do {
            // Deliver any values that were queued before we took over ...
            if (localQueue != null) {
                for (int i = 0; i < localSize; i++) {
                    consumer.call((T)localQueue[i]);
                }
            }
            // ... then our own value, exactly once.
            if (once) {
                once = false;
                consumer.call(t1);
            }
            synchronized (this) {
                // Re-check for values queued while we were emitting;
                // release the emitter role only when nothing is left.
                localQueue = queue;
                localSize = size;
                queue = null;
                size = 0;
                if (localQueue == null) {
                    emitting = false;
                    break;
                }
            }
        } while (true);
    }
}
/**
 * Identical emitter-loop algorithm to {@code SyncArrayQueue}, but using an
 * explicit ReentrantLock instead of synchronized blocks, so the two lock
 * flavors can be benchmarked against each other.
 */
static final class LockArrayQueue<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Values queued while another thread is emitting; guarded by lock. */
    Object[] queue;
    /** Number of occupied slots in queue; guarded by lock. */
    int size;
    /** True while some thread holds the emitter role; guarded by lock. */
    boolean emitting;
    final Lock lock;
    public LockArrayQueue(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.lock = new ReentrantLock();
    }
    @Override
    @SuppressWarnings("unchecked")
    public void call(T t1) {
        Object[] localQueue;
        int localSize;
        lock.lock();
        try {
            if (emitting) {
                // Someone else is emitting: enqueue and let them deliver it.
                if (queue == null) {
                    queue = new Object[8];
                }
                if (size == queue.length) {
                    // Grow by 1.5x when full.
                    queue = Arrays.copyOf(queue, size + (size >> 1));
                }
                queue[size++] = t1;
                return;
            }
            // Become the emitter and take whatever was queued so far.
            localQueue = queue;
            localSize = size;
            queue = null;
            size = 0;
            emitting = true;
        } finally {
            lock.unlock();
        }
        boolean once = true;
        do {
            // Deliver any values that were queued before we took over ...
            if (localQueue != null) {
                for (int i = 0; i < localSize; i++) {
                    consumer.call((T)localQueue[i]);
                }
            }
            // ... then our own value, exactly once.
            if (once) {
                once = false;
                consumer.call(t1);
            }
            lock.lock();
            try {
                // Re-check for values queued while we were emitting;
                // release the emitter role only when nothing is left.
                localQueue = queue;
                localSize = size;
                queue = null;
                size = 0;
                if (localQueue == null) {
                    emitting = false;
                    break;
                }
            } finally {
                lock.unlock();
            }
        } while (true);
    }
}
static final class AtomicCLQueue<T> implements Action1<T> { | |
final Action1<? super T> consumer; | |
final AtomicInteger wip; | |
final ConcurrentLinkedQueue<T> queue; | |
public AtomicCLQueue(Action1<? super T> consumer) { | |
this.consumer = consumer; | |
this.wip = new AtomicInteger(); | |
this.queue = new ConcurrentLinkedQueue<>(); | |
} | |
@Override | |
public void call(T t) { | |
queue.offer(t); | |
if (wip.getAndIncrement() == 0) { | |
do { | |
consumer.call(queue.poll()); | |
} while (wip.decrementAndGet() > 0); | |
} | |
} | |
} | |
/**
 * Variant of {@code AtomicCLQueue} with a lock-free fast path: an
 * uncontended caller CASes wip 0 -&gt; 1, calls the consumer directly and
 * CASes back 1 -&gt; 0 without ever touching the queue. Only when the second
 * CAS fails (somebody queued meanwhile) does it fall back to draining.
 */
static final class AtomicCLQueueFastPath<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Counts undelivered values plus the fast-path claim; zero means idle. */
    final AtomicInteger wip;
    /** Unbounded queue for values arriving while another thread emits. */
    final ConcurrentLinkedQueue<T> queue;
    public AtomicCLQueueFastPath(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.wip = new AtomicInteger();
        this.queue = new ConcurrentLinkedQueue<>();
    }
    @Override
    public void call(T t) {
        if(wip.compareAndSet(0, 1)) { // no contention?
            consumer.call(t);
            if (wip.compareAndSet(1, 0)) {
                return; // still no contention
            }
            // Others queued while we emitted: drop our own claim, then
            // drain one queued value per remaining count.
            wip.decrementAndGet();
            do {
                consumer.call(queue.poll());
            } while (wip.decrementAndGet() > 0);
            return;
        }
        else {
            if (!queue.offer(t)) {
                // ConcurrentLinkedQueue.offer never returns false; purely defensive.
                throw new AssertionError("queue should be unbounded");
            }
            if (wip.getAndIncrement() != 0) {
                return; // other consumer running
            }
        }
        // We raised wip from 0 on the slow path: drain our own (queued) value
        // and anything that piles up behind it.
        do {
            consumer.call(queue.poll());
        } while (wip.decrementAndGet() > 0);
    }
}
/**
 * Serializer that drains in batches: the emitter repeatedly transfers up
 * to wip pending values from the blocking queue into a reusable buffer
 * and delivers them, subtracting the batch size from the counter.
 */
static final class QueueBatchDrain<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Reusable batch buffer; only ever touched by the single active emitter. */
    final List<T> buffer = new ArrayList<>();
    /** Counts values enqueued but not yet delivered; zero means idle. */
    final AtomicInteger wip = new AtomicInteger();
    /** Unbounded queue holding the pending values. */
    final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
    public QueueBatchDrain(Action1<? super T> consumer) {
        this.consumer = consumer;
    }
    @Override
    public void call(T t1) {
        queue.offer(t1);
        if (wip.getAndIncrement() == 0) {
            do {
                buffer.clear();
                // Drain at most the number of values known to be pending;
                // each offer happens before its increment, so n >= 1 here
                // and wip never goes negative below.
                int n = queue.drainTo(buffer, wip.get());
                for (T t : buffer) {
                    consumer.call(t);
                }
                if (wip.addAndGet(-n) <= 0) {
                    break;
                }
            } while (true);
        }
    }
}
static final class MyMPSCQueue<T> implements Action1<T> { | |
final Action1<? super T> consumer; | |
final AtomicInteger wip = new AtomicInteger(); | |
final MPSCDrainQueue<T> queue = new MPSCDrainQueue<>(); | |
public MyMPSCQueue(Action1<? super T> consumer) { | |
this.consumer = consumer; | |
} | |
@Override | |
public void call(T t1) { | |
queue.offer(t1); | |
if (wip.getAndAdd(1) == 0) { | |
do { | |
consumer.call(queue.poll()); | |
if (wip.decrementAndGet() == 0) { | |
break; | |
} | |
} while (true); | |
} | |
} | |
} | |
private static int nextPowerOf2(int x) { | |
if ((x & (x - 1)) == 0) { | |
return x; | |
} | |
x = x | (x >> 1); | |
x = x | (x >> 2); | |
x = x | (x >> 4); | |
x = x | (x >> 8); | |
x = x | (x >> 16); | |
x = x | (x >> 24); | |
return x - (x >> 1); | |
} | |
/**
 * Striped serializer: each producer thread offers into one of
 * nextPowerOf2(NCPU) hand-rolled MPSC queues selected by its thread id,
 * reducing producer contention on any single queue. The thread that
 * raises the shared counter from zero becomes the emitter and sweeps all
 * stripes until the counter reaches zero.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MyMPSCQueueStride<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Total undelivered values across all stripes; zero means idle. */
    final AtomicInteger wip = new AtomicInteger();
    /** One MPSC queue per stripe; length is a power of two. */
    final MPSCDrainQueue[] queue;
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** Bit mask for stripe selection: queue.length - 1. */
    final int mask;
    public MyMPSCQueueStride(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.mask = nextPowerOf2(NCPU) - 1;
        this.queue = new MPSCDrainQueue[mask + 1];
        for (int j = 0; j < queue.length; j++) {
            queue[j] = new MPSCDrainQueue();
        }
    }
    @Override
    public void call(T t1) {
        long id = Thread.currentThread().getId();
        // Cache fields in locals for the hot loop.
        final MPSCDrainQueue[] qs = queue;
        final MPSCDrainQueue q = qs[(int)id & mask];
        final AtomicInteger w = wip;
        final Action1<? super T> cons = consumer;
        q.offer(t1);
        int max = w.incrementAndGet();
        if (max == 1) {
            // We are the emitter: sweep the stripes, delivering at most
            // the known outstanding count per sweep, until all counted
            // values have been drained.
            do {
                int c = 0;
                for (MPSCDrainQueue q0 : qs) {
                    T v = (T)q0.poll();
                    if (v != null) {
                        cons.call(v);
                        if (++c >= max) {
                            break;
                        }
                    }
                }
                max = w.addAndGet(-c);
            } while (max > 0);
        }
    }
}
static final class MPSCDrainQueue<E> { | |
static final class Node<E> { | |
@SuppressWarnings("rawtypes") | |
static final AtomicReferenceFieldUpdater<Node, Node> NEXT_UPDATER | |
= AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next"); | |
E value; | |
volatile Node<E> next; | |
public Node(E value) { | |
this.value = value; | |
} | |
boolean casNext(Node<E> expected, Node<E> actual) { | |
return NEXT_UPDATER.compareAndSet(this, expected, actual); | |
} | |
} | |
static final class PaddedTail<E> { | |
@SuppressWarnings("rawtypes") | |
static final AtomicReferenceFieldUpdater<PaddedTail, Node> TAIL_UPDATER | |
= AtomicReferenceFieldUpdater.newUpdater(PaddedTail.class, Node.class, "tail"); | |
volatile Node<E> tail; | |
public int p0; | |
public long p1, p2, p3, p4, p5, p6; | |
public long noopt() { | |
return p0 + p1 + p2 + p3 + p4 + p5 + p6; | |
} | |
void casTail(Node<E> t, Node<E> newNode) { | |
TAIL_UPDATER.compareAndSet(this, t, newNode); | |
} | |
public PaddedTail(Node<E> initialTail) { | |
this.tail = initialTail; | |
} | |
} | |
Node<E> head = new Node<>(null); | |
final PaddedTail<E> ptail = new PaddedTail<>(head); | |
public boolean offer(E e) { | |
Node<E> newNode = new Node<>(e); | |
for (;;) { | |
Node<E> t = ptail.tail; | |
Node<E> p = t.next; | |
if (p == null && t.casNext(null, newNode)) { | |
ptail.casTail(t, newNode); | |
return true; | |
} | |
} | |
} | |
public void dropFirst() { | |
Node<E> h = head; | |
head = h.next; | |
h.value = null; | |
} | |
public E poll() { | |
Node<E> n = head; | |
Node<E> nx = n.next; | |
if (nx == null) { | |
return null; | |
} | |
head = nx; | |
E v = nx.value; | |
nx.value = null; | |
return v; | |
} | |
public int drainAll(final Action1<? super E> actual) { | |
int c = 0; | |
Node<E> n = head; | |
Node<E> nx = n.next; | |
while (nx != null) { | |
actual.call(nx.value); | |
n = nx; | |
nx = nx.next; | |
c++; | |
} | |
head = n; | |
n.value = null; | |
return c; | |
} | |
} | |
/**
 * Double-buffered serializer: producers always offer into the "active"
 * deque under the monitor; the emitter atomically swaps the active and
 * drain deques, drains outside the lock, and repeats until the active
 * deque is empty at swap time.
 */
public static final class DoubleSyncArrayDeque<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    final ArrayDeque<T> deque1;
    final ArrayDeque<T> deque2;
    /** The deque producers currently offer into; guarded by this. */
    ArrayDeque<T> queue;
    /** True while some thread is draining. Always accessed under the
     *  monitor, so the volatile modifier is redundant here. */
    volatile boolean emitting;
    public DoubleSyncArrayDeque(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.deque1 = new ArrayDeque<>();
        this.deque2 = new ArrayDeque<>();
        this.queue = deque1;
    }
    @Override
    public void call(T t1) {
        ArrayDeque<T> q;
        synchronized (this) {
            queue.offer(t1);
            if (emitting) {
                // The active emitter will swap in this deque and drain it.
                return;
            }
            // Become the emitter: take the filled deque and point
            // producers at the other one.
            emitting = true;
            q = queue;
            queue = q == deque1 ? deque2 : deque1;
        }
        do {
            T v;
            while ((v = q.poll()) != null) {
                consumer.call(v);
            }
            synchronized (this) {
                // Check what accumulated while we drained; stop only when
                // the active deque is empty.
                q = queue;
                if (q.isEmpty()) {
                    emitting = false;
                    return;
                }
                queue = q == deque1 ? deque2 : deque1;
            }
        } while (true);
    }
}
/**
 * Same double-buffered swap algorithm as {@code DoubleSyncArrayDeque},
 * but guarded by an explicit ReentrantLock instead of synchronized, so
 * the two lock flavors can be benchmarked against each other.
 */
public static final class DoubleLockArrayDeque<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    final ArrayDeque<T> deque1;
    final ArrayDeque<T> deque2;
    final Lock lock;
    /** The deque producers currently offer into; guarded by lock. */
    ArrayDeque<T> queue;
    /** True while some thread is draining. Always accessed under the
     *  lock, so the volatile modifier is redundant here. */
    volatile boolean emitting;
    public DoubleLockArrayDeque(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.deque1 = new ArrayDeque<>();
        this.deque2 = new ArrayDeque<>();
        this.lock = new ReentrantLock();
        this.queue = deque1;
    }
    @Override
    public void call(T t1) {
        ArrayDeque<T> q;
        lock.lock();
        try {
            queue.offer(t1);
            if (emitting) {
                // The active emitter will swap in this deque and drain it.
                return;
            }
            // Become the emitter: take the filled deque and point
            // producers at the other one.
            emitting = true;
            q = queue;
            queue = q == deque1 ? deque2 : deque1;
        } finally {
            lock.unlock();
        }
        do {
            T v;
            while ((v = q.poll()) != null) {
                consumer.call(v);
            }
            lock.lock();
            try {
                // Check what accumulated while we drained; stop only when
                // the active deque is empty.
                q = queue;
                if (q.isEmpty()) {
                    emitting = false;
                    return;
                }
                queue = q == deque1 ? deque2 : deque1;
            } finally {
                lock.unlock();
            }
        } while (true);
    }
}
/**
 * AtomicInteger followed by extra int fields, presumably padding so hot
 * counters land on their own cache line (reducing false sharing) —
 * standard technique in this kind of benchmark.
 */
static final class PaddedAtomicInteger extends AtomicInteger {
    private static final long serialVersionUID = 1L;
    // Padding fields; p13 gets a nonzero initializer.
    public int p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13 = 14;
    /** Consumes the padding fields so the JIT cannot eliminate them. */
    public int noopt() {
        return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
    }
}
/**
 * Multi-producer single-consumer linked queue in the style of RxJava's
 * internal MpscLinkedQueue. The inherited AtomicReference value is the
 * producer-side pointer (most recently enqueued node) and the volatile
 * {@code tail} field is the consumer-side pointer (last consumed
 * sentinel). Note the naming is inverted relative to the usual
 * head/tail convention, and {@code Node.tail} is effectively a "next"
 * link.
 */
static final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueue.Node<E>> {
    private static final long serialVersionUID = 1L;
    @SuppressWarnings("rawtypes")
    static final AtomicReferenceFieldUpdater<MpscLinkedQueue, Node> TAIL_UPDATER
    = AtomicReferenceFieldUpdater.newUpdater(MpscLinkedQueue.class, Node.class, "tail");
    /** Consumer-side sentinel whose successor holds the next value. */
    volatile Node<E> tail;
    public MpscLinkedQueue() {
        // Both ends start at a shared sentinel node.
        Node<E> first = new Node<>(null);
        tail = first;
        set(first);
    }
    /**
     * Enqueues v: swap the producer pointer to the new node, then link the
     * previous node to it. Never fails.
     */
    public boolean offer(E v) {
        Node<E> n = new Node<>(v);
        getAndSet(n).setTail(n);
        return true;
    }
    /**
     * CAS-based enqueue that gives up after numTries failed attempts,
     * used by the cooperative variant below.
     * @return true if enqueued, false if the tries were exhausted
     */
    public boolean offer(E v, int numTries) {
        Node<E> n = new Node<>(v);
        for (; numTries-- > 0;) {
            Node<E> h = get();
            if (compareAndSet(h, n)) {
                h.setTail(n);
                return true;
            }
        }
        return false;
    }
    /**
     * Removes and returns the next value, or null when empty. Single
     * consumer only. Note: {@code value} is final, so consumed values are
     * not nulled; they stay reachable until the node itself is unlinked.
     */
    public E poll() {
        Node<E> n = peekNode();
        if (n == null) {
            return null;
        }
        Node<E> result = n;
        // Advance the consumer pointer past the consumed node.
        TAIL_UPDATER.lazySet(this, n);
        return result.value;
    }
    /**
     * Returns the node holding the next value, or null when the queue is
     * empty. Spins while a producer has swapped the producer pointer but
     * has not yet linked its node (offer is getAndSet then lazy link).
     */
    private Node<E> peekNode() {
        for (;;) {
            @SuppressWarnings("unchecked")
            Node<E> t = TAIL_UPDATER.get(this);
            Node<E> n = t.getTail();
            if (n != null || get() == t) {
                return n;
            }
        }
    }
    /** Singly-linked node; {@code tail} is its successor ("next") link. */
    static final class Node<E> {
        final E value;
        @SuppressWarnings("rawtypes")
        static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER
        = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
        volatile Node<E> tail;
        public Node(E value) {
            this.value = value;
        }
        public void setTail(Node<E> newTail) {
            TAIL_UPDATER.lazySet(this, newTail);
        }
        @SuppressWarnings("unchecked")
        public Node<E> getTail() {
            return TAIL_UPDATER.get(this);
        }
    }
}
static final class MPSCQueue<T> implements Action1<T> { | |
final Action1<? super T> consumer; | |
final MpscLinkedQueue<T> queue; | |
final AtomicInteger wip; | |
public MPSCQueue(Action1<? super T> consumer) { | |
this.consumer = consumer; | |
this.wip = new AtomicInteger(); | |
this.queue = new MpscLinkedQueue<>(); | |
} | |
@Override | |
public void call(T t1) { | |
queue.offer(t1); | |
if (wip.getAndIncrement() == 0) { | |
do { | |
consumer.call(queue.poll()); | |
} while (wip.decrementAndGet() > 0); | |
} | |
} | |
} | |
/**
 * {@code MPSCQueue} with a lock-free fast path: an uncontended caller
 * CASes wip 0 -&gt; 1, calls the consumer directly and CASes back 1 -&gt; 0
 * without touching the queue; only on contention does it fall back to
 * the queue-and-drain protocol.
 */
static final class MPSCQueueFastPath<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** MPSC queue for values arriving while another thread emits. */
    final MpscLinkedQueue<T> queue;
    /** Counts undelivered values plus the fast-path claim; zero means idle. */
    final AtomicInteger wip;
    public MPSCQueueFastPath(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.wip = new AtomicInteger();
        this.queue = new MpscLinkedQueue<>();
    }
    @Override
    public void call(T t1) {
        if (wip.compareAndSet(0, 1)) {
            // Fast path: deliver directly, never touching the queue.
            consumer.call(t1);
            if (wip.compareAndSet(1, 0)) {
                return;
            }
            // Others queued while we emitted: drop our own claim and drain.
            wip.decrementAndGet();
        } else {
            queue.offer(t1);
            if (wip.getAndIncrement() != 0) {
                return; // the active emitter will deliver this value
            }
        }
        do {
            consumer.call(queue.poll());
        } while (wip.decrementAndGet() > 0);
    }
}
/**
 * Fast-path serializer driven by a three-state machine instead of a
 * counter: IDLE (no emitter), WORKING (an emitter is running and has
 * seen all queued work), ADDED (an emitter is running but values were
 * queued behind its back). Queuing threads flip WORKING to ADDED; the
 * emitter can only retire from an undisturbed WORKING state.
 */
static final class MPSCQueueStateBasedFastPath<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** MPSC queue for values arriving while another thread emits. */
    final MpscLinkedQueue<T> queue;
    /** Holds one of the STATE_* values; padded against false sharing. */
    final PaddedAtomicInteger wip;
    /** No thread is emitting. */
    private static final int STATE_IDLE = 0;
    /** A thread is emitting and has seen all work queued so far. */
    private static final int STATE_WORKING = 1;
    /** A thread is emitting but more work was queued since it last checked. */
    private static final int STATE_ADDED = 2;
    public MPSCQueueStateBasedFastPath(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.wip = new PaddedAtomicInteger();
        this.queue = new MpscLinkedQueue<>();
    }
    @Override
    public void call(T t1) {
        // try becoming the emitter
        if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
            // Fast path: deliver directly without touching the queue.
            consumer.call(t1);
            if (wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                return;
            }
            // Someone flipped us to ADDED: drain until we can go idle.
            for (;;) {
                // process work as much as possible
                T v;
                while ((v = queue.poll()) != null) {
                    consumer.call(v);
                }
                // try becoming idle
                int state2 = wip.get();
                if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                    return;
                }
                // if we get here, somebody else added more work
                wip.lazySet(STATE_WORKING);
            }
        } else {
            queue.offer(t1);
            for (;;) {
                int state = wip.get();
                // if we are in working or added state, switch to added state
                if ((state == STATE_WORKING || state == STATE_ADDED) && wip.compareAndSet(state, STATE_ADDED)) {
                    return;
                }
                // otherwise, try becoming the emitter
                if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
                    for (;;) {
                        // process work as much as possible
                        T v;
                        while ((v = queue.poll()) != null) {
                            consumer.call(v);
                        }
                        // try becoming idle
                        int state2 = wip.get();
                        if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                            return;
                        }
                        // if we get here, somebody else added more work
                        wip.lazySet(STATE_WORKING);
                    }
                }
                // we didn't become the emitter, retry to see where we are
            }
        }
    }
}
/**
 * Cooperative variant of {@code MPSCQueueStateBasedFastPath}: instead of
 * unconditionally queueing on contention, it retries the bounded CAS
 * offer up to MAX_TRIES and, on success, loops back to attempt direct
 * emission again.
 * <p>NOTE(review): no {@code Strategy} constant constructs this class —
 * the Serializer constructor never reaches it.
 */
static final class MPSCQueueCoopStateBasedFastPath<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** MPSC queue for values arriving while another thread emits. */
    final MpscLinkedQueue<T> queue;
    /** Holds one of the STATE_* values; padded against false sharing. */
    final PaddedAtomicInteger wip;
    /** No thread is emitting. */
    private static final int STATE_IDLE = 0;
    /** A thread is emitting and has seen all work queued so far. */
    private static final int STATE_WORKING = 1;
    /** A thread is emitting but more work was queued since it last checked. */
    private static final int STATE_ADDED = 2;
    /** Bounded-CAS attempts before giving up on the cooperative offer. */
    private static final int MAX_TRIES = 64;
    public MPSCQueueCoopStateBasedFastPath(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.wip = new PaddedAtomicInteger();
        this.queue = new MpscLinkedQueue<>();
    }
    @Override
    public void call(T t1) {
        for (;;) {
            // try becoming the emitter
            if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
                // Fast path: deliver directly without touching the queue.
                consumer.call(t1);
                if (wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                    return;
                }
                // Someone flipped us to ADDED: drain until we can go idle.
                for (;;) {
                    // process work as much as possible
                    T v;
                    while ((v = queue.poll()) != null) {
                        consumer.call(v);
                    }
                    // try becoming idle
                    int state2 = wip.get();
                    if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                        return;
                    }
                    // if we get here, somebody else added more work
                    wip.lazySet(STATE_WORKING);
                }
            } else {
                // try to offer it for a limited time then retry with the direct emission
                if (queue.offer(t1, MAX_TRIES)) {
                    continue;
                }
                for (;;) {
                    int state = wip.get();
                    // if we are in working or added state, switch to added state
                    if ((state == STATE_WORKING || state == STATE_ADDED) && wip.compareAndSet(state, STATE_ADDED)) {
                        return;
                    }
                    // otherwise, try becoming the emitter
                    if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
                        for (;;) {
                            // process work as much as possible
                            T v;
                            while ((v = queue.poll()) != null) {
                                consumer.call(v);
                            }
                            // try becoming idle
                            int state2 = wip.get();
                            if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
                                return;
                            }
                            // if we get here, somebody else added more work
                            wip.lazySet(STATE_WORKING);
                        }
                    }
                    // we didn't become the emitter, retry to see where we are
                }
            }
        }
    }
}
/**
 * Striped MPSC serializer with a fast path: an uncontended caller CASes
 * the counter 0 -&gt; 1 and calls the consumer directly (never queueing);
 * contended callers offer into a per-thread stripe and the single
 * emitter sweeps all stripes until the counter reaches zero.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MPSCQueueStrideFastPath<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Total undelivered values (including a fast-path claim); zero means idle. */
    final AtomicInteger wip = new AtomicInteger();
    /** One MPSC queue per stripe; length is a power of two. */
    final MpscLinkedQueue[] queue;
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** Bit mask for stripe selection: queue.length - 1. */
    final int mask;
    public MPSCQueueStrideFastPath(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.mask = nextPowerOf2(NCPU) - 1;
        this.queue = new MpscLinkedQueue[mask + 1];
        for (int j = 0; j < queue.length; j++) {
            queue[j] = new MpscLinkedQueue();
        }
    }
    @Override
    public void call(T t1) {
        int max;
        if (wip.compareAndSet(0, 1)) {
            // Fast path: deliver directly; drop the claim afterwards.
            consumer.call(t1);
            max = wip.decrementAndGet();
            if (max == 0) {
                return;
            }
            // Others queued while we emitted: fall through and drain them.
        } else {
            final long id = Thread.currentThread().getId();
            final MpscLinkedQueue q = queue[(int)id & mask];
            q.offer(t1);
            max = wip.incrementAndGet();
            if (max != 1) {
                return; // the active emitter will deliver this value
            }
        }
        do {
            int c = 0;
            for (MpscLinkedQueue q0 : queue) {
                T v = (T)q0.poll();
                if (v != null) {
                    consumer.call(v);
                    // == is safe here: c is checked after every increment,
                    // so it can never skip past max (equivalent to >=).
                    if (++c == max) {
                        break;
                    }
                }
            }
            max = wip.addAndGet(-c);
        } while (max > 0);
    }
}
/**
 * Striped serializer over {@code MpscLinkedQueue}: each producer thread
 * offers into one of nextPowerOf2(NCPU) stripes chosen by thread id,
 * and the thread that raises the shared counter from zero sweeps all
 * stripes until the counter reaches zero. Same algorithm as
 * {@code MyMPSCQueueStride}, with the Rx-style queue.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MPSCQueueStride<T> implements Action1<T> {
    /** Downstream consumer; invoked by at most one thread at a time. */
    final Action1<? super T> consumer;
    /** Total undelivered values across all stripes; zero means idle. */
    final AtomicInteger wip = new AtomicInteger();
    /** One MPSC queue per stripe; length is a power of two. */
    final MpscLinkedQueue[] queue;
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /** Bit mask for stripe selection: queue.length - 1. */
    final int mask;
    public MPSCQueueStride(Action1<? super T> consumer) {
        this.consumer = consumer;
        this.mask = nextPowerOf2(NCPU) - 1;
        this.queue = new MpscLinkedQueue[mask + 1];
        for (int j = 0; j < queue.length; j++) {
            queue[j] = new MpscLinkedQueue();
        }
    }
    @Override
    public void call(T t1) {
        long id = Thread.currentThread().getId();
        // Cache fields in locals for the hot loop.
        final MpscLinkedQueue[] qs = queue;
        final MpscLinkedQueue q = qs[(int)id & mask];
        final AtomicInteger w = wip;
        final Action1<? super T> cons = consumer;
        q.offer(t1);
        int max = w.incrementAndGet();
        if (max == 1) {
            // We are the emitter: sweep the stripes, delivering at most
            // the known outstanding count per sweep, until drained.
            do {
                int c = 0;
                for (MpscLinkedQueue q0 : qs) {
                    T v = (T)q0.poll();
                    if (v != null) {
                        cons.call(v);
                        if (++c >= max) {
                            break;
                        }
                    }
                }
                max = w.addAndGet(-c);
            } while (max > 0);
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment