Skip to content

Instantly share code, notes, and snippets.

@seb-martin
Last active March 16, 2018 23:44
Show Gist options
  • Save seb-martin/437967de8968f58687daf61169ee17a2 to your computer and use it in GitHub Desktop.
Save seb-martin/437967de8968f58687daf61169ee17a2 to your computer and use it in GitHub Desktop.
Producer Consumer multithread
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class ProducerConsumerPoc {
public static void main(String[] args) {
BlockingQueue<String> shared = new LinkedBlockingQueue<>(50);
int productCount = 100;
Thread producer = new MultiThreadProducer(shared, productCount);
Thread consumer = new MultiThreadConsumer(shared);
producer.start();
consumer.start();
}
}
class MultiThreadProducer extends Thread {
private ExecutorService pool;
private BlockingQueue<String> queue;
private int productCount;
MultiThreadProducer(BlockingQueue<String> queue, int productCount) {
this.pool = Executors.newFixedThreadPool(2);
this.queue = queue;
this.productCount = productCount;
}
@Override
public void run() {
String producerName = "Producer:";
trace(producerName);
IntStream.rangeClosed(1, this.productCount)
.forEach(productNumber -> this.pool.execute(new Production(producerName, productNumber, queue)));
this.pool.shutdown();
}
private void trace(String producerName) {
System.out.println(producerName + Thread.currentThread() + " start produce");
}
}
class Production implements Runnable {
private String producerName;
private int productNumber;
private BlockingQueue<String> queue;
Production(String producerName, int productNumber, BlockingQueue<String> queue) {
this.producerName = producerName;
this.productNumber = productNumber;
this.queue = queue;
}
@Override
public void run() {
try {
String product = "Product " + productNumber;
trace(product);
this.queue.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void trace(String product) {
String trace = producerName + Thread.currentThread() + " produce " + product;
System.out.println(trace);
}
}
class MultiThreadConsumer extends Thread {
private BlockingQueue<String> queue;
private ExecutorService pool;
MultiThreadConsumer(BlockingQueue<String> queue) {
this.pool = Executors.newFixedThreadPool(20);
this.queue = queue;
}
@Override
public void run() {
String consumerName = "Consumer:";
trace(consumerName);
boolean notInterrupted = true;
while (notInterrupted) {
try {
String product = this.queue.take();
this.pool.execute(new Consumption(consumerName, product));
} catch (InterruptedException e) {
notInterrupted = false;
e.printStackTrace();
}
}
}
private void trace(String consumerName) {
System.out.println(consumerName + Thread.currentThread() + " start consume");
}
}
class Consumption implements Runnable {
private String consumerName;
private String product;
Consumption(String consumerName, String product) {
this.consumerName = consumerName;
this.product = product;
}
@Override
public void run() {
String trace = consumerName + Thread.currentThread() + " consume " + product;
System.out.println(trace);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment