Skip to content

Instantly share code, notes, and snippets.

@dkomanov
Created April 30, 2019 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkomanov/f95baefc2a591a34e0549b0f8a3af408 to your computer and use it in GitHub Desktop.
Save dkomanov/f95baefc2a591a34e0549b0f8a3af408 to your computer and use it in GitHub Desktop.
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
public class BlockingQueueBenchmark {
private static final String END_MARKER = "";
private static final ExecutorService threadPool = Executors.newFixedThreadPool(8, new ThreadFactory() {
AtomicInteger count = new AtomicInteger();
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "thread-pool-" + count.incrementAndGet());
t.setDaemon(true);
return t;
}
});
@Param({"1", "2", "4", "8"})
private int producerThreadCount = 0;
@Param({"10", "100", "1000", "10000"})
private int capacity = 0;
@Benchmark
public void take() throws Throwable {
CountDownLatch barrier = new CountDownLatch(1);
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(capacity);
startProducers(barrier, blockingQueue);
barrier.countDown();
int stopCount = 0;
while (stopCount != producerThreadCount) {
String v = blockingQueue.take();
Blackhole.consumeCPU(10);
if (v == END_MARKER) {
++stopCount;
}
}
}
@Benchmark
public void drain() throws Throwable {
CountDownLatch barrier = new CountDownLatch(1);
ArrayList<String> list = new ArrayList<>(); // Optionally we can set capacity to avoid allocations
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(capacity);
startProducers(barrier, blockingQueue);
barrier.countDown();
int stopCount = 0;
while (stopCount != producerThreadCount) {
blockingQueue.drainTo(list);
if (list.isEmpty()) {
String v = blockingQueue.take();
Blackhole.consumeCPU(10);
if (v == END_MARKER) {
++stopCount;
}
} else {
stopCount += list.stream().filter(v -> {
Blackhole.consumeCPU(10);
return v == END_MARKER;
}).count();
list.clear();
}
}
}
private void startProducers(CountDownLatch barrier, ArrayBlockingQueue<String> blockingQueue) {
for (int i = 0; i < producerThreadCount; ++i) {
final String v = Integer.toString(i);
threadPool.submit(() -> {
try {
barrier.await();
for (int j = 0; j < 100000; ++j) {
blockingQueue.put(v);
}
blockingQueue.put(END_MARKER);
}
catch (InterruptedException ignored) {
blockingQueue.offer(END_MARKER);
Thread.interrupted();
}
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment