Skip to content

Instantly share code, notes, and snippets.

@digvijaybhakuni
Created January 18, 2015 20:35
Show Gist options
  • Save digvijaybhakuni/d5d28526c816e78afc91 to your computer and use it in GitHub Desktop.
Save digvijaybhakuni/d5d28526c816e78afc91 to your computer and use it in GitHub Desktop.
Example Of Producer Consumer Execution With Concurrency
package com.digvijayb.learning.concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demonstrates one producer handing work to several consumers through a
 * bounded {@link BlockingQueue}, with all tasks run on a fixed-size
 * {@link ExecutorService}.
 */
public class MyProducerConsumerExecution {

    /** Creates an instance; the demo itself is driven entirely by {@link #main(String[])}. */
    public MyProducerConsumerExecution() {
    }

    /**
     * Starts three consumers and one producer that share a single broker,
     * then requests an orderly shutdown of the pool.
     *
     * @param args ignored
     * @throws InterruptedException kept for signature compatibility
     * @throws ExecutionException kept for signature compatibility
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        MyBroker<String> broker = new MyBroker<String>();
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            // execute() rather than submit(): no Future is inspected here, and
            // submit() would silently park any task failure inside the ignored Future.
            service.execute(new MyConsumer("Mr One", broker));
            service.execute(new MyConsumer("Mr Two", broker));
            service.execute(new MyConsumer("Mr Three", broker));
            service.execute(new MyProducer(broker));
        } finally {
            // shutdown() lets the already-queued tasks finish; it only stops new submissions.
            service.shutdown();
        }
    }

    /**
     * Bounded hand-off point between producers and consumers, plus a flag
     * that tells consumers whether more items may still arrive.
     *
     * @param <T> type of the items exchanged
     */
    public static class MyBroker<T> {
        /** Holds at most 20 pending items. */
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(20);
        /** {@code true} while production is still in progress; starts as {@code true}. */
        private final AtomicBoolean productionFlag = new AtomicBoolean(true);

        /** @return {@code true} while production has not been marked as finished */
        public boolean getProductionFlag() {
            return productionFlag.get();
        }

        /** @param value {@code false} to signal that no further items will be offered */
        public void setProductionFlag(boolean value) {
            productionFlag.set(value);
        }

        /**
         * Tries to enqueue an item without blocking.
         *
         * @param value the item to enqueue; must not be {@code null}
         * @return {@code true} if the item was accepted, {@code false} if the queue is full
         */
        public boolean setValue(T value) {
            return queue.offer(value);
        }

        /**
         * Takes the next item, waiting up to one second for one to appear.
         *
         * @return the next item, or {@code null} if none arrived within one second
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public T getValue() throws InterruptedException {
            return queue.poll(1, TimeUnit.SECONDS);
        }
    }

    /** Offers the numbers 1 to 30 (as strings) to the broker, then marks production as finished. */
    public static class MyProducer implements Runnable {
        /** Sequence number appended to every producer log line; shared by all producers. */
        private static final AtomicInteger logSequence = new AtomicInteger(0);

        private final MyBroker<String> myBroker;

        /** @param myBroker the broker that receives the produced items */
        public MyProducer(MyBroker<String> myBroker) {
            this.myBroker = myBroker;
        }

        @Override
        public void run() {
            try {
                someIntTask();
                for (int item = 1; item <= 30; item++) {
                    addToBroker(item);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                System.err.println("Producer interrupted; stopping early.");
            } finally {
                // Always clear the flag, even on failure; otherwise consumers would poll forever.
                myBroker.setProductionFlag(false);
            }
            System.out.println("Producer finished its job; terminating.");
        }

        /**
         * Simulated start-up work: logs one line and sleeps for one second.
         *
         * @throws InterruptedException if interrupted while sleeping
         */
        public void someIntTask() throws InterruptedException {
            System.out.println("someIntTask(InProducer)..." + logSequence.getAndIncrement());
            Thread.sleep(1000);
        }

        /**
         * Offers {@code in} (as a string) to the broker, pausing 100 ms before
         * every attempt and retrying until the broker accepts it.
         *
         * @param in the number to publish
         * @throws InterruptedException if interrupted while pausing
         */
        public void addToBroker(int in) throws InterruptedException {
            System.out.println("addToBroker(InProducer)..." + logSequence.getAndIncrement());
            do {
                System.out.println("wait to add " + in + " " + logSequence.getAndIncrement());
                Thread.sleep(100);
            } while (!myBroker.setValue(String.valueOf(in)));
        }
    }

    /** Pulls items from the broker and "processes" each one by sleeping a second and logging it. */
    public static class MyConsumer implements Runnable {
        private final String name;
        private final MyBroker<String> myBroker;
        /** Number of items this consumer has processed so far. */
        private final AtomicInteger processedCount = new AtomicInteger(0);

        /**
         * @param name label used in this consumer's log lines
         * @param myBroker the broker to take items from
         */
        public MyConsumer(String name, MyBroker<String> myBroker) {
            this.name = name;
            this.myBroker = myBroker;
        }

        @Override
        public void run() {
            System.out.println("Count task " + processedCount.get() + " name " + name);
            try {
                while (true) {
                    // Read the flag BEFORE polling. The producer clears it only after
                    // its last offer, so "flag already false, then an empty poll" proves
                    // the queue is drained. Checking the flag after the poll could drop
                    // an item offered between the timed-out poll and the flag check.
                    boolean stillProducing = myBroker.getProductionFlag();
                    String data = myBroker.getValue();
                    if (data != null) {
                        Thread.sleep(1000); // simulated processing time
                        System.out.println("Consumer " + this.name + " processed data from broker: " + data);
                        processedCount.getAndIncrement();
                    } else if (!stillProducing) {
                        break;
                    }
                }
                System.out.println("Comsumer " + this.name + " finished its job; terminating." + processedCount.get());
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the pool can observe the interruption.
                Thread.currentThread().interrupt();
                System.err.println("Consumer " + this.name + " interrupted; stopping early.");
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment