Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@calvernaz
Last active June 13, 2019 13:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calvernaz/8333b0db263eeeb6a6ac466107799638 to your computer and use it in GitHub Desktop.
Save calvernaz/8333b0db263eeeb6a6ac466107799638 to your computer and use it in GitHub Desktop.
package org.weirdloop;
import org.jctools.queues.atomic.SpscAtomicArrayQueue;
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
/**
 * Bounded queue that flushes itself in one batch when a new element no longer fits.
 *
 * <p>Elements are held in an {@link SpscAtomicArrayQueue}. When {@link #enqueue(Object)} finds
 * the queue full, everything currently queued is drained into the container supplied at creation
 * time (or printed to {@code System.out} when no container was supplied) and the new element is
 * then stored.
 *
 * <p>An optional timer task can be supplied; it is handed the underlying queue and run once on a
 * daemon timer thread after a fixed delay.
 *
 * <p>NOTE(review): the backing queue is a single-producer/single-consumer structure. Draining
 * from {@code enqueue}, polling from {@code dequeue} and draining from the timer task may put
 * more than one consumer thread on it -- confirm the intended threading model with the callers.
 *
 * @param <T> element type
 */
class BatchQueue<T> {
  /** Delay, in milliseconds, before the optional timer task runs. */
  private static final long TIMER_DELAY_MILLIS = 1000;

  private final SpscAtomicArrayQueue<T> queue;
  /** Sink for drained elements; {@code null} means "print to standard output". */
  private final Collection<T> consumer;
  /** Timer running the optional task; {@code null} when no task was supplied. */
  private final Timer timer;

  /**
   * Creates a queue whose overflowing batches are printed to {@code System.out}.
   *
   * @param capacity capacity passed to the backing {@link SpscAtomicArrayQueue}
   * @param <T> element type
   * @return a new queue without container and without timer task
   */
  static <T> BatchQueue<T> create(final int capacity) {
    return new BatchQueue<>(new SpscAtomicArrayQueue<T>(capacity), null, null);
  }

  /**
   * Creates a queue whose overflowing batches are added to {@code container}.
   *
   * @param capacity capacity passed to the backing {@link SpscAtomicArrayQueue}
   * @param container receives drained elements; {@code null} falls back to printing them
   * @param <T> element type
   * @return a new queue without timer task
   */
  static <T> BatchQueue<T> create(final int capacity, final Collection<T> container) {
    return new BatchQueue<>(new SpscAtomicArrayQueue<T>(capacity), container, null);
  }

  /**
   * Creates a queue with a container and an optional timer task.
   *
   * <p>When {@code timerTask} is non-null it first receives the backing queue through
   * {@link Consumer#accept(Object)} and is then scheduled to run once after
   * {@value #TIMER_DELAY_MILLIS} ms.
   *
   * @param capacity capacity passed to the backing {@link SpscAtomicArrayQueue}
   * @param container receives drained elements; {@code null} falls back to printing them
   * @param timerTask task to schedule, or {@code null} for none
   * @param <T> element type
   * @param <K> a timer task that also accepts the backing queue
   * @return a new queue
   */
  static <T, K extends TimerTask & Consumer<? super SpscAtomicArrayQueue<T>>> BatchQueue<T> create(
      final int capacity, final Collection<T> container, final K timerTask) {
    final SpscAtomicArrayQueue<T> queue = new SpscAtomicArrayQueue<T>(capacity);
    if (timerTask == null) {
      return new BatchQueue<>(queue, container, null);
    }
    // Daemon thread: an idle timer must not be able to keep the JVM from exiting.
    final Timer timer = new Timer("queue-time-drainer", true);
    try {
      timerTask.accept(queue);
      timer.schedule(timerTask, TIMER_DELAY_MILLIS);
    } catch (RuntimeException e) {
      // Do not leave a timer thread behind when wiring or scheduling fails.
      timer.cancel();
      throw e;
    }
    return new BatchQueue<>(queue, container, timer);
  }

  private BatchQueue(
      final SpscAtomicArrayQueue<T> queue, final Collection<T> container, final Timer timer) {
    this.queue = queue;
    this.consumer = container;
    this.timer = timer;
  }

  /**
   * Removes and returns the head of the queue.
   *
   * @return the head element, or {@code null} when the queue is empty
   */
  T dequeue() {
    return this.queue.poll();
  }

  /**
   * Stores {@code value}, draining the queue first whenever the queue rejects the offer.
   *
   * @param value element to store
   */
  void enqueue(T value) {
    // Iterative retry: same semantics as re-invoking enqueue, without growing the stack.
    while (!this.queue.offer(value)) {
      drainQueue();
    }
  }

  /**
   * Cancels the timer, if one was created. Safe to call more than once and a no-op for queues
   * created without a timer task.
   */
  void shutdown() {
    if (timer != null) {
      timer.cancel();
    }
  }

  /** Empties the queue into the configured container, or prints the elements when none is set. */
  private void drainQueue() {
    if (consumer == null) {
      queue.drain(System.out::println);
    } else {
      drainTo(consumer);
    }
  }

  /** Moves every queued element into {@code container}. */
  private void drainTo(Collection<T> container) {
    queue.drain(container::add);
  }
}
package org.weirdloop;
import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
/**
 * Tests for {@link BatchQueue}: each case pushes five values through a queue created with
 * capacity four and checks what is left in the queue and what reached the container.
 */
public class BatchQueueTest {

  /** Enqueues the values 1 through 5, in that order. */
  private static void enqueueOneToFive(final BatchQueue<Integer> target) {
    for (int value = 1; value <= 5; value++) {
      target.enqueue(value);
    }
  }

  /** Asserts that 5 is the only element still held by the queue. */
  private static void assertOnlyFiveRemains(final BatchQueue<Integer> target) {
    assertThat(target.dequeue(), is(5));
    assertThat(target.dequeue(), nullValue());
  }

  /** Without a container, the first four values are gone once the fifth is enqueued. */
  @Test
  public void enqueue() {
    final BatchQueue<Integer> queue = BatchQueue.create(4);

    enqueueOneToFive(queue);

    assertOnlyFiveRemains(queue);
  }

  /** With a container, the first four values end up in it. */
  @Test
  public void enqueueWithDrainer() {
    final Collection<Integer> container = new HashSet<>();
    final BatchQueue<Integer> queue = BatchQueue.create(4, container);

    enqueueOneToFive(queue);

    assertOnlyFiveRemains(queue);
    assertThat(container, hasItems(1, 2, 3, 4));
  }

  /**
   * Same scenario with a {@link QueueDrainer} as the container.
   *
   * <p>NOTE(review): despite the method name, no timer task is passed here; the drainer is used
   * through the two-argument {@code create} overload.
   */
  @Test
  public void queueWithTimeDrainer() {
    final QueueDrainer<Integer> drainer = new QueueDrainer<>(new HashSet<>());
    final BatchQueue<Integer> queue = BatchQueue.create(4, drainer);

    enqueueOneToFive(queue);

    assertOnlyFiveRemains(queue);
    assertThat(drainer, hasItems(1, 2, 3, 4));
  }
}
package org.weirdloop;
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
/**
 * Collection that forwards {@code add}, {@code iterator} and {@code size} to a wrapped
 * collection; every other operation comes from {@link AbstractCollection}.
 *
 * @param <T> element type
 */
public class QueueDrainer<T> extends AbstractCollection<T> {

  /** Collection that actually stores the elements. */
  private final Collection<T> delegate;

  /**
   * Wraps the given collection without copying it.
   *
   * @param container collection that will receive all added elements
   */
  public QueueDrainer(Collection<T> container) {
    this.delegate = container;
  }

  /** @return the number of elements in the wrapped collection */
  @Override
  public int size() {
    return delegate.size();
  }

  /** @return the wrapped collection's own iterator */
  @Override
  public Iterator<T> iterator() {
    return delegate.iterator();
  }

  /**
   * Adds the element to the wrapped collection.
   *
   * @param t element to add
   * @return whatever the wrapped collection's {@code add} returns
   */
  @Override
  public boolean add(T t) {
    return delegate.add(t);
  }
}
package org.weirdloop;
import org.jctools.queues.atomic.SpscAtomicArrayQueue;
import java.util.TimerTask;
import java.util.function.Consumer;
/**
 * Timer task that, when run, removes every element currently in a queue and prints each one to
 * {@code System.out}. The queue to drain is supplied through {@link #accept}.
 *
 * @param <T> element type of the drained queue
 */
public class TimeQueueDrainer<T> extends TimerTask implements Consumer<SpscAtomicArrayQueue<T>> {

  /**
   * Queue to drain; {@code null} until {@link #accept} is called. Volatile because it is written
   * by the thread that wires the task and read by the timer thread that runs it.
   */
  private volatile SpscAtomicArrayQueue<T> queue;

  /**
   * Drains the configured queue to standard output. Does nothing when no queue has been supplied
   * yet, so that an early run does not throw and terminate the timer thread.
   */
  @Override
  public void run() {
    // Read the field once so the null check and the drain see the same queue.
    final SpscAtomicArrayQueue<T> target = queue;
    if (target == null) {
      return;
    }
    target.drain(System.out::println);
  }

  /**
   * Sets the queue that subsequent runs will drain.
   *
   * @param queue queue to drain
   */
  @Override
  public void accept(SpscAtomicArrayQueue<T> queue) {
    this.queue = queue;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment