Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Last active August 29, 2015 14:18
Show Gist options
  • Save bassemZohdy/16209bd4a388083f1e8e to your computer and use it in GitHub Desktop.
Save bassemZohdy/16209bd4a388083f1e8e to your computer and use it in GitHub Desktop.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
public class QueuedConsumer<T> implements Consumer<T> {
private BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private AtomicBoolean stop = new AtomicBoolean(false);
private final Consumer<T> consumer;
private final Predicate<T> whenToStop;
private final Executor queueExecutor;
private QueuedConsumer(Consumer<T> consumer, Predicate<T> whenToStop,
Executor executor) {
this.consumer = consumer;
if (whenToStop != null)
this.whenToStop = whenToStop;
else
this.whenToStop = (t) -> false;
if (executor != null)
queueExecutor = executor;
else
queueExecutor = Executors.newSingleThreadExecutor();
queueExecutor.execute(() -> run());
}
private QueuedConsumer(Consumer<T> consumer, Predicate<T> whenToStop) {
this(consumer, whenToStop, null);
}
private QueuedConsumer(Consumer<T> consumer) {
this(consumer, null, null);
}
@Override
public void accept(T t) {
try {
queue.put(t);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
public static <T> QueuedConsumer<T> of(Consumer<T> consumer,
Predicate<T> whenToStop, Executor executor) {
return new QueuedConsumer<>(consumer, whenToStop, executor);
}
public static <T> QueuedConsumer<T> of(Consumer<T> consumer,
Predicate<T> whenToStop) {
return new QueuedConsumer<>(consumer, whenToStop);
}
public static <T> QueuedConsumer<T> of(Consumer<T> consumer) {
return new QueuedConsumer<>(consumer);
}
public static <T> QueuedConsumer<T> of(Consumer<T> consumer,
ExecutorService executor) {
return new QueuedConsumer<>(consumer, null, executor);
}
private void run() {
try {
while (true) {
T t = queue.take();
if (stop.get())
break;
this.consumer.accept(t);
if (whenToStop.test(t))
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
public void stop() {
this.stop.set(true);
}
}
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class QueuedConsumerTest {
public static void main(String[] args) {
QueuedConsumer<Integer> consumer = QueuedConsumer
.of(System.out::println);
Executors.newSingleThreadExecutor().execute(() -> {
try {
Thread.sleep(100);
} catch (Exception ex) {
}
//change between stop/finish and see difference
//consumer.stop();
consumer.finish();
});
Stream<Integer> stream = IntStream.range(0, 100000).boxed();
stream.forEach(s -> {
consumer.accept(s);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment