Skip to content

Instantly share code, notes, and snippets.

@datenbrille
Last active June 20, 2017 05:17
Show Gist options
  • Save datenbrille/38038b39d2c1c70082f408cd3d4d534c to your computer and use it in GitHub Desktop.
Save datenbrille/38038b39d2c1c70082f408cd3d4d534c to your computer and use it in GitHub Desktop.
Simple Blocking Queue with Java 8 native code
package io.couplespent;
import lombok.Data;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class ApplicationTests {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class);
public static final int BATCH_SIZE = 18;
public static final int CAPACITY = 2;
public static final int MAX_SIZE = 123;
@Test
public void contextLoads() throws ExecutionException, InterruptedException {
Random random = new Random();
Map<Function<NumberHolder, Boolean>, BlockingQueue<NumberHolder>> buffers = new HashMap<>();
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY));
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY));
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY));
Stream<NumberHolder> numbers = Stream.generate(() -> new NumberHolder(random.nextInt(10))).limit(MAX_SIZE);
List<Producer> producers = buffers.entrySet()
.stream()
.map((entry) -> new Producer(entry.getValue(), entry.getKey())).collect(Collectors.toList());
List<Sender> senders = buffers.entrySet()
.stream()
.map((entry) -> new Sender(entry.getValue(), BATCH_SIZE, this::sendBatchToGame))
.collect(Collectors.toList());
senders.forEach((sender) -> new Thread(sender, "sender").start());
CompletableFuture.runAsync(() -> numbers.forEach(item -> producers.forEach((producer -> producer.write(item)))))
.thenAccept((Void) -> senders.forEach((Sender::stop))).get();
}
private void sendBatchToGame(List<NumberHolder> numbers) {
LOGGER.info("\t\t=> Sending to game");
LOGGER.info("\t\t\t=> Sending " + numbers.size() + " items to game, sleeping");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("\t\t=> Finished sending to game");
}
private class Sender implements Runnable {
private final BlockingQueue<NumberHolder> source;
private final int batchSize;
private boolean shutdown = false;
private final Consumer<List<NumberHolder>> consumer;
private List<NumberHolder> localBuffer = new LinkedList<>();
private Sender(BlockingQueue<NumberHolder> source, int batchSize, Consumer<List<NumberHolder>> consumer) {
this.source = source;
this.batchSize = batchSize;
this.consumer = consumer;
}
@Override
public void run() {
while (!shutdown) {
try {
localBuffer.add(source.take());
if (localBuffer.size() == batchSize) {
LOGGER.info("\t=> Start batch processing");
consumer.accept(localBuffer);
localBuffer = new LinkedList<>();
LOGGER.info("\t=> Batch processing ended");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop() {
this.shutdown = true;
consumer.accept(localBuffer);
if (source.size() > 0) {
List<NumberHolder> temp = new LinkedList<>();
int drainTo = source.drainTo(temp);
LOGGER.info("=> Remaining elements " + drainTo);
consumer.accept(temp);
}
}
}
private class Producer {
private final BlockingQueue<NumberHolder> sink;
private int counter = 0;
private Function<NumberHolder, Boolean> partioner;
public Producer(BlockingQueue<NumberHolder> sink, Function<NumberHolder, Boolean> partioner) {
this.sink = sink;
this.partioner = partioner;
}
public void write(NumberHolder numberHolder) {
counter++;
if (counter % 10 == 0) {
LOGGER.info("=> Produced 10 elements");
}
if (partioner.apply(numberHolder)) {
LOGGER.info("=> Current queue: " + sink.size());
try {
boolean success = sink.offer(numberHolder, 2, TimeUnit.SECONDS);
if (!success) {
throw new RuntimeException();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Data
private class NumberHolder {
private final int number;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment