Skip to content

Instantly share code, notes, and snippets.

@ivanursul
Created February 25, 2017 11:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ivanursul/ca826fce48baed04ade754dfde102859 to your computer and use it in GitHub Desktop.
Save ivanursul/ca826fce48baed04ade754dfde102859 to your computer and use it in GitHub Desktop.
import java.util.*;
public class QueueModel {
public static void main(String[] args) {
Queue<Message> queue = new LinkedList<>();
QueueMessageProducer producer = new QueueMessageProducer(queue);
List<QueueConsumer> consumers = new ArrayList<>();
for (int i = 0; i < 10; i++ ) {
QueueConsumer consumer = new QueueConsumer(
"Consumer" + i,
new Random().nextBoolean() ? "group1" : "group2",
queue
);
consumer.start();
consumers.add(consumer);
}
Scanner scanner = new Scanner(System.in);
while(true) {
String msgString = scanner.nextLine();
String msgGroup = scanner.nextLine();
Message message = new Message(
msgString,
msgGroup
);
producer.notify(message);
}
}
static class QueueMessageProducer {
private Queue<Message> queue;
public QueueMessageProducer(Queue<Message> queue) {
this.queue = queue;
}
public void notify(Message message) {
synchronized (queue) {
queue.add(message);
queue.notify();
}
}
}
static class QueueConsumer extends Thread {
private Queue<Message> queue;
private String group;
public QueueConsumer(String name, String group, Queue<Message> queue) {
super(name);
this.group = group;
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
synchronized (queue) {
queue.wait();
}
synchronized (queue) {
if (!queue.isEmpty()) {
Message message = queue.poll();
if (group.equals(message.getConsumerGroup())) {
System.out.printf(
"%s: %s: Consuming message: %s%n", group, getName(), message
);
} else {
System.out.printf(
"%s: %s: Skipping message, wrong group, message: %s%n", group, getName(), message
);
queue.add(message);
queue.notify();
}
}
}
}
} catch (Exception e) {
System.out.printf(
"Exception occured: %s%n", e.toString()
);
}
}
}
static class Message {
private final String message;
private final String consumerGroup;
public Message(String message, String consumerGroup) {
this.message = message;
this.consumerGroup = consumerGroup;
}
public String getMessage() {
return message;
}
public String getConsumerGroup() {
return consumerGroup;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Message message1 = (Message) o;
return Objects.equals(message, message1.message) &&
Objects.equals(consumerGroup, message1.consumerGroup);
}
@Override
public int hashCode() {
return Objects.hash(message, consumerGroup);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Message{");
sb.append("message='").append(message).append('\'');
sb.append(", consumerGroup='").append(consumerGroup).append('\'');
sb.append('}');
return sb.toString();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment