Last active
June 20, 2017 05:17
-
-
Save datenbrille/38038b39d2c1c70082f408cd3d4d534c to your computer and use it in GitHub Desktop.
Simple Blocking Queue with Java 8 native code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.couplespent; | |
import lombok.Data; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
import java.util.stream.Stream; | |
public class ApplicationTests { | |
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class); | |
public static final int BATCH_SIZE = 18; | |
public static final int CAPACITY = 2; | |
public static final int MAX_SIZE = 123; | |
@Test | |
public void contextLoads() throws ExecutionException, InterruptedException { | |
Random random = new Random(); | |
Map<Function<NumberHolder, Boolean>, BlockingQueue<NumberHolder>> buffers = new HashMap<>(); | |
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY)); | |
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY)); | |
buffers.put((item) -> true, new ArrayBlockingQueue<>(CAPACITY)); | |
Stream<NumberHolder> numbers = Stream.generate(() -> new NumberHolder(random.nextInt(10))).limit(MAX_SIZE); | |
List<Producer> producers = buffers.entrySet() | |
.stream() | |
.map((entry) -> new Producer(entry.getValue(), entry.getKey())).collect(Collectors.toList()); | |
List<Sender> senders = buffers.entrySet() | |
.stream() | |
.map((entry) -> new Sender(entry.getValue(), BATCH_SIZE, this::sendBatchToGame)) | |
.collect(Collectors.toList()); | |
senders.forEach((sender) -> new Thread(sender, "sender").start()); | |
CompletableFuture.runAsync(() -> numbers.forEach(item -> producers.forEach((producer -> producer.write(item))))) | |
.thenAccept((Void) -> senders.forEach((Sender::stop))).get(); | |
} | |
private void sendBatchToGame(List<NumberHolder> numbers) { | |
LOGGER.info("\t\t=> Sending to game"); | |
LOGGER.info("\t\t\t=> Sending " + numbers.size() + " items to game, sleeping"); | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
LOGGER.info("\t\t=> Finished sending to game"); | |
} | |
private class Sender implements Runnable { | |
private final BlockingQueue<NumberHolder> source; | |
private final int batchSize; | |
private boolean shutdown = false; | |
private final Consumer<List<NumberHolder>> consumer; | |
private List<NumberHolder> localBuffer = new LinkedList<>(); | |
private Sender(BlockingQueue<NumberHolder> source, int batchSize, Consumer<List<NumberHolder>> consumer) { | |
this.source = source; | |
this.batchSize = batchSize; | |
this.consumer = consumer; | |
} | |
@Override | |
public void run() { | |
while (!shutdown) { | |
try { | |
localBuffer.add(source.take()); | |
if (localBuffer.size() == batchSize) { | |
LOGGER.info("\t=> Start batch processing"); | |
consumer.accept(localBuffer); | |
localBuffer = new LinkedList<>(); | |
LOGGER.info("\t=> Batch processing ended"); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public void stop() { | |
this.shutdown = true; | |
consumer.accept(localBuffer); | |
if (source.size() > 0) { | |
List<NumberHolder> temp = new LinkedList<>(); | |
int drainTo = source.drainTo(temp); | |
LOGGER.info("=> Remaining elements " + drainTo); | |
consumer.accept(temp); | |
} | |
} | |
} | |
private class Producer { | |
private final BlockingQueue<NumberHolder> sink; | |
private int counter = 0; | |
private Function<NumberHolder, Boolean> partioner; | |
public Producer(BlockingQueue<NumberHolder> sink, Function<NumberHolder, Boolean> partioner) { | |
this.sink = sink; | |
this.partioner = partioner; | |
} | |
public void write(NumberHolder numberHolder) { | |
counter++; | |
if (counter % 10 == 0) { | |
LOGGER.info("=> Produced 10 elements"); | |
} | |
if (partioner.apply(numberHolder)) { | |
LOGGER.info("=> Current queue: " + sink.size()); | |
try { | |
boolean success = sink.offer(numberHolder, 2, TimeUnit.SECONDS); | |
if (!success) { | |
throw new RuntimeException(); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
@Data | |
private class NumberHolder { | |
private final int number; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment