Last active
June 13, 2019 13:06
-
-
Save calvernaz/8333b0db263eeeb6a6ac466107799638 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.weirdloop; | |
import org.jctools.queues.atomic.SpscAtomicArrayQueue; | |
import java.util.Collection; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.function.Consumer; | |
class BatchQueue<T> { | |
private final SpscAtomicArrayQueue<T> queue; | |
private final Collection<T> consumer; | |
private Timer timer = null; | |
static <T> BatchQueue<T> create(final int capacity) { | |
return new BatchQueue<>(new SpscAtomicArrayQueue<>(capacity), null, null); | |
} | |
static <T> BatchQueue<T> create(final int capacity, final Collection<T> container) { | |
return new BatchQueue<>(new SpscAtomicArrayQueue<>(capacity), container, null); | |
} | |
static <T, K extends TimerTask & Consumer> BatchQueue<T> create(final int capacity, final Collection<T> container, final K timerTask) { | |
return new BatchQueue<>(new SpscAtomicArrayQueue<>(capacity), container, timerTask); | |
} | |
private <K extends TimerTask & Consumer> BatchQueue(final SpscAtomicArrayQueue<T> queue, final Collection<T> container, K timerTask) { | |
this.queue = queue; | |
this.consumer = container; | |
if (timerTask != null) { | |
timer = new Timer("queue-time-drainer"); | |
timerTask.accept(queue); | |
timer.schedule(timerTask, 1000); | |
} | |
} | |
T dequeue() { | |
return this.queue.poll(); | |
} | |
void enqueue(T value) { | |
boolean offer = this.queue.offer(value); | |
if (!offer) { | |
drainQueue(); | |
enqueue(value); | |
} | |
} | |
private void drainQueue() { | |
if (consumer == null) | |
queue.drain(System.out::println); | |
else | |
drainTo(consumer); | |
} | |
private void drainTo(Collection<T> container) { | |
queue.drain(container::add); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.weirdloop; | |
import org.junit.Test; | |
import java.util.Collection; | |
import java.util.HashSet; | |
import static org.hamcrest.CoreMatchers.hasItems; | |
import static org.hamcrest.CoreMatchers.is; | |
import static org.hamcrest.CoreMatchers.nullValue; | |
import static org.junit.Assert.assertThat; | |
public class BatchQueueTest { | |
@Test | |
public void enqueue() { | |
BatchQueue<Integer> queue = BatchQueue.create(4); | |
queue.enqueue(1); | |
queue.enqueue(2); | |
queue.enqueue(3); | |
queue.enqueue(4); | |
queue.enqueue(5); | |
assertThat(queue.dequeue(), is(5)); | |
assertThat(queue.dequeue(), nullValue()); | |
} | |
@Test | |
public void enqueueWithDrainer() { | |
Collection<Integer> container = new HashSet<>(); | |
BatchQueue<Integer> queue = BatchQueue.create(4, container); | |
queue.enqueue(1); | |
queue.enqueue(2); | |
queue.enqueue(3); | |
queue.enqueue(4); | |
queue.enqueue(5); | |
assertThat(queue.dequeue(), is(5)); | |
assertThat(queue.dequeue(), nullValue()); | |
assertThat(container, hasItems(1, 2, 3, 4)); | |
} | |
@Test | |
public void queueWithTimeDrainer() { | |
QueueDrainer<Integer> drainer = new QueueDrainer<>(new HashSet<>()); | |
BatchQueue<Integer> queue = BatchQueue.create(4, drainer); | |
queue.enqueue(1); | |
queue.enqueue(2); | |
queue.enqueue(3); | |
queue.enqueue(4); | |
queue.enqueue(5); | |
assertThat(queue.dequeue(), is(5)); | |
assertThat(queue.dequeue(), nullValue()); | |
assertThat(drainer, hasItems(1, 2, 3, 4)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.weirdloop; | |
import java.util.AbstractCollection; | |
import java.util.Collection; | |
import java.util.Iterator; | |
/**
 * Collection that forwards {@code add}, {@code iterator} and {@code size} to a wrapped
 * container; every other operation is inherited from {@link AbstractCollection}.
 *
 * @param <T> element type
 */
public class QueueDrainer<T> extends AbstractCollection<T> {

  /** Backing collection that actually stores the elements. */
  private final Collection<T> container;

  /**
   * Wraps {@code container}. The collection is not copied, so changes are visible both ways.
   *
   * @param container backing collection; must not be {@code null}
   * @throws NullPointerException if {@code container} is {@code null}
   */
  public QueueDrainer(Collection<T> container) {
    // Fail here rather than on the first delegated call, where the cause is harder to trace.
    if (container == null) {
      throw new NullPointerException("container");
    }
    this.container = container;
  }

  /**
   * Adds {@code t} to the backing collection.
   *
   * @return the result of the backing collection's {@code add}
   */
  @Override
  public boolean add(T t) {
    return container.add(t);
  }

  /** Returns the backing collection's iterator. */
  @Override
  public Iterator<T> iterator() {
    return container.iterator();
  }

  /** Returns the backing collection's size. */
  @Override
  public int size() {
    return container.size();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.weirdloop; | |
import org.jctools.queues.atomic.SpscAtomicArrayQueue; | |
import java.util.TimerTask; | |
import java.util.function.Consumer; | |
public class TimeQueueDrainer<T> extends TimerTask implements Consumer<SpscAtomicArrayQueue<T>> { | |
private SpscAtomicArrayQueue<T> queue; | |
@Override | |
public void run() { | |
queue.drain(System.out::println); | |
} | |
@Override | |
public void accept(SpscAtomicArrayQueue<T> queue) { | |
this.queue = queue; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment