Skip to content

Instantly share code, notes, and snippets.

@bassemZohdy
Created September 8, 2015 13:58
Show Gist options
  • Save bassemZohdy/f100789df84f7fa7a77c to your computer and use it in GitHub Desktop.
Save bassemZohdy/f100789df84f7fa7a77c to your computer and use it in GitHub Desktop.
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Test;
public class ContConsumingTest {
@Test
public void test() throws InterruptedException {
int count = 1000;
AtomicInteger i = new AtomicInteger(1);
Supplier<Integer> supplier = () -> {
System.out.println(Thread.currentThread().getName()
+ " >> supplier -> " + i);
if (i.get() % 50 == 0) {
i.incrementAndGet();
throw new RuntimeException();
}
if (i.get() < count)
return i.getAndIncrement();
else
return null;
};
AtomicInteger j = new AtomicInteger(1);
Consumer<Integer> consumer = (n) -> {
System.out.println(Thread.currentThread().getName()
+ " >> consumer <- " + n);
if (j.getAndIncrement() % 50 == 0)
throw new RuntimeException();
};
QueuedProcessor<Integer> p = new QueuedProcessor<Integer>(supplier,
consumer, Executors.newCachedThreadPool());
p.process();
}
}
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class QueuedProcessor<T> {
final private Supplier<T> supplier;
final private Consumer<T> consumer;
final private BlockingQueue<Event<T>> queue;
final private AtomicInteger triesSupplier;
final private long sleepSupplier;
final private AtomicInteger triesConsumer;
final private long sleepConsumer;
final private ExecutorService executorService;
private volatile boolean stop;
private CountDownLatch endLatch;
public QueuedProcessor(Supplier<T> supplier, Consumer<T> consumer,
int triesSupplier, int triesConsumer, long sleepSupplier,
long sleepConsumer, ExecutorService executorService) {
this.supplier = supplier;
this.consumer = consumer;
this.triesSupplier = new AtomicInteger(triesSupplier);
this.triesConsumer = new AtomicInteger(triesConsumer);
this.sleepSupplier = sleepSupplier;
this.sleepConsumer = sleepConsumer;
this.queue = new LinkedBlockingQueue<Event<T>>();
this.endLatch = new CountDownLatch(1);
this.executorService = executorService;
}
public QueuedProcessor(Supplier<T> supplier, Consumer<T> consumer,
ExecutorService executorService) {
this(supplier, consumer, 5, 5, 1000, 1000, executorService);
}
public void processAsync() {
CompletableFuture<?> futureEnqueue = enqueue();
CompletableFuture<?> futureDequeue = dequeue();
}
public void process() {
processAsync();
try {
endLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private CompletableFuture<?> enqueue() {
return CompletableFuture.runAsync(() -> {
try {
T t = null;
while (!stop && (t = supplier.get()) != null) {
queue.put(Event.of(t));
}
sendKillEvent();
stop = true;
} catch (Throwable th) {
throw new RuntimeException(th);
}
}, executorService).handleAsync((v, th) -> {
if (th != null) {
if (triesSupplier.getAndDecrement() > 0) {
try {
Thread.sleep(sleepSupplier);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return this.enqueue();
} else {
sendKillEvent();
return v;
}
} else {
return v;
}
});
}
private CompletableFuture<?> dequeue() {
return CompletableFuture.runAsync(() -> {
try {
Event<T> e;
while (!(e = queue.take()).equals(Event.KILL_EVENT)) {
consumer.accept(e.get());
}
} catch (Throwable th) {
throw new RuntimeException(th);
}
}, executorService).handleAsync((v, th) -> {
if (th != null)
if (triesConsumer.getAndDecrement() > 0) {
try {
Thread.sleep(sleepConsumer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return this.dequeue();
} else {
endLatch.countDown();
return v;
}
else {
endLatch.countDown();
return v;
}
});
}
private void sendKillEvent() {
try {
this.queue.put(Event.KILL_EVENT);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public void await() {
try {
this.endLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private final static class Event<T> implements Serializable {
private static final long serialVersionUID = -4832551625770149470L;
@SuppressWarnings("rawtypes")
public static final Event KILL_EVENT = new Event();
private T t;
private Event() {
}
private Event(T t) {
this.t = t;
}
public static <T> Event<T> of(T t) {
return new Event<T>(t);
}
public T get() {
return t;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment