Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/f8704115aa5d1a9e4bc8 to your computer and use it in GitHub Desktop.
Save akarnokd/f8704115aa5d1a9e4bc8 to your computer and use it in GitHub Desktop.
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import rx.functions.Action1;
/**
* Serialize value transfer by limiting emission to a single thread
* at a time.
* @param <T> the value type to transfer
*/
public final class Serializer<T> implements Action1<T>{
public enum Strategy {
DIRECT,
SYNCHRONIZED_ARRAY_QUEUE,
LOCKED_ARRAY_QUEUE,
ATOMIC_CL_QUEUE,
ATOMIC_CL_QUEUE_FAST_PATH,
QUEUE_BATCH_DRAIN,
MY_MPSC,
MY_MPSC_STRIDE,
SYNC_DOUBLE_ARRAY_DEQUE,
LOCK_DOUBLE_ARRAY_DEQUE,
MPSC_LINKED_QUEUE,
MPSC_LINKED_QUEUE_STRIDE,
MPSC_LINKED_QUEUE_FAST_PATH,
MPSC_LINKED_QUEUE_STATE_FAST_PATH,
MPSC_LINKED_QUEUE_STRIDE_FAST_PATH,
}
final Action1<? super T> impl;
public Serializer(Strategy strategy, Action1<? super T> consumer) {
Action1<? super T> a;
switch (strategy) {
case SYNCHRONIZED_ARRAY_QUEUE:
a = new SyncArrayQueue<>(consumer);
break;
case LOCKED_ARRAY_QUEUE:
a = new LockArrayQueue<>(consumer);
break;
case ATOMIC_CL_QUEUE:
a = new AtomicCLQueue<>(consumer);
break;
case ATOMIC_CL_QUEUE_FAST_PATH:
a = new AtomicCLQueueFastPath<>(consumer);
break;
case QUEUE_BATCH_DRAIN:
a = new QueueBatchDrain<>(consumer);
break;
case MY_MPSC:
a = new MyMPSCQueue<>(consumer);
break;
case MY_MPSC_STRIDE:
a = new MyMPSCQueueStride<>(consumer);
break;
case MPSC_LINKED_QUEUE_STRIDE_FAST_PATH:
a = new MPSCQueueStrideFastPath<>(consumer);
break;
case SYNC_DOUBLE_ARRAY_DEQUE:
a = new DoubleSyncArrayDeque<>(consumer);
break;
case LOCK_DOUBLE_ARRAY_DEQUE:
a = new DoubleLockArrayDeque<>(consumer);
break;
case MPSC_LINKED_QUEUE:
a = new MPSCQueue<>(consumer);
break;
case MPSC_LINKED_QUEUE_STRIDE:
a = new MPSCQueueStride<>(consumer);
break;
case MPSC_LINKED_QUEUE_FAST_PATH:
a = new MPSCQueueFastPath<>(consumer);
break;
case MPSC_LINKED_QUEUE_STATE_FAST_PATH:
a = new MPSCQueueStateBasedFastPath<>(consumer);
break;
default:
a = consumer;
}
this.impl = a;
}
@Override
public void call(T t1) {
impl.call(t1);
}
static final class SyncArrayQueue<T> implements Action1<T> {
final Action1<? super T> consumer;
Object[] queue;
int size;
boolean emitting;
public SyncArrayQueue(Action1<? super T> consumer) {
this.consumer = consumer;
}
@Override
@SuppressWarnings("unchecked")
public void call(T t1) {
Object[] localQueue;
int localSize;
synchronized (this) {
if (emitting) {
if (queue == null) {
queue = new Object[8];
}
if (size == queue.length) {
queue = Arrays.copyOf(queue, size + (size >> 1));
}
queue[size++] = t1;
return;
}
localQueue = queue;
localSize = size;
queue = null;
size = 0;
emitting = true;
}
boolean once = true;
do {
if (localQueue != null) {
for (int i = 0; i < localSize; i++) {
consumer.call((T)localQueue[i]);
}
}
if (once) {
once = false;
consumer.call(t1);
}
synchronized (this) {
localQueue = queue;
localSize = size;
queue = null;
size = 0;
if (localQueue == null) {
emitting = false;
break;
}
}
} while (true);
}
}
static final class LockArrayQueue<T> implements Action1<T> {
final Action1<? super T> consumer;
Object[] queue;
int size;
boolean emitting;
final Lock lock;
public LockArrayQueue(Action1<? super T> consumer) {
this.consumer = consumer;
this.lock = new ReentrantLock();
}
@Override
@SuppressWarnings("unchecked")
public void call(T t1) {
Object[] localQueue;
int localSize;
lock.lock();
try {
if (emitting) {
if (queue == null) {
queue = new Object[8];
}
if (size == queue.length) {
queue = Arrays.copyOf(queue, size + (size >> 1));
}
queue[size++] = t1;
return;
}
localQueue = queue;
localSize = size;
queue = null;
size = 0;
emitting = true;
} finally {
lock.unlock();
}
boolean once = true;
do {
if (localQueue != null) {
for (int i = 0; i < localSize; i++) {
consumer.call((T)localQueue[i]);
}
}
if (once) {
once = false;
consumer.call(t1);
}
lock.lock();
try {
localQueue = queue;
localSize = size;
queue = null;
size = 0;
if (localQueue == null) {
emitting = false;
break;
}
} finally {
lock.unlock();
}
} while (true);
}
}
static final class AtomicCLQueue<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip;
final ConcurrentLinkedQueue<T> queue;
public AtomicCLQueue(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new AtomicInteger();
this.queue = new ConcurrentLinkedQueue<>();
}
@Override
public void call(T t) {
queue.offer(t);
if (wip.getAndIncrement() == 0) {
do {
consumer.call(queue.poll());
} while (wip.decrementAndGet() > 0);
}
}
}
static final class AtomicCLQueueFastPath<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip;
final ConcurrentLinkedQueue<T> queue;
public AtomicCLQueueFastPath(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new AtomicInteger();
this.queue = new ConcurrentLinkedQueue<>();
}
@Override
public void call(T t) {
if(wip.compareAndSet(0, 1)) { // no contention?
consumer.call(t);
if (wip.compareAndSet(1, 0)) {
return; // still no contention
}
wip.decrementAndGet();
do {
consumer.call(queue.poll());
} while (wip.decrementAndGet() > 0);
return;
}
else {
if (!queue.offer(t)) {
throw new AssertionError("queue should be unbounded");
}
if (wip.getAndIncrement() != 0) {
return; // other consumer running
}
}
do {
consumer.call(queue.poll());
} while (wip.decrementAndGet() > 0);
}
}
static final class QueueBatchDrain<T> implements Action1<T> {
final Action1<? super T> consumer;
final List<T> buffer = new ArrayList<>();
final AtomicInteger wip = new AtomicInteger();
final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
public QueueBatchDrain(Action1<? super T> consumer) {
this.consumer = consumer;
}
@Override
public void call(T t1) {
queue.offer(t1);
if (wip.getAndIncrement() == 0) {
do {
buffer.clear();
int n = queue.drainTo(buffer, wip.get());
for (T t : buffer) {
consumer.call(t);
}
if (wip.addAndGet(-n) <= 0) {
break;
}
} while (true);
}
}
}
static final class MyMPSCQueue<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip = new AtomicInteger();
final MPSCDrainQueue<T> queue = new MPSCDrainQueue<>();
public MyMPSCQueue(Action1<? super T> consumer) {
this.consumer = consumer;
}
@Override
public void call(T t1) {
queue.offer(t1);
if (wip.getAndAdd(1) == 0) {
do {
consumer.call(queue.poll());
if (wip.decrementAndGet() == 0) {
break;
}
} while (true);
}
}
}
private static int nextPowerOf2(int x) {
if ((x & (x - 1)) == 0) {
return x;
}
x = x | (x >> 1);
x = x | (x >> 2);
x = x | (x >> 4);
x = x | (x >> 8);
x = x | (x >> 16);
x = x | (x >> 24);
return x - (x >> 1);
}
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MyMPSCQueueStride<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip = new AtomicInteger();
final MPSCDrainQueue[] queue;
static final int NCPU = Runtime.getRuntime().availableProcessors();
final int mask;
public MyMPSCQueueStride(Action1<? super T> consumer) {
this.consumer = consumer;
this.mask = nextPowerOf2(NCPU) - 1;
this.queue = new MPSCDrainQueue[mask + 1];
for (int j = 0; j < queue.length; j++) {
queue[j] = new MPSCDrainQueue();
}
}
@Override
public void call(T t1) {
long id = Thread.currentThread().getId();
final MPSCDrainQueue[] qs = queue;
final MPSCDrainQueue q = qs[(int)id & mask];
final AtomicInteger w = wip;
final Action1<? super T> cons = consumer;
q.offer(t1);
int max = w.incrementAndGet();
if (max == 1) {
do {
int c = 0;
for (MPSCDrainQueue q0 : qs) {
T v = (T)q0.poll();
if (v != null) {
cons.call(v);
if (++c >= max) {
break;
}
}
}
max = w.addAndGet(-c);
} while (max > 0);
}
}
}
static final class MPSCDrainQueue<E> {
static final class Node<E> {
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<Node, Node> NEXT_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
E value;
volatile Node<E> next;
public Node(E value) {
this.value = value;
}
boolean casNext(Node<E> expected, Node<E> actual) {
return NEXT_UPDATER.compareAndSet(this, expected, actual);
}
}
static final class PaddedTail<E> {
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<PaddedTail, Node> TAIL_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(PaddedTail.class, Node.class, "tail");
volatile Node<E> tail;
public int p0;
public long p1, p2, p3, p4, p5, p6;
public long noopt() {
return p0 + p1 + p2 + p3 + p4 + p5 + p6;
}
void casTail(Node<E> t, Node<E> newNode) {
TAIL_UPDATER.compareAndSet(this, t, newNode);
}
public PaddedTail(Node<E> initialTail) {
this.tail = initialTail;
}
}
Node<E> head = new Node<>(null);
final PaddedTail<E> ptail = new PaddedTail<>(head);
public boolean offer(E e) {
Node<E> newNode = new Node<>(e);
for (;;) {
Node<E> t = ptail.tail;
Node<E> p = t.next;
if (p == null && t.casNext(null, newNode)) {
ptail.casTail(t, newNode);
return true;
}
}
}
public void dropFirst() {
Node<E> h = head;
head = h.next;
h.value = null;
}
public E poll() {
Node<E> n = head;
Node<E> nx = n.next;
if (nx == null) {
return null;
}
head = nx;
E v = nx.value;
nx.value = null;
return v;
}
public int drainAll(final Action1<? super E> actual) {
int c = 0;
Node<E> n = head;
Node<E> nx = n.next;
while (nx != null) {
actual.call(nx.value);
n = nx;
nx = nx.next;
c++;
}
head = n;
n.value = null;
return c;
}
}
public static final class DoubleSyncArrayDeque<T> implements Action1<T> {
final Action1<? super T> consumer;
final ArrayDeque<T> deque1;
final ArrayDeque<T> deque2;
ArrayDeque<T> queue;
volatile boolean emitting;
public DoubleSyncArrayDeque(Action1<? super T> consumer) {
this.consumer = consumer;
this.deque1 = new ArrayDeque<>();
this.deque2 = new ArrayDeque<>();
this.queue = deque1;
}
@Override
public void call(T t1) {
ArrayDeque<T> q;
synchronized (this) {
queue.offer(t1);
if (emitting) {
return;
}
emitting = true;
q = queue;
queue = q == deque1 ? deque2 : deque1;
}
do {
T v;
while ((v = q.poll()) != null) {
consumer.call(v);
}
synchronized (this) {
q = queue;
if (q.isEmpty()) {
emitting = false;
return;
}
queue = q == deque1 ? deque2 : deque1;
}
} while (true);
}
}
public static final class DoubleLockArrayDeque<T> implements Action1<T> {
final Action1<? super T> consumer;
final ArrayDeque<T> deque1;
final ArrayDeque<T> deque2;
final Lock lock;
ArrayDeque<T> queue;
volatile boolean emitting;
public DoubleLockArrayDeque(Action1<? super T> consumer) {
this.consumer = consumer;
this.deque1 = new ArrayDeque<>();
this.deque2 = new ArrayDeque<>();
this.lock = new ReentrantLock();
this.queue = deque1;
}
@Override
public void call(T t1) {
ArrayDeque<T> q;
lock.lock();
try {
queue.offer(t1);
if (emitting) {
return;
}
emitting = true;
q = queue;
queue = q == deque1 ? deque2 : deque1;
} finally {
lock.unlock();
}
do {
T v;
while ((v = q.poll()) != null) {
consumer.call(v);
}
lock.lock();
try {
q = queue;
if (q.isEmpty()) {
emitting = false;
return;
}
queue = q == deque1 ? deque2 : deque1;
} finally {
lock.unlock();
}
} while (true);
}
}
static final class PaddedAtomicInteger extends AtomicInteger {
private static final long serialVersionUID = 1L;
public int p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13 = 14;
public int noopt() {
return p1 + p2 + p3 + p4 + p5 + p6 + p7 + p8 + p9 + p10 + p11 + p12 + p13;
}
}
static final class MpscLinkedQueue<E> extends AtomicReference<MpscLinkedQueue.Node<E>> {
private static final long serialVersionUID = 1L;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<MpscLinkedQueue, Node> TAIL_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(MpscLinkedQueue.class, Node.class, "tail");
volatile Node<E> tail;
public MpscLinkedQueue() {
Node<E> first = new Node<>(null);
tail = first;
set(first);
}
public boolean offer(E v) {
Node<E> n = new Node<>(v);
getAndSet(n).setTail(n);
return true;
}
public boolean offer(E v, int numTries) {
Node<E> n = new Node<>(v);
for (; numTries-- > 0;) {
Node<E> h = get();
if (compareAndSet(h, n)) {
h.setTail(n);
return true;
}
}
return false;
}
public E poll() {
Node<E> n = peekNode();
if (n == null) {
return null;
}
Node<E> result = n;
TAIL_UPDATER.lazySet(this, n);
return result.value;
}
private Node<E> peekNode() {
for (;;) {
@SuppressWarnings("unchecked")
Node<E> t = TAIL_UPDATER.get(this);
Node<E> n = t.getTail();
if (n != null || get() == t) {
return n;
}
}
}
static final class Node<E> {
final E value;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
volatile Node<E> tail;
public Node(E value) {
this.value = value;
}
public void setTail(Node<E> newTail) {
TAIL_UPDATER.lazySet(this, newTail);
}
@SuppressWarnings("unchecked")
public Node<E> getTail() {
return TAIL_UPDATER.get(this);
}
}
}
static final class MPSCQueue<T> implements Action1<T> {
final Action1<? super T> consumer;
final MpscLinkedQueue<T> queue;
final AtomicInteger wip;
public MPSCQueue(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new AtomicInteger();
this.queue = new MpscLinkedQueue<>();
}
@Override
public void call(T t1) {
queue.offer(t1);
if (wip.getAndIncrement() == 0) {
do {
consumer.call(queue.poll());
} while (wip.decrementAndGet() > 0);
}
}
}
static final class MPSCQueueFastPath<T> implements Action1<T> {
final Action1<? super T> consumer;
final MpscLinkedQueue<T> queue;
final AtomicInteger wip;
public MPSCQueueFastPath(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new AtomicInteger();
this.queue = new MpscLinkedQueue<>();
}
@Override
public void call(T t1) {
if (wip.compareAndSet(0, 1)) {
consumer.call(t1);
if (wip.compareAndSet(1, 0)) {
return;
}
wip.decrementAndGet();
} else {
queue.offer(t1);
if (wip.getAndIncrement() != 0) {
return;
}
}
do {
consumer.call(queue.poll());
} while (wip.decrementAndGet() > 0);
}
}
static final class MPSCQueueStateBasedFastPath<T> implements Action1<T> {
final Action1<? super T> consumer;
final MpscLinkedQueue<T> queue;
final PaddedAtomicInteger wip;
private static final int STATE_IDLE = 0;
private static final int STATE_WORKING = 1;
private static final int STATE_ADDED = 2;
public MPSCQueueStateBasedFastPath(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new PaddedAtomicInteger();
this.queue = new MpscLinkedQueue<>();
}
@Override
public void call(T t1) {
// try becoming the emitter
if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
consumer.call(t1);
if (wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
for (;;) {
// process work as much as possible
T v;
while ((v = queue.poll()) != null) {
consumer.call(v);
}
// try becoming idle
int state2 = wip.get();
if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
// if we get here, somebody else added more work
wip.lazySet(STATE_WORKING);
}
} else {
queue.offer(t1);
for (;;) {
int state = wip.get();
// if we are in working or added state, switch to added state
if ((state == STATE_WORKING || state == STATE_ADDED) && wip.compareAndSet(state, STATE_ADDED)) {
return;
}
// otherwise, try becoming the emitter
if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
for (;;) {
// process work as much as possible
T v;
while ((v = queue.poll()) != null) {
consumer.call(v);
}
// try becoming idle
int state2 = wip.get();
if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
// if we get here, somebody else added more work
wip.lazySet(STATE_WORKING);
}
}
// we didn't become the emitter, retry to see where whe are
}
}
}
}
static final class MPSCQueueCoopStateBasedFastPath<T> implements Action1<T> {
final Action1<? super T> consumer;
final MpscLinkedQueue<T> queue;
final PaddedAtomicInteger wip;
private static final int STATE_IDLE = 0;
private static final int STATE_WORKING = 1;
private static final int STATE_ADDED = 2;
private static final int MAX_TRIES = 64;
public MPSCQueueCoopStateBasedFastPath(Action1<? super T> consumer) {
this.consumer = consumer;
this.wip = new PaddedAtomicInteger();
this.queue = new MpscLinkedQueue<>();
}
@Override
public void call(T t1) {
for (;;) {
// try becoming the emitter
if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
consumer.call(t1);
if (wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
for (;;) {
// process work as much as possible
T v;
while ((v = queue.poll()) != null) {
consumer.call(v);
}
// try becoming idle
int state2 = wip.get();
if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
// if we get here, somebody else added more work
wip.lazySet(STATE_WORKING);
}
} else {
// try to offer it for a limited time then retry with the direct emission
if (queue.offer(t1, MAX_TRIES)) {
continue;
}
for (;;) {
int state = wip.get();
// if we are in working or added state, switch to added state
if ((state == STATE_WORKING || state == STATE_ADDED) && wip.compareAndSet(state, STATE_ADDED)) {
return;
}
// otherwise, try becoming the emitter
if (wip.compareAndSet(STATE_IDLE, STATE_WORKING)) {
for (;;) {
// process work as much as possible
T v;
while ((v = queue.poll()) != null) {
consumer.call(v);
}
// try becoming idle
int state2 = wip.get();
if (state2 == STATE_WORKING && wip.compareAndSet(STATE_WORKING, STATE_IDLE)) {
return;
}
// if we get here, somebody else added more work
wip.lazySet(STATE_WORKING);
}
}
// we didn't become the emitter, retry to see where whe are
}
}
}
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MPSCQueueStrideFastPath<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip = new AtomicInteger();
final MpscLinkedQueue[] queue;
static final int NCPU = Runtime.getRuntime().availableProcessors();
final int mask;
public MPSCQueueStrideFastPath(Action1<? super T> consumer) {
this.consumer = consumer;
this.mask = nextPowerOf2(NCPU) - 1;
this.queue = new MpscLinkedQueue[mask + 1];
for (int j = 0; j < queue.length; j++) {
queue[j] = new MpscLinkedQueue();
}
}
@Override
public void call(T t1) {
int max;
if (wip.compareAndSet(0, 1)) {
consumer.call(t1);
max = wip.decrementAndGet();
if (max == 0) {
return;
}
} else {
final long id = Thread.currentThread().getId();
final MpscLinkedQueue q = queue[(int)id & mask];
q.offer(t1);
max = wip.incrementAndGet();
if (max != 1) {
return;
}
}
do {
int c = 0;
for (MpscLinkedQueue q0 : queue) {
T v = (T)q0.poll();
if (v != null) {
consumer.call(v);
if (++c == max) {
break;
}
}
}
max = wip.addAndGet(-c);
} while (max > 0);
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
static final class MPSCQueueStride<T> implements Action1<T> {
final Action1<? super T> consumer;
final AtomicInteger wip = new AtomicInteger();
final MpscLinkedQueue[] queue;
static final int NCPU = Runtime.getRuntime().availableProcessors();
final int mask;
public MPSCQueueStride(Action1<? super T> consumer) {
this.consumer = consumer;
this.mask = nextPowerOf2(NCPU) - 1;
this.queue = new MpscLinkedQueue[mask + 1];
for (int j = 0; j < queue.length; j++) {
queue[j] = new MpscLinkedQueue();
}
}
@Override
public void call(T t1) {
long id = Thread.currentThread().getId();
final MpscLinkedQueue[] qs = queue;
final MpscLinkedQueue q = qs[(int)id & mask];
final AtomicInteger w = wip;
final Action1<? super T> cons = consumer;
q.offer(t1);
int max = w.incrementAndGet();
if (max == 1) {
do {
int c = 0;
for (MpscLinkedQueue q0 : qs) {
T v = (T)q0.poll();
if (v != null) {
cons.call(v);
if (++c >= max) {
break;
}
}
}
max = w.addAndGet(-c);
} while (max > 0);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment