Skip to content

Instantly share code, notes, and snippets.

@sshark
Last active February 25, 2024 09:03
Show Gist options
  • Save sshark/86948a07830a2452f58544e98afc17d4 to your computer and use it in GitHub Desktop.
Save sshark/86948a07830a2452f58544e98afc17d4 to your computer and use it in GitHub Desktop.
Uses flags or poison pill to synchronize producers and consumers
package org.teckhooi;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerProducerQ {
final private int capacity;
final private Object lock = new Object();
volatile private boolean alive = true;
volatile private boolean producerAlive = true;
final private Queue<String> bin = new LinkedList<>();
final private Queue<String> queue = new LinkedList<>();
public Queue<String> dump() {
return bin;
}
public boolean isEmpty() {
return queue.isEmpty();
}
public boolean isAlive() {
return alive;
}
public void kill() {
this.alive = false;
}
public boolean isProducerAlive() {
return producerAlive;
}
public void killProducer() {
this.producerAlive = false;
}
public ConsumerProducerQ(int capacity) {
this.capacity = capacity;
}
public ConsumerProducerQ() {
this(Integer.MAX_VALUE);
}
public void consume() throws Exception {
synchronized (lock) {
if (queue.isEmpty()) {
lock.wait(200);
if (!isProducerAlive()) {
return;
}
}
String data = queue.remove();
lock.notifyAll();
bin.add(data);
// delay.ifPresent(d -> throwRuntimeExp(() -> Class.forName("abc")));
}
}
private void throwRuntimeExp(CheckedFunction0 r) {
try {
r.apply();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void produce() throws Exception {
synchronized (lock) {
if (queue.size() >= capacity) {
lock.wait();
}
String data = UUID.randomUUID().toString().substring(0, 5);
queue.offer(data);
lock.notifyAll();
bin.add(data);
}
}
public static void main(String[] args) throws Exception {
final ConsumerProducerQ cpQ = new ConsumerProducerQ(5);
final CountDownLatch latch = new CountDownLatch(1);
final Runnable producer = () -> {
while (cpQ.isAlive()) {
try {
cpQ.produce();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
cpQ.killProducer();
};
final Runnable consumer = () -> {
while (cpQ.isProducerAlive() || !cpQ.isEmpty()) {
try {
cpQ.consume();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
latch.countDown();
};
try (ExecutorService es = Executors.newCachedThreadPool()) {
es.submit(consumer);
es.submit(producer);
Thread.sleep(1000);
cpQ.kill();
latch.await();
}
cpQ.dump().forEach(System.out::println);
}
}
package org.teckhooi;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class PoisonPillQ {
final private int capacity;
final private Object lock = new Object();
volatile private boolean alive = true;
final private Queue<String> bin = new LinkedList<>();
final private Queue<Optional<String>> queue = new LinkedList<>();
public Queue<String> dump() {
return bin;
}
public boolean isEmpty() {
return queue.isEmpty();
}
public boolean isAlive() {
return alive;
}
public void kill() {
this.alive = false;
}
public void killProducer() {
try {
produce(Optional.empty()); // poison pill
} catch (Exception e) {
e.printStackTrace();
}
}
public ConsumerProducerQ(int capacity) {
this.capacity = capacity;
}
public ConsumerProducerQ() {
this(Integer.MAX_VALUE);
}
public boolean consume() throws Exception {
synchronized (lock) {
while (queue.isEmpty()) {
lock.wait();
}
Optional<String> data = queue.remove();
data.ifPresent(d -> {
bin.add(d);
});
lock.notifyAll();
return data.isEmpty();
// delay.ifPresent(d -> throwRuntimeExp(() -> Class.forName("abc")));
}
}
private void throwRuntimeExp(CheckedFunction0 r) {
try {
r.apply();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void produce() throws Exception {
produce(Optional.of(UUID.randomUUID().toString().substring(0, 8)));
}
public void produce(Optional<String> data) throws Exception {
synchronized (lock) {
if (queue.size() >= capacity) {
lock.wait();
}
queue.offer(data);
lock.notifyAll();
data.ifPresent(bin::add);
}
}
public static void main(String[] args) throws Exception {
final ConsumerProducerQ cpQ = new ConsumerProducerQ(5);
final CountDownLatch latch = new CountDownLatch(1);
final Runnable producer = () -> {
while (cpQ.isAlive()) {
try {
cpQ.produce();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
cpQ.killProducer();
};
final Runnable consumer = () -> {
while (true) {
try {
if(cpQ.consume()) {
break;
};
} catch (Exception e) {
throw new RuntimeException(e);
}
}
latch.countDown();
};
try (ExecutorService es = Executors.newCachedThreadPool()) {
es.submit(consumer);
es.submit(producer);
Thread.sleep(1000);
cpQ.kill();
latch.await();
}
var sortedResult = cpQ.dump().stream().sorted().toList().toArray(new String[0]);
if (sortedResult.length % 2 != 0) {
throw new RuntimeException("Result count should be even");
}
/*
try (PrintWriter writer = new PrintWriter(new FileWriter("results.txt"))){
// cpQ.dump().forEach(System.out::println);
cpQ.dump().forEach(writer::println);
}
*/
for (int i = 0; i < sortedResult.length; i = i + 2) {
if (!sortedResult[i].equals(sortedResult[i + 1])) {
throw new RuntimeException(String.format("The results does not tally, %s, %s", sortedResult[i], sortedResult[i + 1]));
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment