Skip to content

Instantly share code, notes, and snippets.

@n1chre
Last active June 28, 2017 18:31
Show Gist options
  • Save n1chre/efc8da727a3ca623821ae4d470f82a7c to your computer and use it in GitHub Desktop.
Save n1chre/efc8da727a3ca623821ae4d470f82a7c to your computer and use it in GitHub Desktop.
package hrucc;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
 * Decouples a fast producer from a slow {@link Consumer}.
 *
 * <p>Elements handed to {@link #accept(Object)} are queued and delivered, in arrival order, to the
 * wrapped consumer on a dedicated worker thread. {@code accept} only enqueues the element, so the
 * producer never waits for the wrapped consumer to finish its work.
 *
 * <p>To use this class, simply wrap your Consumer into this class. When no more elements will be
 * produced, either pass {@code null} to {@link #accept(Object)} or call {@link #finishConsuming()};
 * the worker thread then delivers whatever is still queued and terminates. The worker is created
 * with {@code new Thread(...)}, so it inherits the daemon status of the constructing thread; when
 * that is an ordinary (non-daemon) thread, the JVM stays alive until the end of input is signalled.
 *
 * <p>Elements submitted after the end of input has been signalled may never be delivered. If the
 * wrapped consumer throws, the exception propagates out of the worker thread and processing stops.
 *
 * Created by fhrenic on 28/06/2017.
 *
 * @param <T> type of the consumed elements
 */
public class ProxyConsumer<T> implements Consumer<T> {

    /**
     * Small demo: feeds numbers to a consumer that needs one second per element, then signals the
     * end of input with {@code null}.
     *
     * @param args ignored
     * @throws InterruptedException if the demo thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ProxyConsumer<Integer> proxy = new ProxyConsumer<>(number -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                System.out.println("can't sleep");
                return;
            }
            System.out.println("got number " + number);
        });
        for (int i = 0; i < 10; i++) {
            proxy.accept(i);
        }
        TimeUnit.SECONDS.sleep(20); // wait here
        proxy.accept(420);
        TimeUnit.SECONDS.sleep(5);
        proxy.accept(null);
    }

    /** Guards {@link #elements} and is the monitor the worker waits on. */
    private final Object lock = new Object();

    /** The wrapped (slow) consumer; only ever invoked on the worker thread. */
    private final Consumer<T> consumer;

    /** Pending elements in arrival order; accessed only while holding {@link #lock}. */
    private final Queue<T> elements;

    /** Set once the producer has signalled the end of input; changed only while holding {@link #lock}. */
    private final AtomicBoolean noMoreElements;

    /**
     * Wraps {@code consumer} and starts the worker thread that feeds it.
     *
     * @param consumer the consumer that receives the queued elements
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public ProxyConsumer(Consumer<T> consumer) {
        if (consumer == null) {
            // Fail here rather than later on the worker thread, where the error would go unnoticed.
            throw new NullPointerException("consumer");
        }
        this.consumer = consumer;
        elements = new LinkedList<>();
        noMoreElements = new AtomicBoolean(false);
        // Started last so every field is assigned before the worker can observe this object.
        new Thread(this::startProcessing, "ProxyConsumer-worker").start();
    }

    /**
     * Queues {@code element} for the wrapped consumer and returns immediately.
     *
     * @param element the element to consume, or {@code null} to signal that no more elements will
     *     follow (equivalent to {@link #finishConsuming()})
     */
    @Override
    public void accept(T element) {
        synchronized (lock) {
            if (element != null) {
                elements.add(element);
            } else {
                noMoreElements.set(true);
            }
            lock.notifyAll();
        }
    }

    /**
     * Signals that no more elements will be produced. The worker thread delivers the elements that
     * are still queued and then terminates.
     */
    public void finishConsuming() {
        synchronized (lock) {
            noMoreElements.set(true);
            // Wake the worker; without this it would keep waiting for an element that never comes.
            lock.notifyAll();
        }
    }

    /**
     * Worker loop: takes elements off the queue one at a time and passes them to the wrapped
     * consumer until the queue is empty and the end of input has been signalled.
     */
    private void startProcessing() {
        while (true) {
            T next;
            synchronized (lock) {
                // Loop (not "if") so that spurious wake-ups are harmless.
                while (elements.isEmpty()) {
                    if (noMoreElements.get()) {
                        return; // drained and the producer is done
                    }
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread, then stop.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                next = elements.poll();
            }
            // Run the slow consumer outside the lock so producers are never blocked by it.
            consumer.accept(next);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment