Skip to content

Instantly share code, notes, and snippets.

@mairbek
Created May 13, 2011 21:57
Show Gist options
  • Save mairbek/971387 to your computer and use it in GitHub Desktop.
Save mairbek/971387 to your computer and use it in GitHub Desktop.
ZookeeperCoordinator
public class ZookeeperCoordinator implements Coordinator {
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperCoordinator.class);
private final Zoo zoo;
private final String root;
public ZookeeperCoordinator(Zoo zoo, String root) {
this.zoo = zoo;
this.root = root;
}
@Override
public void registerKernel(final Kernel kernel) {
final KernelNode kernelNode = KernelNode.create(zoo, root, kernel.getId());
kernelNode.inputWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
final Task task = new Task(kernelNode.inputData());
final Watcher watcher = this;
Runnable runnable = new Runnable() {
@Override
public void run() {
kernel.processTask(task, new ComputationResultHandler() {
@Override
public void computationCompleted(Result result) {
kernelNode.setOutputData(serialize(result));
kernelNode.inputWatcher(watcher);
}
});
}
};
new Thread(runnable).run();
}
});
kernelNode.unlock();
}
@Override
public KernelId getAvailableKernel() {
LOG.info("Searching for available kernel");
Collection<KernelNode> nodes = KernelNode.all(zoo, root);
if (nodes.isEmpty()) {
throw new IllegalStateException("No kernels registered");
}
Iterator<KernelNode> iterator = nodes.iterator();
while (true) {
if (!iterator.hasNext()) {
LOG.info("Kernel not found");
iterator = KernelNode.all(zoo, root).iterator();
}
KernelNode next = iterator.next();
if (next.isAvailable()) {
KernelId id = new KernelId(next.getPath().split("kernel")[1]);
LOG.info("Kernel " + id + " found");
return id;
}
}
}
@Override
public void scheduleTask(final KernelId kernelId, Task task, final TaskCompletedListener taskCompletedListener) {
final KernelNode kernelNode = KernelNode.get(zoo, root, kernelId);
kernelNode.lock();
kernelNode.outputWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
Result result = deserialize(kernelNode.outputData());
taskCompletedListener.onTaskCompleted(kernelId, result);
kernelNode.unlock();
}
});
kernelNode.setInputData(task.getString());
}
private static Result deserialize(String string) {
return new Result(Integer.valueOf(string));
}
private static String serialize(Result result) {
return "" + result.getCount();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment