Created
January 18, 2015 20:35
-
-
Save digvijaybhakuni/d5d28526c816e78afc91 to your computer and use it in GitHub Desktop.
Example Of Producer Consumer Execution With Concurrency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.digvijayb.learning.concurrency; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/**
 * Demonstrates a single producer and several consumers exchanging {@code String}
 * items through a bounded {@link BlockingQueue}, all running on one fixed-size
 * {@link ExecutorService}.
 *
 * <p>The producer publishes the numbers 1..30 and then clears the broker's
 * "production" flag; each consumer keeps draining the queue until production has
 * stopped and the queue is empty.
 */
public class MyProducerConsumerExecution {

    /** Creates the demo holder; all of the behaviour lives in {@link #main} and the nested classes. */
    public MyProducerConsumerExecution() {
    }

    /**
     * Starts three consumers and one producer on a four-thread pool and then
     * requests an orderly shutdown so the JVM exits once every task has finished.
     *
     * @param args ignored
     * @throws InterruptedException declared for backward compatibility; not thrown by this body
     * @throws ExecutionException declared for backward compatibility; not thrown by this body
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        MyBroker<String> broker = new MyBroker<String>();
        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            // execute() rather than submit(): no Future is kept, and submit() would
            // capture any task failure in that discarded Future and hide it.
            service.execute(new MyConsumer("Mr One", broker));
            service.execute(new MyConsumer("Mr Two", broker));
            service.execute(new MyConsumer("Mr Three", broker));
            service.execute(new MyProducer(broker));
        } finally {
            // Already-accepted tasks still run to completion after shutdown().
            service.shutdown();
        }
    }

    /**
     * Bounded hand-off point between producers and consumers.
     *
     * <p>Holds at most 20 queued items plus a flag telling consumers whether more
     * items may still arrive.
     *
     * @param <T> type of the items exchanged
     */
    public static class MyBroker<T> {

        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(20);
        private final AtomicBoolean productionFlag = new AtomicBoolean(true);

        /**
         * @return {@code true} while production is still marked as in progress
         */
        public boolean getProductionFlag() {
            return productionFlag.get();
        }

        /**
         * Marks production as running ({@code true}) or finished ({@code false}).
         *
         * @param value the new flag state
         */
        public void setProductionFlag(boolean value) {
            productionFlag.set(value);
        }

        /**
         * Tries to enqueue {@code value} without blocking.
         *
         * @param value the item to enqueue; must not be {@code null}
         * @return {@code true} if the item was accepted, {@code false} if the queue is full
         */
        public boolean setValue(T value) {
            return queue.offer(value);
        }

        /**
         * Removes the next item, waiting up to one second for one to become available.
         *
         * @return the next item, or {@code null} if none arrived within the timeout
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public T getValue() throws InterruptedException {
            return queue.poll(1, TimeUnit.SECONDS);
        }
    }

    /**
     * Publishes the numbers 1..30 (as strings) to a {@link MyBroker} and then
     * clears the broker's production flag.
     */
    public static class MyProducer implements Runnable {

        /** Running step number printed in the trace output; shared by all producer instances. */
        private static final AtomicInteger STEP = new AtomicInteger(0);

        private final MyBroker<String> myBroker;

        /**
         * @param myBroker broker that receives the produced items
         */
        public MyProducer(MyBroker<String> myBroker) {
            this.myBroker = myBroker;
        }

        /**
         * Runs the warm-up task, publishes 30 items and finally marks production as
         * finished, even when interrupted or when an unchecked exception escapes.
         */
        @Override
        public void run() {
            try {
                someIntTask();
                for (int item = 1; item <= 30; item++) {
                    addToBroker(item);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so the owning executor can observe it.
                Thread.currentThread().interrupt();
                System.err.println("Producer interrupted; stopping early.");
            } finally {
                // Must always happen, otherwise the consumers would wait forever.
                myBroker.setProductionFlag(false);
            }
            System.out.println("Producer finished its job; terminating.");
        }

        /**
         * Simulates one second of preparatory work.
         *
         * @throws InterruptedException if interrupted while sleeping
         */
        public void someIntTask() throws InterruptedException {
            System.out.println("someIntTask(InProducer)..." + STEP.getAndIncrement());
            Thread.sleep(1000);
        }

        /**
         * Publishes {@code in} to the broker, pausing 100 ms before every attempt and
         * retrying until the queue accepts the item.
         *
         * @param in the number to publish (sent as its decimal string)
         * @throws InterruptedException if interrupted while pausing between attempts
         */
        public void addToBroker(int in) throws InterruptedException {
            System.out.println("addToBroker(InProducer)..." + STEP.getAndIncrement());
            String payload = String.valueOf(in);
            do {
                System.out.println("wait to add " + in + " " + STEP.getAndIncrement());
                Thread.sleep(100);
            } while (!myBroker.setValue(payload));
        }
    }

    /**
     * Drains a {@link MyBroker}, spending one second on each item, until production
     * has stopped and no items remain.
     */
    public static class MyConsumer implements Runnable {

        private final String name;
        private final MyBroker<String> myBroker;
        /** Number of items this consumer has processed. */
        private final AtomicInteger processed = new AtomicInteger(0);

        /**
         * @param name label used in the trace output
         * @param myBroker broker to take items from
         */
        public MyConsumer(String name, MyBroker<String> myBroker) {
            this.name = name;
            this.myBroker = myBroker;
        }

        /**
         * Processes items until the broker reports that production has finished and a
         * subsequent poll comes back empty.
         */
        @Override
        public void run() {
            System.out.println("Count task " + processed.get() + " name " + name);
            try {
                while (true) {
                    // Read the flag BEFORE polling: if production had already stopped and
                    // the poll still times out, nothing can arrive later. Checking the flag
                    // after the poll could miss an item enqueued between the two steps.
                    boolean stillProducing = myBroker.getProductionFlag();
                    String data = myBroker.getValue();
                    if (data != null) {
                        // Simulated processing time; only spent when there is work to do.
                        Thread.sleep(1000);
                        System.out.println("Consumer " + this.name + " processed data from broker: " + data);
                        processed.getAndIncrement();
                    } else if (!stillProducing) {
                        break;
                    }
                }
                System.out.println("Comsumer " + this.name + " finished its job; terminating." + processed.get());
            } catch (InterruptedException ex) {
                // Restore the interrupt status so the owning executor can observe it.
                Thread.currentThread().interrupt();
                System.err.println("Consumer " + this.name + " interrupted; stopping early.");
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment