Skip to content

Instantly share code, notes, and snippets.

@siddharthbarman
Created July 2, 2020 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save siddharthbarman/4de5399a0c5db81f23c8a031dfec0935 to your computer and use it in GitHub Desktop.
Save siddharthbarman/4de5399a0c5db81f23c8a031dfec0935 to your computer and use it in GitHub Desktop.
A typical multi-threaded producer-consumer code sample.
import java.util.List;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Entry point of the producer/consumer demo.
 *
 * <p>Starts one {@link Producer} and a user-chosen number of {@link Consumer}
 * threads that share a single {@link BlockingQueue}, waits for the user to
 * press ENTER, then signals every worker to stop, joins them and prints the
 * number of messages sent and received.
 */
public class ProducerConsumerSample {

    /**
     * Runs the interactive demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        // Keep a typed reference so the statistics can be read without casting
        // an element of the worker list back to Producer.
        Producer producer = new Producer(queue);
        List<Stopable> threads = new ArrayList<Stopable>();
        threads.add(producer);

        Scanner scanner = new Scanner(System.in);
        try {
            System.out.println("How many consumer threads do you want? ");
            if (!scanner.hasNextInt()) {
                System.out.println("Please enter a whole number of consumer threads.");
                return;
            }
            int consumerCount = scanner.nextInt();
            // nextInt() leaves the rest of the line (at least the newline) unread;
            // discard it so the "press ENTER" prompt below really waits.
            if (scanner.hasNextLine()) {
                scanner.nextLine();
            }
            for (int n = 0; n < consumerCount; n++) {
                threads.add(new Consumer(queue));
            }

            startThreads(threads);
            try {
                System.out.println("Press <ENTER> to stop");
                // Read from the scanner instead of System.console(): the console is
                // null when stdin is redirected or the program runs inside an IDE.
                if (scanner.hasNextLine()) {
                    scanner.nextLine();
                }
            }
            finally {
                // Workers are non-daemon threads; always stop them once started,
                // otherwise a failure while waiting would leave the JVM running.
                stopThreads(threads);
            }
            System.out.printf("Messages sent: %d. Messages received: %d\n", producer.getTotalMessagesSent(), Consumer.getTotalMessagesReceived());
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for whoever runs main().
            Thread.currentThread().interrupt();
            System.out.println(e.getMessage());
        }
        catch (Exception e) {
            System.out.println(e.getMessage());
        }
        finally {
            scanner.close();
        }
    }

    /**
     * Starts every worker thread in list order.
     *
     * @param stopables the workers to start
     */
    private static void startThreads(List<Stopable> stopables) {
        for (Stopable stopable : stopables) {
            stopable.start();
        }
    }

    /**
     * Signals every worker to stop and then waits for each of them to finish.
     * All workers are signalled before the first join so they wind down
     * concurrently rather than one after another.
     *
     * @param stopables the workers to stop
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    private static void stopThreads(List<Stopable> stopables) throws InterruptedException {
        for (Stopable stopable : stopables) {
            stopable.signalStop();
        }
        for (Stopable stopable : stopables) {
            stopable.join();
        }
    }
}
/**
 * Base class for worker threads that can be asked to stop cooperatively.
 *
 * <p>The class only maintains the flag; subclasses are expected to check
 * {@link #stop} from their {@code run()} method and return once it is set.
 */
abstract class Stopable extends Thread {

    /**
     * Set to {@code true} once a stop has been requested. Declared
     * {@code volatile} so a write made by the requesting thread is visible
     * to the worker thread reading it.
     */
    protected volatile boolean stop = false;

    /**
     * Requests that this worker stop. Only raises the flag; it does not
     * interrupt the thread or wait for it to finish.
     */
    public void signalStop() {
        this.stop = true;
    }
}
/**
 * Worker that feeds a shared queue with a repeating sequence of names.
 *
 * <p>A new message is enqueued only when the queue is observed to be empty,
 * so at most one message from this producer is pending at a time. The loop
 * ends when a stop is signalled or the thread is interrupted.
 */
class Producer extends Stopable {

    /** Messages sent in round-robin order. */
    private static final String[] MESSAGES = new String[] { "Michal", "Rona", "Hila" };

    /** Queue shared with the consumers. */
    private final BlockingQueue<String> queue;

    /** Number of messages enqueued so far; written only by this thread. */
    private int totalMessagesSent = 0;

    /**
     * Creates a producer bound to the given queue.
     *
     * @param queue the queue messages are put on
     */
    public Producer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    /**
     * Enqueues the next message whenever the queue is empty, until stopped.
     */
    @Override
    public void run() {
        int next = 0;
        try {
            while (!stop) {
                if (queue.isEmpty()) {
                    queue.put(MESSAGES[next]);
                    next = (next + 1) % MESSAGES.length;
                    totalMessagesSent++;
                }
                else {
                    // Nothing to do until a consumer drains the queue; give up the
                    // CPU instead of spinning flat out on isEmpty().
                    Thread.yield();
                }
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up can observe it.
            Thread.currentThread().interrupt();
            System.out.printf("Exception in producer: %s\n", e.getMessage());
        }
    }

    /**
     * Returns the number of messages enqueued so far. The counter is a plain
     * field updated by the producer thread, so the value is only guaranteed
     * to be up to date once this thread has been joined.
     *
     * @return the count of messages sent
     */
    public int getTotalMessagesSent() {
        return totalMessagesSent;
    }
}
class Consumer extends Stopable {
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (!stop) {
String s = queue.poll(1000, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.printf("\nThreadId: %d, Recvd: %s", Thread.currentThread().getId(),s);
totalMessagesReceived.incrementAndGet();
}
}
}
catch(Exception e) {
System.out.printf("Exception in consumer: %s\n", e.getMessage());
}
}
public static int getTotalMessagesReceived() {
return totalMessagesReceived.get();
}
private BlockingQueue<String> queue;
private static AtomicInteger totalMessagesReceived = new AtomicInteger(0);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment