Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Created September 7, 2015 10:13
Show Gist options
  • Save bassemZohdy/30f26c83a6675f422a95 to your computer and use it in GitHub Desktop.
Save bassemZohdy/30f26c83a6675f422a95 to your computer and use it in GitHub Desktop.
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
 * Repeatedly pulls items from a {@link Supplier} and hands them to a
 * {@link Consumer} on an {@link ExecutorService} until either the stop
 * predicate matches an item or {@link #stop()} is called.
 *
 * <p>Defaults applied by the shorter {@code of(...)} overloads:
 * <ul>
 *   <li>stop predicate: stop on the first {@code null} item;</li>
 *   <li>exception consumer: {@link Throwable#printStackTrace()};</li>
 *   <li>executor: {@link ForkJoinPool#commonPool()};</li>
 *   <li>auto start: {@code false}.</li>
 * </ul>
 *
 * @param <T> type of the items being supplied and consumed
 */
public class ContConsuming<T> {

    private final Supplier<T> supplier;
    /** May be {@code null}, in which case items are pulled and discarded. */
    private final Consumer<T> consumer;
    private final Predicate<T> predicate;
    /** May be {@code null}, in which case failures are silently dropped. */
    private final Consumer<Throwable> exceptionConsumer;
    private final ExecutorService executorService;

    /** Future of the most recently started run; {@code null} until the first start. */
    private volatile CompletableFuture<Void> future;
    /** Set to {@code true} to make the consuming loop exit before its next pull. */
    private volatile boolean stop;

    /**
     * Single real constructor; every factory funnels into it.
     *
     * @throws NullPointerException if {@code supplier}, {@code predicate} or
     *         {@code executorService} is {@code null}
     */
    private ContConsuming(Supplier<T> supplier, Consumer<T> consumer,
            Predicate<T> predicate, Consumer<Throwable> exceptionConsumer,
            ExecutorService executorService, boolean autoStart) {
        // Fail fast here instead of surfacing a NullPointerException later on a worker thread.
        this.supplier = Objects.requireNonNull(supplier, "supplier");
        this.consumer = consumer;
        this.predicate = Objects.requireNonNull(predicate, "predicate");
        this.exceptionConsumer = exceptionConsumer;
        this.executorService = Objects.requireNonNull(executorService, "executorService");
        if (autoStart) {
            // NOTE: start() blocks, so with autoStart the factory call only
            // returns once the consuming loop has finished.
            start();
        }
    }

    /** Default stop condition: a {@code null} item ends the loop. */
    private static <T> Predicate<T> stopOnNull() {
        return t -> t == null;
    }

    /** Default failure handling: print the stack trace. */
    private static Consumer<Throwable> printStackTraceHandler() {
        return Throwable::printStackTrace;
    }

    /**
     * Creates an instance with every collaborator given explicitly.
     *
     * @param supplier          source of items; must not be {@code null}
     * @param consumer          receives each item that does not match
     *                          {@code predicate}; may be {@code null}
     * @param predicate         stop condition tested on every supplied item;
     *                          must not be {@code null}
     * @param exceptionConsumer receives any failure of the run; may be {@code null}
     * @param executorService   executor the loop runs on; must not be {@code null}
     * @param autoStart         when {@code true}, {@link #start()} is invoked
     *                          during construction, so this call blocks until
     *                          the loop ends
     * @return the new instance
     * @throws NullPointerException if a required argument is {@code null}
     */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Consumer<T> consumer, Predicate<T> predicate,
            Consumer<Throwable> exceptionConsumer,
            ExecutorService executorService, boolean autoStart) {
        return new ContConsuming<T>(supplier, consumer, predicate,
                exceptionConsumer, executorService, autoStart);
    }

    /** Same as the full factory, without auto start. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Consumer<T> consumer, Predicate<T> predicate,
            Consumer<Throwable> exceptionConsumer,
            ExecutorService executorService) {
        return new ContConsuming<T>(supplier, consumer, predicate,
                exceptionConsumer, executorService, false);
    }

    /** Runs on {@link ForkJoinPool#commonPool()}, without auto start. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Consumer<T> consumer, Predicate<T> predicate,
            Consumer<Throwable> exceptionConsumer) {
        return new ContConsuming<T>(supplier, consumer, predicate,
                exceptionConsumer, ForkJoinPool.commonPool(), false);
    }

    /** As above, with failures printed via {@link Throwable#printStackTrace()}. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Consumer<T> consumer, Predicate<T> predicate) {
        return new ContConsuming<T>(supplier, consumer, predicate,
                printStackTraceHandler(), ForkJoinPool.commonPool(), false);
    }

    /** Consumes items until the supplier yields {@code null}. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Consumer<T> consumer) {
        return new ContConsuming<T>(supplier, consumer, ContConsuming.<T>stopOnNull(),
                printStackTraceHandler(), ForkJoinPool.commonPool(), false);
    }

    /** Pulls and discards items until {@code predicate} matches one. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier,
            Predicate<T> predicate) {
        return new ContConsuming<T>(supplier, null, predicate,
                printStackTraceHandler(), ForkJoinPool.commonPool(), false);
    }

    /** Pulls and discards items until the supplier yields {@code null}. */
    public static <T> ContConsuming<T> of(Supplier<T> supplier) {
        return new ContConsuming<T>(supplier, null, ContConsuming.<T>stopOnNull(),
                printStackTraceHandler(), ForkJoinPool.commonPool(), false);
    }

    /** Starts the consuming loop on the executor and returns immediately. */
    public void startAsync() {
        launch();
    }

    /** Starts the consuming loop on the executor and blocks until it has finished. */
    public void start() {
        // Join the future of THIS launch rather than re-reading the field.
        launch().join();
    }

    /**
     * Blocks until the most recently started run has finished. Returns
     * immediately when nothing has been started yet.
     */
    public void await() {
        CompletableFuture<Void> current = future;
        if (current != null) {
            current.join();
        }
    }

    /**
     * Asks the loop to stop. The flag is checked before each pull from the
     * supplier, so a pull that is already in progress is not interrupted.
     */
    public void stop() {
        stop = true;
    }

    /** Schedules one run of the loop, wiring failure handling, and records its future. */
    private CompletableFuture<Void> launch() {
        stop = false;
        CompletableFuture<Void> started = CompletableFuture
                .runAsync(this::run, executorService)
                .handle((v, th) -> {
                    if (th != null) {
                        handleException(unwrap(th));
                    }
                    stop = true;
                    return v;
                });
        future = started;
        return started;
    }

    /** The consuming loop: pull, test the stop condition, then consume. */
    private void run() {
        while (!stop) {
            T item = supplier.get();
            if (predicate.test(item)) {
                break;
            }
            if (consumer != null) {
                consumer.accept(item);
            }
        }
        stop = true;
    }

    /**
     * Strips the {@link CompletionException} wrapper that dependent
     * {@link CompletableFuture} stages may see around the original failure,
     * so the exception consumer gets what the supplier, consumer or predicate
     * actually threw.
     */
    private static Throwable unwrap(Throwable th) {
        Throwable cause = th.getCause();
        return (th instanceof CompletionException && cause != null) ? cause : th;
    }

    /** Forwards a failure to the exception consumer, if any, and flags the run as stopped. */
    private void handleException(Throwable th) {
        if (exceptionConsumer != null) {
            exceptionConsumer.accept(th);
        }
        stop = true;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment