Skip to content

Instantly share code, notes, and snippets.

@vbezhenar
Created December 4, 2014 12:17
Show Gist options
  • Save vbezhenar/14a3009843a3f8e8fad2 to your computer and use it in GitHub Desktop.
Save vbezhenar/14a3009843a3f8e8fad2 to your computer and use it in GitHub Desktop.
package com.f5group.eauckz.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.*;
/**
 * Schedules delayed commands and serializes their execution per group id.
 *
 * <p>Every command first waits on a single-threaded scheduler for its delay to elapse. It then
 * joins the FIFO queue of its id. Only the head of a queue is ever handed to the worker pool,
 * and the next element is handed over when the head finishes, so commands sharing an id never
 * run concurrently, while commands with different ids may run in parallel. The order inside a
 * group is the order in which the delays elapse, not the order of the {@code schedule} calls.
 *
 * @param <TId> type of the group id; it is used as a {@link HashMap} key, so it needs
 *              consistent {@code equals}/{@code hashCode}
 */
public class GroupingScheduler<TId> {
    private static final Logger logger = LoggerFactory.getLogger(GroupingScheduler.class);

    /** Handle returned by {@code schedule} that lets the caller revoke a command. */
    public interface Cancellable {
        /**
         * Marks the command so that it is skipped when its turn comes. The flag is only checked
         * right before the command starts, so a command that is already running is not affected.
         */
        void cancel();
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final ConcurrentMapQueue<TId, CancellableTask> tasks = new ConcurrentMapQueue<>();

    /**
     * Schedules {@code command} to run after the given delay, serialized with all other commands
     * that share the same {@code id}.
     *
     * @param id      group id; commands with equal ids never run concurrently
     * @param command the work to run
     * @param delay   time to wait before the command joins its group's queue
     * @param unit    unit of {@code delay}
     * @return a handle that can be used to cancel the command before it starts
     */
    public Cancellable schedule(TId id, Runnable command, long delay, TimeUnit unit) {
        CancellableTask task = new CancellableTask(id, command);
        scheduler.schedule(() -> enqueue(task), delay, unit);
        return task;
    }

    /**
     * Initiates an orderly shutdown of the scheduler and the worker pool without waiting for
     * termination. With the JDK's default policies, delayed commands still fire after shutdown;
     * the worker pool then rejects them and they are dropped with a warning.
     */
    public void shutdown() {
        scheduler.shutdown();
        executor.shutdown();
    }

    /** Appends the task to its group and starts it right away when the group was idle. */
    private void enqueue(CancellableTask task) {
        // The hand-off happens outside the queue lock: only the head of a group is ever
        // submitted, so nobody else can submit or remove this task in the meantime.
        if (tasks.add(task.id, task)) {
            dispatch(task);
        }
    }

    /**
     * Hands the head of a group to the worker pool. A rejected task can never run and therefore
     * can never advance its queue, so on rejection the whole group is dropped instead of being
     * left stuck behind it.
     */
    private void dispatch(CancellableTask task) {
        try {
            executor.execute(task);
        } catch (RejectedExecutionException e) {
            tasks.removeAll(task.id);
            logger.warn("Worker pool rejected task for id {}; pending tasks of this id were dropped", task.id, e);
        }
    }

    /** A queued command together with its group id and cancellation flag. */
    private class CancellableTask implements Cancellable, Runnable {
        private final TId id;
        private final Runnable command;
        // volatile: written by the caller's thread, read by a worker thread.
        private volatile boolean cancelled = false;

        private CancellableTask(TId id, Runnable command) {
            this.id = id;
            this.command = command;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        /**
         * Runs the command unless it was cancelled, then removes this task from the head of its
         * group and submits the next one, if any.
         */
        @Override
        public void run() {
            try {
                if (!cancelled) {
                    command.run();
                }
            } catch (Exception e) {
                logger.warn("Task for id {} failed", id, e);
            } finally {
                // Advance in finally so that even an Error thrown by the command cannot leave
                // this task at the head of the queue and strand the rest of the group.
                tasks.removeAndPeek(id).ifPresent(GroupingScheduler.this::dispatch);
            }
        }
    }

    /**
     * Map of FIFO queues guarded by a single lock. A key is present exactly while its queue is
     * non-empty.
     */
    private static class ConcurrentMapQueue<K, V> {
        private final Map<K, Queue<V>> map = new HashMap<>();

        /**
         * Removes the head of the key's queue and returns the new head without removing it.
         *
         * @return the new head, or empty when the queue ran dry (the key is removed then)
         * @throws IllegalStateException if there is no queue for {@code key}
         */
        Optional<V> removeAndPeek(K key) {
            synchronized (map) {
                Queue<V> values = map.get(key);
                if (values == null) {
                    throw new IllegalStateException("No values for key " + key);
                }
                values.remove();
                if (values.isEmpty()) {
                    map.remove(key);
                    return Optional.empty();
                }
                return Optional.of(values.element());
            }
        }

        /**
         * Appends {@code value} to the key's queue, creating the queue when necessary.
         *
         * @return {@code true} when the key had no queue before, i.e. {@code value} is the head
         */
        boolean add(K key, V value) {
            synchronized (map) {
                Queue<V> values = map.get(key);
                boolean wasEmpty = values == null;
                if (wasEmpty) {
                    values = new ArrayDeque<>();
                    map.put(key, values);
                }
                values.add(value);
                return wasEmpty;
            }
        }

        /** Discards the key's queue together with everything still waiting in it. */
        void removeAll(K key) {
            synchronized (map) {
                map.remove(key);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment