Skip to content

Instantly share code, notes, and snippets.

@fred-o
Created March 22, 2013 12:30
Show Gist options
  • Save fred-o/5220904 to your computer and use it in GitHub Desktop.
Save fred-o/5220904 to your computer and use it in GitHub Desktop.
package app.ingester.event;
import java.util.Deque;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.joda.time.Instant;
import se.prb.client.imagearchive.scampi.asset.Usage;
import se.prb.client.imagearchive.scampi.asset.Variant;
import app.ingester.event.websocket.WebSocketStatusPublisher;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
/**
 * Tracks the status of in-flight and recently finished jobs and pushes a
 * snapshot of that status to a {@link WebSocketStatusPublisher}.
 *
 * <p>Each job is represented as a map with the keys {@code imageId},
 * {@code jobId}, {@code type}, {@code queued}, {@code running},
 * {@code finished} and {@code start-time}. The three worker collections are
 * deques of worker names; a worker name lives in at most one of them at the
 * end of each {@code rendering*} call. Finished jobs are moved to a bounded
 * "recent" list and gain a {@code finished-time} key.
 *
 * <p>State is held in concurrent collections and a volatile flag so the
 * notification methods can be called from several threads.
 */
public class StatusManager {

    /** Maximum number of entries kept in {@link #recent}. */
    private int recentLength;

    /** Named executors whose queue lengths are included in every published model; may be unset. */
    private Map<String, ThreadPoolExecutor> queues;

    @Resource
    WebSocketStatusPublisher publisher;

    /**
     * Jobs currently being processed, keyed by (boxed) job id. Declared with
     * the concrete type so that the atomic {@code putIfAbsent} is available
     * without requiring a newer JDK or an additional import.
     */
    private final ConcurrentHashMap<Object, Map<String, Object>> current =
            new ConcurrentHashMap<Object, Map<String, Object>>();

    /** Most recently finished jobs, newest first. */
    private final Deque<Map<String, Object>> recent = new ConcurrentLinkedDeque<Map<String, Object>>();

    // Not read or written anywhere in this class; kept because the fields are
    // package-visible and may be used by collaborators.
    Instant lastPublish = Instant.now();
    Semaphore hasUpdates = new Semaphore(1);

    /** Set whenever the status changed since the last successful publish. */
    volatile boolean updated;

    /** Attempts an initial publish once dependencies have been injected. */
    @PostConstruct
    public void init() {
        publish();
    }

    /**
     * Publishes the current status model if anything changed since the last
     * publish; otherwise does nothing.
     *
     * <p>The model is a map with the keys {@code queues} (executor name to
     * queue size), {@code current} (live view of the in-flight jobs) and
     * {@code recent} (live view of the recently finished jobs).
     */
    public void publish() {
        if (!updated) {
            return;
        }
        // Clear the flag BEFORE taking the snapshot: a change that arrives while
        // we are publishing raises the flag again and is picked up by the next
        // call instead of being silently wiped out afterwards.
        updated = false;

        Map<String, Integer> queueSizes = Maps.newHashMap();
        if (queues != null) {
            for (Entry<String, ThreadPoolExecutor> entry : queues.entrySet()) {
                queueSizes.put(entry.getKey(), entry.getValue().getQueue().size());
            }
        }
        Object model = ImmutableMap.of("queues", queueSizes, "current", current.values(), "recent", recent);

        try {
            publisher.publish(model);
        } catch (RuntimeException e) {
            // Nothing was delivered, so the pending change must stay flagged.
            updated = true;
            throw e;
        }
    }

    /**
     * Returns the status map for {@code jobId}, creating and registering it
     * if the job is not known yet.
     *
     * @param type    job type stored under {@code type} when the entry is created
     * @param imageId image identifier; its {@code toString()} is stored under
     *                {@code imageId} when the entry is created (must not be null in that case)
     * @param jobId   key of the job
     * @return the registered status map, never {@code null}
     */
    protected Map<String, Object> getJob(String type, Object imageId, long jobId) {
        Map<String, Object> job = current.get(jobId);
        if (job == null) {
            Map<String, Object> created = new ImmutableMap.Builder<String, Object>()
                    .put("imageId", imageId.toString())
                    .put("jobId", jobId)
                    .put("type", type)
                    .put("queued", new ConcurrentLinkedDeque<String>())
                    .put("running", new ConcurrentLinkedDeque<String>())
                    .put("finished", new ConcurrentLinkedDeque<String>())
                    .put("start-time", Instant.now())
                    .build();
            // Atomic insert: if another thread registered the job in the meantime,
            // use its entry so that all workers of a job end up in the same map.
            Map<String, Object> existing = current.putIfAbsent(jobId, created);
            job = (existing != null) ? existing : created;
        }
        return job;
    }

    /**
     * Moves the worker identified by {@code variant} and {@code usage} into
     * the given worker collection of the job and flags the status as changed.
     *
     * @param queue key of the target collection: {@code "queued"},
     *              {@code "running"} or {@code "finished"}
     */
    @SuppressWarnings("unchecked")
    protected void adjustWorker(String type, Object imageId, long jobId, Usage usage, Variant variant, String queue) {
        Map<String, Object> job = getJob(type, imageId, jobId);
        String worker = String.format("%s%s", variant, usage);
        // The casts are safe for the three keys below: getJob() stores a
        // Deque<String> under each of them.
        ((Deque<String>) job.get("queued")).remove(worker);
        ((Deque<String>) job.get("running")).remove(worker);
        ((Deque<String>) job.get("finished")).remove(worker);
        ((Deque<String>) job.get(queue)).add(worker);
        updated = true;
    }

    /**
     * Records that a rendering was queued.
     *
     * @param threadName currently unused
     */
    public void renderingQueued(String type, Object imageId, long jobId, String threadName, Usage usage,
            Variant variant) {
        adjustWorker(type, imageId, jobId, usage, variant, "queued");
    }

    /**
     * Records that a rendering started running.
     *
     * @param threadName currently unused
     */
    public void renderingStarted(String type, Object imageId, long jobId, String threadName, Usage usage,
            Variant variant) {
        adjustWorker(type, imageId, jobId, usage, variant, "running");
    }

    /**
     * Records that a rendering finished.
     *
     * @param threadName currently unused
     */
    public void renderingFinished(String type, Object imageId, long jobId, String threadName, Usage usage,
            Variant variant) {
        adjustWorker(type, imageId, jobId, usage, variant, "finished");
    }

    /**
     * Moves the job from the in-flight set to the head of the recent list
     * (adding {@code finished-time}), trims the recent list to the configured
     * length and publishes. Unknown job ids only trigger the publish attempt.
     *
     * @param jobId key of the job that has finished processing
     */
    public void processingFinished(long jobId) {
        Map<String, Object> job = current.remove(jobId);
        if (job != null) {
            recent.addFirst(new ImmutableMap.Builder<String, Object>()
                    .putAll(job)
                    .put("finished-time", Instant.now())
                    .build());
            // pollLast() instead of removeLast(): when several threads trim at the
            // same time (or the limit is negative) the deque may already be empty,
            // which must not raise NoSuchElementException.
            int limit = Math.max(0, recentLength);
            while (recent.size() > limit) {
                if (recent.pollLast() == null) {
                    break;
                }
            }
            updated = true;
        }
        publish();
    }

    /**
     * @param recentLength maximum number of finished jobs to keep; values
     *                     below zero are treated as zero
     */
    public void setRecentLength(int recentLength) {
        this.recentLength = recentLength;
    }

    /**
     * @param queues executors whose queue sizes are reported, keyed by display name
     */
    public void setQueues(Map<String, ThreadPoolExecutor> queues) {
        this.queues = queues;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment