Created
July 2, 2020 16:48
-
-
Save siddharthbarman/4de5399a0c5db81f23c8a031dfec0935 to your computer and use it in GitHub Desktop.
A typical multi-threaded producer-consumer code sample.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.List; | |
import java.util.ArrayList; | |
import java.util.Scanner; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
public class ProducerConsumerSample { | |
public static void main(String[] args) { | |
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); | |
ArrayList<Stopable> threads = new ArrayList<Stopable>(); | |
threads.add(new Producer(queue)); | |
System.out.println("How many consumer threads do you want? "); | |
Scanner scanner = new Scanner(System.in); | |
int consumerCount = scanner.nextInt(); | |
for(int n=0; n < consumerCount; n++) { | |
Consumer consumer = new Consumer(queue); | |
threads.add(consumer); | |
} | |
try { | |
startThreads(threads); | |
System.out.println("Press <ENTER> to stop"); | |
System.console().readLine(); | |
stopThreads(threads); | |
System.out.printf("Messages sent: %d. Messages received: %d\n", ((Producer)threads.get(0)).getTotalMessagesSent(), Consumer.getTotalMessagesReceived()); | |
scanner.close(); | |
} | |
catch(Exception e) { | |
System.out.println(e.getMessage()); | |
} | |
} | |
private static void startThreads(List<Stopable> stopables) { | |
for(Stopable stopable : stopables) { | |
stopable.start(); | |
} | |
} | |
private static void stopThreads(List<Stopable> stopables) throws InterruptedException { | |
for(Stopable stopable : stopables) { | |
stopable.signalStop(); | |
} | |
for(Stopable stopable : stopables) { | |
stopable.join(); | |
} | |
} | |
} | |
/**
 * Base class for threads that can be asked to finish cooperatively.
 *
 * Subclasses are expected to check the {@code stop} flag from their
 * {@code run()} loop and return once it becomes {@code true}.
 */
abstract class Stopable extends Thread {
    /** Becomes {@code true} once a stop has been requested; volatile so the write is visible across threads. */
    protected volatile boolean stop = false;

    /**
     * Requests this thread to stop. Only raises the flag; it neither
     * interrupts the thread nor waits for it to finish.
     */
    public void signalStop() {
        this.stop = true;
    }
}
class Producer extends Stopable { | |
public Producer(BlockingQueue<String> queue) { | |
this.queue = queue; | |
} | |
@Override | |
public void run() { | |
int counter = 0; | |
try { | |
while (!stop) { | |
if (queue.isEmpty()) { | |
queue.put(strings[counter++]); | |
if (counter == strings.length) counter = 0; | |
totalMessagesSent++; | |
} | |
} | |
} | |
catch(Exception e) { | |
System.out.printf("Exception in producer: %s\n", e.getMessage()); | |
} | |
} | |
public int getTotalMessagesSent() { | |
return totalMessagesSent; | |
} | |
private int totalMessagesSent = 0; | |
private BlockingQueue<String> queue; | |
private String strings[] = new String[] { "Michal", "Rona", "Hila" }; | |
} | |
class Consumer extends Stopable { | |
public Consumer(BlockingQueue<String> queue) { | |
this.queue = queue; | |
} | |
public void run() { | |
try { | |
while (!stop) { | |
String s = queue.poll(1000, TimeUnit.MILLISECONDS); | |
if (s != null) { | |
System.out.printf("\nThreadId: %d, Recvd: %s", Thread.currentThread().getId(),s); | |
totalMessagesReceived.incrementAndGet(); | |
} | |
} | |
} | |
catch(Exception e) { | |
System.out.printf("Exception in consumer: %s\n", e.getMessage()); | |
} | |
} | |
public static int getTotalMessagesReceived() { | |
return totalMessagesReceived.get(); | |
} | |
private BlockingQueue<String> queue; | |
private static AtomicInteger totalMessagesReceived = new AtomicInteger(0); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment