Skip to content

Instantly share code, notes, and snippets.

@ivanursul
Created February 26, 2017 09:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ivanursul/9cb82cf4c01fb10a74456258dea9d31f to your computer and use it in GitHub Desktop.
Save ivanursul/9cb82cf4c01fb10a74456258dea9d31f to your computer and use it in GitHub Desktop.
import java.util.*;
/**
 * Console demo of a grouped producer/consumer model built on intrinsic locks
 * ({@code synchronized} / {@code wait} / {@code notify}).
 *
 * <p>Standard input is read two lines at a time: the first line is the message
 * text, the second is the name of the consumer group that should receive it.
 * Every group owns one queue, and each consumer thread is attached to exactly
 * one group's queue. Each message is consumed by a single consumer of its group.
 */
public class QueueModel {

    /** Number of consumer groups; they are named {@code group1 .. groupN}. */
    private static final int GROUP_COUNT = 2;

    /** Number of consumer threads started by {@link #main(String[])}. */
    private static final int CONSUMER_COUNT = 10;

    /**
     * Starts the consumers, then forwards (message, group) line pairs from
     * standard input to the producer until the input ends.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Map<String, Queue<Message>> queueMap = createQueueMap();
        QueueMessageProducer producer = new QueueMessageProducer(queueMap);
        List<QueueConsumer> consumers = initConsumers(queueMap);

        try (Scanner scanner = new Scanner(System.in)) {
            // hasNextLine() guards against NoSuchElementException at end of input.
            while (scanner.hasNextLine()) {
                String msgString = scanner.nextLine();
                if (!scanner.hasNextLine()) {
                    // Message text without a group line: nothing sensible to send.
                    break;
                }
                String msgGroup = scanner.nextLine();
                try {
                    producer.notify(new Message(msgString, msgGroup));
                } catch (IllegalArgumentException e) {
                    // A mistyped group must not terminate the input loop.
                    System.err.printf("Message rejected: %s%n", e.getMessage());
                }
            }
        } finally {
            // Consumers are non-daemon threads; without this the JVM would never exit.
            shutdown(consumers);
        }
    }

    /**
     * Creates one empty queue per consumer group.
     *
     * @return map from group name ({@code "group" + i}) to that group's queue
     */
    private static Map<String, Queue<Message>> createQueueMap() {
        Map<String, Queue<Message>> queueMap = new HashMap<>();
        for (int i = 1; i <= GROUP_COUNT; i++) {
            queueMap.put("group" + i, new ArrayDeque<>());
        }
        return queueMap;
    }

    /**
     * Creates and starts the consumer threads, attaching each one to a randomly
     * chosen group taken from {@code queueMap}.
     *
     * @param queueMap group name to queue mapping the consumers read from
     * @return the started consumers, so the caller can stop them later
     */
    private static List<QueueConsumer> initConsumers(Map<String, Queue<Message>> queueMap) {
        List<QueueConsumer> consumers = new ArrayList<>();
        List<String> groups = new ArrayList<>(queueMap.keySet());
        if (groups.isEmpty()) {
            return consumers;
        }
        Random random = new Random();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            String consumerGroup = groups.get(random.nextInt(groups.size()));
            QueueConsumer consumer = new QueueConsumer(
                    "Consumer" + i,
                    consumerGroup,
                    queueMap.get(consumerGroup)
            );
            consumer.start();
            consumers.add(consumer);
        }
        return consumers;
    }

    /**
     * Asks every consumer to stop by interrupting it. A consumer drains whatever
     * is still in its queue before it terminates.
     *
     * @param consumers consumers previously returned by {@code initConsumers}
     */
    private static void shutdown(List<QueueConsumer> consumers) {
        for (QueueConsumer consumer : consumers) {
            consumer.interrupt();
        }
    }

    /** Routes messages to the queue of their consumer group and wakes a consumer. */
    static class QueueMessageProducer {
        private final Map<String, Queue<Message>> queueMap;

        /**
         * @param queue map from consumer group name to that group's queue
         * @throws NullPointerException if {@code queue} is null
         */
        public QueueMessageProducer(Map<String, Queue<Message>> queue) {
            this.queueMap = Objects.requireNonNull(queue, "queue");
        }

        /**
         * Appends {@code message} to its group's queue and wakes one thread
         * waiting on that queue. This overloads, and is unrelated to,
         * {@link Object#notify()}.
         *
         * @param message message to deliver
         * @throws IllegalArgumentException if {@code message} is null or its
         *         consumer group has no queue in the map
         */
        public void notify(Message message) {
            if (message == null) {
                throw new IllegalArgumentException("message must not be null");
            }
            Queue<Message> queue;
            synchronized (queueMap) {
                queue = queueMap.get(message.getConsumerGroup());
            }
            if (queue == null) {
                throw new IllegalArgumentException(
                        "Unknown consumer group: " + message.getConsumerGroup());
            }
            synchronized (queue) {
                queue.add(message);
                queue.notify();
            }
        }
    }

    /**
     * Thread that takes messages from one group's queue and prints them.
     * Interrupting the thread makes it print any messages still queued and exit.
     */
    static class QueueConsumer extends Thread {
        private final Queue<Message> queue;
        private final String group;

        /**
         * @param name  thread name, included in the printed output
         * @param group consumer group name, included in the printed output
         * @param queue queue to consume from; also used as the lock/monitor
         */
        public QueueConsumer(String name, String group, Queue<Message> queue) {
            super(name);
            this.group = group;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    report(take());
                }
            } catch (InterruptedException e) {
                // Restore the flag, then finish anything enqueued before the stop request.
                Thread.currentThread().interrupt();
                drainRemaining();
            }
        }

        /**
         * Blocks until a message is available and removes it from the queue.
         * The emptiness check and the removal happen under the same lock, and
         * the wait is re-checked in a loop, so a message enqueued while no
         * consumer was waiting is still picked up and spurious wake-ups are harmless.
         */
        private Message take() throws InterruptedException {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    queue.wait();
                }
                return queue.poll();
            }
        }

        /** Consumes messages without blocking until the queue is empty. */
        private void drainRemaining() {
            while (true) {
                Message leftover;
                synchronized (queue) {
                    leftover = queue.poll();
                }
                if (leftover == null) {
                    return;
                }
                report(leftover);
            }
        }

        /** Prints one consumed message together with the group and thread name. */
        private void report(Message message) {
            System.out.printf(
                    "%s: %s: Consuming message: %s%n", group, getName(), message
            );
        }
    }

    /** Immutable message: a text payload plus the name of the target consumer group. */
    static class Message {
        private final String message;
        private final String consumerGroup;

        /**
         * @param message       message text
         * @param consumerGroup name of the group that should consume the message
         */
        public Message(String message, String consumerGroup) {
            this.message = message;
            this.consumerGroup = consumerGroup;
        }

        /** @return the message text */
        public String getMessage() {
            return message;
        }

        /** @return the name of the target consumer group */
        public String getConsumerGroup() {
            return consumerGroup;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Message other = (Message) o;
            return Objects.equals(message, other.message)
                    && Objects.equals(consumerGroup, other.consumerGroup);
        }

        @Override
        public int hashCode() {
            return Objects.hash(message, consumerGroup);
        }

        @Override
        public String toString() {
            return "Message{message='" + message + '\''
                    + ", consumerGroup='" + consumerGroup + '\''
                    + '}';
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment