Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Created September 7, 2015 20:10
Show Gist options
  • Save bassemZohdy/5f1aef3a4da1e1683047 to your computer and use it in GitHub Desktop.
Save bassemZohdy/5f1aef3a4da1e1683047 to your computer and use it in GitHub Desktop.
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class Processor<T> {
final private Supplier<T> supplier;
final private Consumer<T> consumer;
final private BlockingQueue<Event<T>> queue;
private volatile boolean stop;
private CountDownLatch endLatch;
private enum Type {
sync, async, queue
};
private Type type = Type.queue;
public Processor(Supplier<T> supplier, Consumer<T> consumer) {
this.supplier = supplier;
this.consumer = consumer;
this.queue = new LinkedBlockingQueue<Event<T>>();
this.endLatch = new CountDownLatch(1);
}
public void process() {
switch (type) {
case sync:
sync();
break;
case async:
async();
break;
case queue:
CompletableFuture<?> futureEnqueue = enqueue();
CompletableFuture<?> futureDequeue = dequeue();
try {
endLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
break;
}
}
private void async() {
while (!stop)
CompletableFuture.supplyAsync(supplier).thenAcceptAsync(consumer);
}
private void sync() {
while (!stop)
consumer.accept(supplier.get());
}
private CompletableFuture<?> enqueue() {
return CompletableFuture.runAsync(() -> {
try {
T t = null;
while (!stop && (t = supplier.get()) != null) {
queue.put(Event.of(t));
}
queue.put(Event.KILL_EVENT);
stop = true;
} catch (Throwable th) {
throw new RuntimeException(th);
}
}).handleAsync((v, th) -> {
if (th != null)
return this.enqueue();
else
return v;
});
}
private CompletableFuture<?> dequeue() {
return CompletableFuture.runAsync(() -> {
try {
Event<T> e;
while (!(e = queue.take()).equals(Event.KILL_EVENT)) {
consumer.accept(e.get());
}
} catch (Throwable th) {
throw new RuntimeException(th);
}
}).handleAsync((v, th) -> {
if (th != null)
return this.dequeue();
else {
endLatch.countDown();
return v;
}
});
}
private final static class Event<T> implements Serializable {
private static final long serialVersionUID = -4832551625770149470L;
@SuppressWarnings("rawtypes")
public static final Event KILL_EVENT = new Event();
private T t;
private Event() {
}
private Event(T t) {
this.t = t;
}
public static <T> Event<T> of(T t) {
return new Event<T>(t);
}
public T get() {
return t;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment