Skip to content

Instantly share code, notes, and snippets.

@vbezhenar
Last active August 29, 2015 14:10
Show Gist options
  • Save vbezhenar/cc49928995c03c1289bb to your computer and use it in GitHub Desktop.
Save vbezhenar/cc49928995c03c1289bb to your computer and use it in GitHub Desktop.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* This class provides an interface similar to ScheduledExecutorService. An important difference is
* that each submitted command has an identifier. GroupingScheduler guarantees that commands with
* the same identifier will be executed sequentially and in the correct order. Commands with different
* identifiers will be executed in parallel.
*
* The schedule/submit methods return a Cancellable so that the caller is able to cancel a submitted task.
* A task can be cancelled only before it has started to execute. Cancelling a task which is already
* executing is not supported.
*
* @param <TId> type of the identifier. Must provide correct equals/hashCode methods.
*/
public class GroupingScheduler<TId> {
    private static final Logger logger = LoggerFactory.getLogger(GroupingScheduler.class);

    /**
     * Handle returned by {@link #schedule} and {@link #submit} that lets the caller cancel the task.
     */
    public interface Cancellable {
        /**
         * Marks the task as cancelled. The flag is checked once, right before the command would be
         * started, so calling this after the command has begun running has no effect on it.
         */
        void cancel();
    }

    /** Only waits out the delay of scheduled tasks; once the delay expires the task is queued. */
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Runs the commands themselves. */
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /**
     * Per-identifier FIFO queues. The head of each queue is the task that has currently been handed
     * to {@link #executor}; the remaining elements wait for the head to finish.
     */
    private final ConcurrentMapQueue<TId, CancellableTask> tasks = new ConcurrentMapQueue<>();

    /**
     * Queues {@code command} under {@code id} once the given delay has elapsed.
     * The task takes its place in the group's queue when the delay expires, not when this method is called.
     *
     * @param id      identifier of the group the command belongs to
     * @param command the work to run
     * @param delay   how long to wait before queueing the command
     * @param unit    time unit of {@code delay}
     * @return a handle that can cancel the task before it starts
     */
    public Cancellable schedule(TId id, Runnable command, long delay, TimeUnit unit) {
        CancellableTask task = new CancellableTask(id, command);
        scheduler.schedule(() -> enqueue(task), delay, unit);
        return task;
    }

    /**
     * Queues {@code command} under {@code id} immediately. If no other task with the same identifier
     * is pending or running, the command is handed to the executor right away.
     *
     * @param id      identifier of the group the command belongs to
     * @param command the work to run
     * @return a handle that can cancel the task before it starts
     */
    public Cancellable submit(TId id, Runnable command) {
        CancellableTask task = new CancellableTask(id, command);
        enqueue(task);
        return task;
    }

    /**
     * Calls {@code shutdown()} on both underlying executors.
     */
    public void shutdown() {
        scheduler.shutdown();
        executor.shutdown();
    }

    /** Appends the task to its group's queue and starts it if the group was idle. */
    private void enqueue(CancellableTask task) {
        tasks.putAndIfWasEmpty(task.id, task, () -> executor.execute(task));
    }

    /**
     * A queued command together with its group identifier and cancellation flag.
     * When it finishes (or is skipped because it was cancelled) it removes itself from the group's
     * queue and hands the next queued task, if any, to the executor.
     */
    private class CancellableTask implements Cancellable, Runnable {
        private final TId id;
        private final Runnable command;
        // volatile: cancel() may be called from a different thread than the one executing run()
        private volatile boolean cancelled = false;

        private CancellableTask(TId id, Runnable command) {
            this.id = id;
            this.command = command;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        @Override
        public void run() {
            try {
                if (!cancelled) {
                    command.run();
                }
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            } finally {
                // Must run on every exit path. If the command throws an Error (which the catch above
                // does not handle) and the head were left in the queue, every later task with this
                // id would be queued behind it and never executed.
                tasks.removeAndTryPeek(id, executor::execute);
            }
        }
    }

    /**
     * A map of FIFO queues whose operations are all performed while holding the monitor of the
     * underlying map. Keys whose queue becomes empty are removed from the map.
     */
    private static class ConcurrentMapQueue<K, V> {
        private final Map<K, Queue<V>> map = new HashMap<>();

        /**
         * Removes the head of the queue for {@code key}. If further values remain, the new head is
         * passed to {@code consumer} (without being removed); otherwise the key is dropped.
         *
         * @throws IllegalStateException if there is no queue for {@code key}
         */
        public void removeAndTryPeek(K key, Consumer<V> consumer) {
            synchronized (map) {
                Queue<V> values = map.get(key);
                if (values == null) {
                    throw new IllegalStateException("No values for key " + key);
                }
                values.remove();
                if (values.isEmpty()) {
                    map.remove(key);
                } else {
                    consumer.accept(values.element());
                }
            }
        }

        /**
         * Appends {@code value} to the queue for {@code key}. If there was no queue for the key
         * before this call, {@code commandIfEmpty} is run after the value has been added.
         */
        public void putAndIfWasEmpty(K key, V value, Runnable commandIfEmpty) {
            synchronized (map) {
                Queue<V> values = map.get(key);
                boolean wasEmpty = values == null;
                if (wasEmpty) {
                    values = new ArrayDeque<>();
                    map.put(key, values);
                }
                values.add(value);
                if (wasEmpty) {
                    commandIfEmpty.run();
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment