Last active
March 16, 2018 23:44
-
-
Save seb-martin/437967de8968f58687daf61169ee17a2 to your computer and use it in GitHub Desktop.
Producer Consumer multithread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.*; | |
import java.util.stream.IntStream; | |
public class ProducerConsumerPoc { | |
public static void main(String[] args) { | |
BlockingQueue<String> shared = new LinkedBlockingQueue<>(50); | |
int productCount = 100; | |
Thread producer = new MultiThreadProducer(shared, productCount); | |
Thread consumer = new MultiThreadConsumer(shared); | |
producer.start(); | |
consumer.start(); | |
} | |
} | |
class MultiThreadProducer extends Thread { | |
private ExecutorService pool; | |
private BlockingQueue<String> queue; | |
private int productCount; | |
MultiThreadProducer(BlockingQueue<String> queue, int productCount) { | |
this.pool = Executors.newFixedThreadPool(2); | |
this.queue = queue; | |
this.productCount = productCount; | |
} | |
@Override | |
public void run() { | |
String producerName = "Producer:"; | |
trace(producerName); | |
IntStream.rangeClosed(1, this.productCount) | |
.forEach(productNumber -> this.pool.execute(new Production(producerName, productNumber, queue))); | |
this.pool.shutdown(); | |
} | |
private void trace(String producerName) { | |
System.out.println(producerName + Thread.currentThread() + " start produce"); | |
} | |
} | |
class Production implements Runnable { | |
private String producerName; | |
private int productNumber; | |
private BlockingQueue<String> queue; | |
Production(String producerName, int productNumber, BlockingQueue<String> queue) { | |
this.producerName = producerName; | |
this.productNumber = productNumber; | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
try { | |
String product = "Product " + productNumber; | |
trace(product); | |
this.queue.put(product); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
private void trace(String product) { | |
String trace = producerName + Thread.currentThread() + " produce " + product; | |
System.out.println(trace); | |
} | |
} | |
class MultiThreadConsumer extends Thread { | |
private BlockingQueue<String> queue; | |
private ExecutorService pool; | |
MultiThreadConsumer(BlockingQueue<String> queue) { | |
this.pool = Executors.newFixedThreadPool(20); | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
String consumerName = "Consumer:"; | |
trace(consumerName); | |
boolean notInterrupted = true; | |
while (notInterrupted) { | |
try { | |
String product = this.queue.take(); | |
this.pool.execute(new Consumption(consumerName, product)); | |
} catch (InterruptedException e) { | |
notInterrupted = false; | |
e.printStackTrace(); | |
} | |
} | |
} | |
private void trace(String consumerName) { | |
System.out.println(consumerName + Thread.currentThread() + " start consume"); | |
} | |
} | |
class Consumption implements Runnable { | |
private String consumerName; | |
private String product; | |
Consumption(String consumerName, String product) { | |
this.consumerName = consumerName; | |
this.product = product; | |
} | |
@Override | |
public void run() { | |
String trace = consumerName + Thread.currentThread() + " consume " + product; | |
System.out.println(trace); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment