Created
May 13, 2011 21:57
-
-
Save mairbek/971387 to your computer and use it in GitHub Desktop.
ZookeeperCoordinator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ZookeeperCoordinator implements Coordinator { | |
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperCoordinator.class); | |
private final Zoo zoo; | |
private final String root; | |
public ZookeeperCoordinator(Zoo zoo, String root) { | |
this.zoo = zoo; | |
this.root = root; | |
} | |
@Override | |
public void registerKernel(final Kernel kernel) { | |
final KernelNode kernelNode = KernelNode.create(zoo, root, kernel.getId()); | |
kernelNode.inputWatcher(new Watcher() { | |
@Override | |
public void process(WatchedEvent event) { | |
final Task task = new Task(kernelNode.inputData()); | |
final Watcher watcher = this; | |
Runnable runnable = new Runnable() { | |
@Override | |
public void run() { | |
kernel.processTask(task, new ComputationResultHandler() { | |
@Override | |
public void computationCompleted(Result result) { | |
kernelNode.setOutputData(serialize(result)); | |
kernelNode.inputWatcher(watcher); | |
} | |
}); | |
} | |
}; | |
new Thread(runnable).run(); | |
} | |
}); | |
kernelNode.unlock(); | |
} | |
@Override | |
public KernelId getAvailableKernel() { | |
LOG.info("Searching for available kernel"); | |
Collection<KernelNode> nodes = KernelNode.all(zoo, root); | |
if (nodes.isEmpty()) { | |
throw new IllegalStateException("No kernels registered"); | |
} | |
Iterator<KernelNode> iterator = nodes.iterator(); | |
while (true) { | |
if (!iterator.hasNext()) { | |
LOG.info("Kernel not found"); | |
iterator = KernelNode.all(zoo, root).iterator(); | |
} | |
KernelNode next = iterator.next(); | |
if (next.isAvailable()) { | |
KernelId id = new KernelId(next.getPath().split("kernel")[1]); | |
LOG.info("Kernel " + id + " found"); | |
return id; | |
} | |
} | |
} | |
@Override | |
public void scheduleTask(final KernelId kernelId, Task task, final TaskCompletedListener taskCompletedListener) { | |
final KernelNode kernelNode = KernelNode.get(zoo, root, kernelId); | |
kernelNode.lock(); | |
kernelNode.outputWatcher(new Watcher() { | |
@Override | |
public void process(WatchedEvent event) { | |
Result result = deserialize(kernelNode.outputData()); | |
taskCompletedListener.onTaskCompleted(kernelId, result); | |
kernelNode.unlock(); | |
} | |
}); | |
kernelNode.setInputData(task.getString()); | |
} | |
private static Result deserialize(String string) { | |
return new Result(Integer.valueOf(string)); | |
} | |
private static String serialize(Result result) { | |
return "" + result.getCount(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment