Skip to content

Instantly share code, notes, and snippets.

@PrabhatKJena
Created January 25, 2018 03:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PrabhatKJena/cf5985a475488a95a76987da769e598b to your computer and use it in GitHub Desktop.
Save PrabhatKJena/cf5985a475488a95a76987da769e598b to your computer and use it in GitHub Desktop.
Publisher-Subscriber Thread Implementation
package edu.pk.java.test;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import static edu.pk.java.test.PublisherSubscriberTest.*;
/**
 * Demo driver: starts one {@code Writer} and three {@code Reader} tasks on a shared bounded
 * queue, lets them run briefly, then shuts them down.
 */
public class PublisherSubscriberTest {
    /** Capacity of the shared buffer between the writer and the readers. */
    private static final int QUEUE_MAX_SIZE = 100;

    /** Timeout, in milliseconds, used by the writer and readers for each timed queue operation. */
    public static final long THREAD_WAIT_TIME_IN_MS = 20;

    /**
     * Runs the demo: start the tasks, wait 200 ms, stop the writer, wait for it to finish, then
     * ask the readers to drain the remaining items and stop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(QUEUE_MAX_SIZE);
        Writer writer = new Writer(buffer);
        Reader reader1 = new Reader(buffer);
        Reader reader2 = new Reader(buffer);
        Reader reader3 = new Reader(buffer);

        Thread writerThread = new Thread(writer);
        writerThread.start();
        new Thread(reader1).start();
        new Thread(reader2).start();
        new Thread(reader3).start();

        try {
            Thread.sleep(200L); // This is for testing purpose. Wait and send shutdown signal after this.
        } catch (InterruptedException e) {
            // Do not lose the interrupt; fall through and shut the workers down right away.
            Thread.currentThread().interrupt();
        }

        Writer.setShutDown(true);
        try {
            // The readers are still consuming at this point, so the writer can always complete its
            // current offer and notice the flag. Waiting for it guarantees that nothing is
            // published after the readers have seen an empty queue and exited.
            writerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Reader.setShutDown(true, false);
    }
}
/**
 * Producer task: repeatedly generates a random non-negative integer and offers it to the shared
 * queue until the class-wide shutdown flag is set or the running thread is interrupted.
 */
class Writer implements Runnable {
    private final BlockingQueue<Integer> queue;
    // Class-wide stop flag shared by every Writer; volatile so a change made by another thread is seen here.
    private static volatile boolean isShutDown;

    /**
     * @param queue buffer the generated values are offered to
     */
    public Writer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Publishes random values until shutdown is requested. An interrupt ends the loop as well;
     * the interrupt status is restored before returning.
     */
    @Override
    public void run() {
        Random random = new Random();
        while (!isShutDown) {
            int nextInt = random.nextInt(Integer.MAX_VALUE);
            try {
                if (publish(nextInt)) {
                    System.out.printf("\n%s:%s -> %d", "Writing by", Thread.currentThread().getName(), nextInt);
                }
            } catch (InterruptedException e) {
                // Treat an interrupt as a request to stop instead of silently ignoring it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Offers {@code value} to the queue, waiting up to THREAD_WAIT_TIME_IN_MS per attempt and
     * retrying while the queue stays full.
     *
     * @return {@code true} if the value was enqueued, {@code false} if shutdown was requested
     *         while the queue was still full
     * @throws InterruptedException if the thread is interrupted while waiting for space
     */
    private boolean publish(int value) throws InterruptedException {
        while (!queue.offer(value, THREAD_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS)) {
            // Without this check a full queue with no remaining consumers would keep the
            // writer retrying forever after shutdown.
            if (isShutDown) {
                return false;
            }
            System.out.println("Retrying........");
        }
        return true;
    }

    /**
     * Sets the stop flag for all writers.
     *
     * @param shutDown {@code true} to ask the writers to stop
     */
    public static void setShutDown(boolean shutDown) {
        isShutDown = shutDown;
    }
}
/**
 * Consumer task: polls the shared queue and prints each value it receives until shutdown is
 * requested. A non-forced shutdown lets the reader keep consuming until the queue is empty; a
 * forced shutdown stops it at the next check.
 */
class Reader implements Runnable {
    private final BlockingQueue<Integer> queue;
    // Shutdown request; when isForced is false the reader still consumes the remaining items first.
    private static volatile boolean isShutDown;
    // When true together with isShutDown, stop without draining the buffer.
    private static volatile boolean isForced;

    /**
     * @param queue buffer the values are taken from
     */
    public Reader(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Consumes items until {@link #shouldStop()} reports true or the thread is interrupted, then
     * prints whatever is still left in the queue.
     */
    @Override
    public void run() {
        while (!shouldStop()) {
            try {
                // Wait up to THREAD_WAIT_TIME_IN_MS if there is no item to retrieve from the buffer.
                Integer item = queue.poll(THREAD_WAIT_TIME_IN_MS, TimeUnit.MILLISECONDS);
                if (item != null) {
                    System.out.printf("\n%30s:%s -> %d", "Reading by", Thread.currentThread().getName(), item);
                }
            } catch (InterruptedException e) {
                // Treat an interrupt as a request to stop instead of silently ignoring it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println("Remaining items:" + queue);
    }

    /**
     * @return {@code true} when shutdown was requested and it is either forced or the queue
     *         currently holds no items
     */
    private boolean shouldStop() {
        // Read isShutDown first: setShutDown writes it last, so the forced value read
        // afterwards belongs to the same (or a later) call.
        if (!isShutDown) {
            return false;
        }
        return isForced || queue.isEmpty();
    }

    /**
     * Sets the shutdown state for all readers.
     *
     * @param shutDown {@code true} to ask the readers to stop
     * @param forced   {@code true} to stop without consuming the remaining items
     */
    public static void setShutDown(boolean shutDown, boolean forced) {
        // Publish the mode before the flag so a reader never pairs a fresh shutdown
        // request with a stale forced value.
        isForced = forced;
        isShutDown = shutDown;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment