Skip to content

Instantly share code, notes, and snippets.

@agaoglu
Created Jun 1, 2011
Embed
What would you like to do?
Resource synchronizer

An example process synchronizer using ZooKeeper.

Coordinates time-sharing of resources among processes of the same type. Basic usage looks like this:

ResourceSynchronizer rs = new ResourceSynchronizer(
    new ZooKeeper(zkconnectionstring, timeout, null), 
    "/pool", new String[]{"res0", "res1"});

prepareData();
String myResource = rs.request();
processOn(myResource);
rs.close();

More information can be found here and here (the original links were not preserved in this copy).

/**
* This file is licensed to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
/**
 * A general resource synchronizer to use among processes. A typical use case is
 * distributing the limited number of GPUs on a computer to the processes
 * requiring them. Processes working on the same computer should discover the
 * resources on the hardware in the same way, so that they all end up with the
 * same list of resource names. Processes working on the same computer should
 * also supply the same pool in order to be synchronized correctly. That pool
 * often consists of a general application name followed by the hostname.
 * <p>
 * For an example application like myapp a viable {@code pool} is
 * <b>/myapp/tt21</b>
 * <p>
 * for processes working on computer 'tt21'. If there are 3 GPUs on 'tt21',
 * using basic /dev discovery will yield GPU names ({@code resources}) as
 * <b>["nvidia0", "nvidia1", "nvidia2"]</b>. When, say, 4 processes on tt21
 * request resources with these parameters, the first 3 will get the available
 * ones and the fourth process will wait until one of them is freed.
 * <p>
 * A resource is held by creating an ephemeral lock znode under the resource's
 * znode, so the lock also disappears when the holder's ZooKeeper session ends.
 * Processes should nevertheless call {@link #release()} (or {@link #close()})
 * once they are done with the given resource.
 * <p>
 * An instance allocates at most one resource: once a resource has been
 * allocated, further calls to {@link #request()} return the same name without
 * acquiring a new lock. NOTE(review): the blocking methods share per-request
 * state and wait on this object's monitor, so an instance presumably should be
 * driven by a single caller thread.
 *
 * @author erdem
 */
public class ResourceSynchronizer implements
    Watcher,
    StatCallback,
    StringCallback,
    VoidCallback,
    ChildrenCallback {
  /** Number of lock slots per resource; lock znodes are named 1..REUSE. */
  private static final int REUSE = 1;
  /** Delete-callback context marking the removal of a redundant lock. */
  private static final Object DISCARD = new Object();
  private final static Log log = LogFactory.getLog(ResourceSynchronizer.class);
  private final ZooKeeper zk;
  /** Absolute znode paths of the resources ({@code pool + "/" + name}). */
  private final String[] resources;
  /** Lock slot (lock znode name) currently being tried. */
  private int round = 1;
  /** Znode path of the allocated resource, or null while none is held. */
  private String resourceToUse;
  /** Exact path of the lock znode this instance created, or null. */
  private String lockZNode;
  /** Set by the delete callback once the release attempt has finished. */
  private boolean resourceReleased = false;
  /** Index into {@link #resources} of the resource currently being tried. */
  private int resourceIndexToTry = 0;
  /** Resource znode currently being ensured by ensureResourceZNodeExists. */
  private String resourceZNode;
  private boolean resourceZNodeCreated;

  /**
   * Create a {@link ResourceSynchronizer}.
   *
   * @param zk
   *          ZooKeeper client used for coordination; this object registers
   *          itself as the client's default watcher
   * @param pool
   *          An absolute znode path used as the common identifier for the
   *          given resources; must start with '/'
   * @param resources
   *          Names of the resources to be shared among processes; must not be
   *          empty and no name may contain '/'
   * @throws IllegalArgumentException
   *           if {@code pool} does not start with '/', {@code resources} is
   *           empty, or a resource name contains '/'
   */
  public ResourceSynchronizer(ZooKeeper zk, String pool, String[] resources) {
    if (!pool.startsWith("/")) {
      throw new IllegalArgumentException("Pools should start with a '/'");
    }
    if (resources.length == 0) {
      // With nothing to allocate, request() would otherwise block forever.
      throw new IllegalArgumentException("At least one resource is required");
    }
    this.resources = new String[resources.length];
    for (int i = 0; i < resources.length; i++) {
      if (resources[i].contains("/")) {
        throw new IllegalArgumentException("Resources should not contain '/'");
      }
      this.resources[i] = pool + "/" + resources[i];
    }
    this.zk = zk;
    // Register only after the arguments have been validated.
    this.zk.register(this);
  }

  /**
   * Request a free resource from the list of resources given while
   * constructing. Blocks until the ZooKeeper client is connected and a
   * resource has been allocated to this process. If the calling thread is
   * interrupted while blocked, it keeps waiting and its interrupt status is
   * restored before returning.
   *
   * @return the name (last path component) of the allocated resource
   * @throws IOException
   *           declared by the signature; nothing in this class currently
   *           throws it
   */
  public String request() throws IOException {
    connect();
    for (String resource : resources) {
      ensureResourceZNodeExists(resource);
    }
    locateResource();
    return resourceToUse.substring(resourceToUse.lastIndexOf("/") + 1);
  }

  /**
   * Waits once on this object's monitor. The caller must hold the monitor and
   * re-check its condition afterwards.
   *
   * @return true if the wait was interrupted
   */
  private boolean awaitNotification() {
    try {
      wait();
      return false;
    } catch (InterruptedException e) {
      // Keep blocking (the methods' contract), but remember the interrupt so
      // the caller can restore the thread's interrupt status afterwards.
      return true;
    }
  }

  /** Re-asserts the current thread's interrupt status if it was swallowed. */
  private static void restoreInterrupt(boolean interrupted) {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /** Blocks until the ZooKeeper client reports {@link States#CONNECTED}. */
  private void connect() throws IOException {
    boolean interrupted = false;
    synchronized (this) {
      // process() notifies on SyncConnected events.
      while (!zk.getState().equals(States.CONNECTED)) {
        interrupted |= awaitNotification();
      }
    }
    restoreInterrupt(interrupted);
    log.info("ZK ensemble connected");
  }

  /**
   * Blocks until the persistent znode {@code resource} exists, creating it
   * (and any missing ancestors) through the async callbacks if needed.
   */
  private void ensureResourceZNodeExists(String resource) {
    resourceZNode = resource;
    resourceZNodeCreated = false;
    zk.exists(resource, false, this, null);
    boolean interrupted = false;
    synchronized (this) {
      while (!resourceZNodeCreated) {
        interrupted |= awaitNotification();
      }
    }
    restoreInterrupt(interrupted);
    if (log.isDebugEnabled()) {
      log.debug("Resource ZNode '" + resource + "' exists");
    }
  }

  /** Starts the search for a free resource and blocks until one is locked. */
  private void locateResource() {
    findResourceToUse();
    boolean interrupted = false;
    synchronized (this) {
      while (resourceToUse == null) {
        interrupted |= awaitNotification();
      }
    }
    restoreInterrupt(interrupted);
    log.info("Resource '" + resourceToUse + "' allocated");
  }

  /**
   * Asks for the status of the next candidate resource. When every candidate
   * of the last round is busy, it leaves a children watch on each resource so
   * that {@link #process(WatchedEvent)} restarts the search on a change.
   */
  private void findResourceToUse() {
    if (resourceToUse != null) {
      // Another search (e.g. started by a second watch event) already won.
      return;
    }
    if (resourceIndexToTry >= resources.length) {
      if (round < REUSE) {
        round++;
        resourceIndexToTry = 0;
      } else {
        log.info("No available resource, waiting...");
        for (String resource : resources) {
          try {
            zk.getChildren(resource, true);
          } catch (Exception e) {
            // Best effort: the watches on the other resources may still fire.
            log.warn("Could not watch " + resource, e);
          }
        }
        return;
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("Asking status for " + resources[resourceIndexToTry]);
    }
    zk.getChildren(resources[resourceIndexToTry], false, this, null);
  }

  /**
   * Release the resource held by this object. Should be called after the
   * requested resource has been used, or the resource may not get freed until
   * the ZooKeeper session ends. Does nothing if no resource is held or it has
   * already been released.
   */
  public void release() {
    synchronized (this) {
      if (lockZNode == null || resourceReleased) {
        return;
      }
    }
    releaseResource();
    boolean interrupted = false;
    synchronized (this) {
      while (!resourceReleased) {
        interrupted |= awaitNotification();
      }
    }
    restoreInterrupt(interrupted);
    log.info("Resource released");
  }

  /**
   * May be called to release the held resource (if any) and close the ZK
   * connection.
   */
  public void close() {
    release();
    try {
      zk.close();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    log.info("ZK connection closed");
  }

  /** Asynchronously deletes the lock znode this object created. */
  private void releaseResource() {
    zk.delete(lockZNode, -1, this, null);
  }

  /**
   * General watcher: wakes {@link #connect()} on connection and restarts the
   * resource search when a watched resource's children change while no
   * resource is held.
   */
  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == EventType.None
        && event.getState() == KeeperState.SyncConnected) {
      synchronized (this) {
        notifyAll();
      }
    }
    if (event.getType() == EventType.NodeChildrenChanged
        && resourceToUse == null) {
      log.info("Retrying to get another resource");
      round = 1;
      resourceIndexToTry = 0;
      findResourceToUse();
    }
  }

  /**
   * Called on zk.exists
   */
  @Override
  public void processResult(int rc, String path, Object ctx, Stat stat) {
    Code code = Code.get(rc);
    if (code == Code.OK) {
      synchronized (this) {
        resourceZNodeCreated = true;
        notifyAll();
      }
    } else if (code == Code.NONODE) {
      createResourceZNode(path);
    } else {
      log.error("Could not check existence of " + path + ": " + code);
    }
  }

  /**
   * Called on zk.create, both for resource znodes (and their ancestors) and
   * for ephemeral lock znodes.
   */
  @Override
  public void processResult(int rc, String path, Object ctx, String name) {
    Code returnCode = Code.get(rc);
    if (resourceZNode.equals(path) || resourceZNode.startsWith(path + "/")) {
      onResourceZNodeCreateResult(path, returnCode);
    } else {
      onLockCreateResult(path, returnCode);
    }
  }

  /**
   * Handles the result of creating the resource znode or one of its ancestors.
   * Missing parents are created first. Once an ancestor exists, creation of
   * the resource znode itself is retried.
   */
  private void onResourceZNodeCreateResult(String path, Code returnCode) {
    if (returnCode == Code.OK || returnCode == Code.NODEEXISTS) {
      if (resourceZNode.equals(path)) {
        synchronized (this) {
          resourceZNodeCreated = true;
          notifyAll();
        }
      } else {
        // An ancestor now exists: retry the target, which walks up again if
        // a deeper ancestor is still missing.
        createResourceZNode(resourceZNode);
      }
    } else if (returnCode == Code.NONODE) {
      createResourceZNode(path.substring(0, path.lastIndexOf("/")));
    } else {
      log.error("Could not create " + path + ": " + returnCode);
    }
  }

  /**
   * Handles the result of creating an ephemeral lock znode. On success the
   * resource is allocated. If it is already taken, the search moves on.
   */
  private void onLockCreateResult(String path, Code returnCode) {
    if (returnCode == Code.OK) {
      if (resourceToUse != null) {
        // A concurrent search already allocated a resource: give this back so
        // it does not stay locked without an owner.
        if (log.isDebugEnabled()) {
          log.debug("Discarding redundant lock " + path);
        }
        zk.delete(path, -1, this, DISCARD);
        return;
      }
      if (log.isDebugEnabled()) {
        log.debug("Locked " + path);
      }
      synchronized (this) {
        lockZNode = path;
        resourceToUse = path.substring(0, path.lastIndexOf("/"));
        notifyAll();
      }
    } else if (returnCode == Code.NODEEXISTS) {
      if (log.isDebugEnabled()) {
        log.debug("Could not lock " + path);
      }
      resourceIndexToTry++;
      findResourceToUse();
    } else {
      log.error("Could not lock " + path + ": " + returnCode);
    }
  }

  /** Asynchronously creates a persistent, empty, open-ACL znode. */
  private void createResourceZNode(String resource) {
    zk.create(resource, new byte[0], Ids.OPEN_ACL_UNSAFE,
        CreateMode.PERSISTENT, this, null);
  }

  /**
   * Called on zk.delete
   */
  @Override
  public void processResult(int rc, String path, Object ctx) {
    Code code = Code.get(rc);
    if (ctx == DISCARD) {
      if (code != Code.OK) {
        log.warn("Could not discard redundant lock " + path + ": " + code);
      }
      return;
    }
    if (code != Code.OK && code != Code.NONODE) {
      // The lock znode is ephemeral, so it disappears at the latest when the
      // session is closed; stop waiting instead of blocking release() forever.
      log.warn("Could not release " + path + ": " + code);
    }
    synchronized (this) {
      resourceReleased = true;
      notifyAll();
    }
  }

  /**
   * Called on zk.getChildren
   */
  @Override
  public void processResult(
      int rc,
      String path,
      Object ctx,
      List<String> children) {
    if (resourceToUse != null) {
      // Already allocated by another search; nothing left to do.
      return;
    }
    Code code = Code.get(rc);
    if (code != Code.OK) {
      // children is null on failure; skip this resource instead of failing.
      log.warn("Could not get status of " + path + ": " + code);
      resourceIndexToTry++;
      findResourceToUse();
      return;
    }
    if (log.isDebugEnabled()) {
      log.debug("Received status of " + path + " : has " + children.size()
          + " children");
    }
    if (children.size() < REUSE) {
      String lock = path + "/" + round;
      if (log.isDebugEnabled()) {
        log.debug("Trying to lock " + lock);
      }
      zk.create(lock, new byte[0],
          Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
    } else {
      resourceIndexToTry++;
      findResourceToUse();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment