Created
September 8, 2015 13:58
-
-
Save bassemZohdy/f100789df84f7fa7a77c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.junit.Test;
public class ContConsumingTest { | |
@Test | |
public void test() throws InterruptedException { | |
int count = 1000; | |
AtomicInteger i = new AtomicInteger(1); | |
Supplier<Integer> supplier = () -> { | |
System.out.println(Thread.currentThread().getName() | |
+ " >> supplier -> " + i); | |
if (i.get() % 50 == 0) { | |
i.incrementAndGet(); | |
throw new RuntimeException(); | |
} | |
if (i.get() < count) | |
return i.getAndIncrement(); | |
else | |
return null; | |
}; | |
AtomicInteger j = new AtomicInteger(1); | |
Consumer<Integer> consumer = (n) -> { | |
System.out.println(Thread.currentThread().getName() | |
+ " >> consumer <- " + n); | |
if (j.getAndIncrement() % 50 == 0) | |
throw new RuntimeException(); | |
}; | |
QueuedProcessor<Integer> p = new QueuedProcessor<Integer>(supplier, | |
consumer, Executors.newCachedThreadPool()); | |
p.process(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Serializable; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Consumer; | |
import java.util.function.Supplier; | |
/**
 * Moves items from a {@link Supplier} to a {@link Consumer} through an
 * in-memory queue, using one producer task and one consumer task submitted to
 * the given {@link ExecutorService}.
 *
 * <p>The producer calls the supplier until it returns {@code null}, then
 * enqueues a kill marker that tells the consumer task to finish. If either
 * task throws, it is restarted after a delay until its retry budget is used
 * up. The budgets are counted across the whole lifetime of the processor, not
 * per failure streak. An item whose {@code consumer.accept} call threw is not
 * redelivered.
 *
 * @param <T> type of the items being processed
 */
public class QueuedProcessor<T> {

    private final Supplier<T> supplier;
    private final Consumer<T> consumer;
    private final BlockingQueue<Event<T>> queue;
    private final AtomicInteger triesSupplier;
    private final long sleepSupplier;
    private final AtomicInteger triesConsumer;
    private final long sleepConsumer;
    private final ExecutorService executorService;
    private final CountDownLatch endLatch;
    /** Set once production should end; checked by the producer before each supplier call. */
    private volatile boolean stop;

    /**
     * Creates a processor with explicit retry settings.
     *
     * @param supplier        source of items; returning {@code null} ends production
     * @param consumer        sink for items
     * @param triesSupplier   how many times a failed producer task is restarted
     * @param triesConsumer   how many times a failed consumer task is restarted
     * @param sleepSupplier   delay in milliseconds before restarting the producer
     * @param sleepConsumer   delay in milliseconds before restarting the consumer
     * @param executorService executor that runs the tasks and their retry handlers
     */
    public QueuedProcessor(Supplier<T> supplier, Consumer<T> consumer,
            int triesSupplier, int triesConsumer, long sleepSupplier,
            long sleepConsumer, ExecutorService executorService) {
        this.supplier = supplier;
        this.consumer = consumer;
        this.triesSupplier = new AtomicInteger(triesSupplier);
        this.triesConsumer = new AtomicInteger(triesConsumer);
        this.sleepSupplier = sleepSupplier;
        this.sleepConsumer = sleepConsumer;
        this.queue = new LinkedBlockingQueue<Event<T>>();
        this.endLatch = new CountDownLatch(1);
        this.executorService = executorService;
    }

    /**
     * Creates a processor with 5 retries and a 1000 ms delay for both the
     * producer and the consumer.
     */
    public QueuedProcessor(Supplier<T> supplier, Consumer<T> consumer,
            ExecutorService executorService) {
        this(supplier, consumer, 5, 5, 1000, 1000, executorService);
    }

    /** Starts the producer and consumer tasks and returns immediately. */
    public void processAsync() {
        enqueue();
        dequeue();
    }

    /** Starts processing and blocks until the consumer side has finished or given up. */
    public void process() {
        processAsync();
        await();
    }

    /**
     * Submits the producer task. On failure the task is resubmitted after
     * {@code sleepSupplier} ms while retries remain; once they are exhausted a
     * kill marker is enqueued so the consumer can terminate.
     */
    private CompletableFuture<?> enqueue() {
        return CompletableFuture.runAsync(() -> {
            try {
                T item;
                while (!stop && (item = supplier.get()) != null) {
                    queue.put(Event.of(item));
                }
                sendKillEvent();
                stop = true;
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }, executorService).<Object>handleAsync((v, th) -> {
            // The handler sleeps, so run it on the caller's executor instead of
            // blocking a ForkJoinPool.commonPool() thread.
            if (th == null) {
                return v;
            }
            if (triesSupplier.getAndDecrement() > 0) {
                pause(sleepSupplier);
                return this.enqueue();
            }
            sendKillEvent();
            return v;
        }, executorService);
    }

    /**
     * Submits the consumer task. On failure the task is resubmitted after
     * {@code sleepConsumer} ms while retries remain. When the task finishes
     * normally or gives up, the end latch is released.
     */
    private CompletableFuture<?> dequeue() {
        return CompletableFuture.runAsync(() -> {
            try {
                Event<T> event;
                // Event does not override equals, so the kill marker is
                // recognised by identity.
                while ((event = queue.take()) != Event.KILL_EVENT) {
                    consumer.accept(event.get());
                }
            } catch (Throwable th) {
                throw new RuntimeException(th);
            }
        }, executorService).<Object>handleAsync((v, th) -> {
            if (th != null && triesConsumer.getAndDecrement() > 0) {
                pause(sleepConsumer);
                return this.dequeue();
            }
            if (th != null) {
                // Consumer gave up: nobody will drain the queue any more, so
                // tell the producer to stop before waking up waiters.
                stop = true;
            }
            endLatch.countDown();
            return v;
        }, executorService);
    }

    /** Sleeps for the given number of milliseconds, restoring the interrupt flag if interrupted. */
    private static void pause(long millis) {
        if (millis <= 0) {
            return;
        }
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Enqueues the kill marker that makes the consumer task finish. */
    @SuppressWarnings("unchecked")
    private void sendKillEvent() {
        try {
            this.queue.put(Event.KILL_EVENT);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Blocks until the consumer side has finished or given up. */
    public void await() {
        try {
            this.endLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Queue element wrapping one item; {@link #KILL_EVENT} is the end-of-stream marker. */
    private static final class Event<T> implements Serializable {
        private static final long serialVersionUID = -4832551625770149470L;

        @SuppressWarnings("rawtypes")
        public static final Event KILL_EVENT = new Event();

        private final T t;

        private Event() {
            this(null);
        }

        private Event(T t) {
            this.t = t;
        }

        public static <T> Event<T> of(T t) {
            return new Event<T>(t);
        }

        public T get() {
            return t;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment