Skip to content

Instantly share code, notes, and snippets.

@hogmoru
Created June 15, 2017 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.
Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * Dispatches messages to a fixed-size worker pool, routing every message through a
 * per-conversation {@link ConvoQueue} looked up by conversation id.
 *
 * <p>A single-threaded scheduler periodically drops the queues that report themselves
 * empty so the map does not grow without bound.
 *
 * <p>NOTE: neither the worker pool nor the cleanup scheduler is ever shut down by this
 * class, so their threads live until the JVM exits.
 */
public class MessageProcessor {
    /** Initial delay and period of the cleanup pass, in seconds. */
    private static final long CLEANUP_PERIOD_S = 10;

    /** Queues by conversation id; every access synchronizes on this map. */
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();

    /** Worker pool shared by all conversation queues. */
    private final ExecutorService executorService;

    /**
     * Creates the worker pool and schedules the periodic cleanup of empty queues.
     *
     * @param nbThreads number of worker threads in the fixed pool
     */
    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    /**
     * Queues a message on its conversation's queue, creating the queue if needed.
     *
     * @param message the message to process
     */
    public void addMessageToProcess(Message message) {
        // Lookup and enqueue must happen in the same critical section as the cleanup:
        // otherwise removeEmptyQueues() could evict the (still empty) queue between the
        // two steps, and the message would end up on a queue that is no longer in the
        // map while later messages of the same conversation go to a fresh one.
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }

    /**
     * Returns the queue registered for the conversation, registering a new one if absent.
     *
     * @param convoId conversation id used as the map key
     * @return the existing or newly created queue
     */
    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    /** Removes every queue whose {@code isEmpty()} returns true; run by the cleanup scheduler. */
    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }
}
/**
 * Holds the pending tasks of one conversation and hands them to the shared executor one
 * at a time: the next task is submitted only once the active one has reported completion
 * through {@link #complete(MessageTask)}.
 *
 * <p>All state transitions happen while holding this object's monitor.
 */
class ConvoQueue {
    /** Tasks waiting for their turn, in insertion order. */
    private final Queue<MessageTask> queue;

    /** Task currently submitted to the executor, or {@code null} when none is in flight. */
    private MessageTask activeTask;

    private final ExecutorService executorService;

    /**
     * @param executorService executor that runs the tasks of this queue
     */
    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    /** Submits the head of the queue if no task is currently active; otherwise does nothing. */
    private void runNextIfPossible() {
        synchronized (this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    /**
     * Marks the active task as finished and starts the next pending one, if any.
     *
     * @param task the task that just finished; must be the currently active task
     * @throws IllegalStateException if {@code task} is not the active task
     */
    void complete(MessageTask task) {
        synchronized (this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

    /**
     * Tells whether this queue is completely idle.
     *
     * @return {@code true} only when nothing is pending and no task is in flight
     */
    boolean isEmpty() {
        // A queue whose last task is still running is NOT empty: reporting it as such
        // would let an owner discard it and start a second queue for the same
        // conversation, whose tasks could then run concurrently with the active one.
        // The lock also makes the read of activeTask consistent with other threads' writes.
        synchronized (this) {
            return activeTask == null && queue.isEmpty();
        }
    }

    /**
     * Wraps the message in a task bound to this queue and enqueues it.
     *
     * @param message the message to process
     */
    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    /** Appends the task and starts it immediately when nothing else is active. */
    private void add(MessageTask task) {
        synchronized (this) {
            queue.add(task);
            runNextIfPossible();
        }
    }
}
/**
 * Runnable that processes one message and then notifies its {@link ConvoQueue} so the
 * queue can start the next task.
 */
public class MessageTask implements Runnable {
    private final ConvoQueue convoQueue;
    private final Message message;

    /**
     * @param convoQueue queue to notify once this task has finished
     * @param message    message to process
     */
    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    /** Processes the message, then always reports completion to the queue. */
    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            // In a finally block so the queue is notified even if processing throws.
            convoQueue.complete(this);
        }
    }

    /** Sleeps for a random delay below 50 ms, then prints the message to standard output. */
    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the thread's interrupt status; set it
            // again so the code running this task can still see the interruption.
            Thread.currentThread().interrupt();
        }
        System.out.println(message);
    }
}
/** Immutable value holding a message id, the id of its conversation and a text payload. */
class Message {
    private final long id;
    private final long conversationId;
    private final String data;

    /**
     * @param id             message id
     * @param conversationId id of the conversation the message belongs to
     * @param someData       payload text
     */
    Message(long id, long conversationId, String someData) {
        this.data = someData;
        this.conversationId = conversationId;
        this.id = id;
    }

    /** @return the id of the conversation this message belongs to */
    long getConversationId() {
        return this.conversationId;
    }

    /** @return the payload text */
    String getData() {
        return this.data;
    }

    /** @return the text {@code Message{id,conversationId,data}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Message{");
        text.append(id).append(",");
        text.append(conversationId).append(",");
        text.append(data).append("}");
        return text.toString();
    }
}
/** Manual demo: feeds 99 messages spread over 7 conversation ids to a 2-thread processor. */
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor processor = new MessageProcessor(2);
        int i = 1;
        while (i < 100) {
            long messageId = 1000 + i;
            long convoId = i % 7;
            processor.addMessageToProcess(new Message(messageId, convoId, "hi " + i));
            i++;
        }
        // Kill after 4 seconds for online test
        try {
            Thread.sleep(4000);
        } catch (Exception ignored) {
            // Deliberately ignored: the JVM is terminated right below either way.
        }
        System.exit(0);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment