|
/** |
|
* This file is licensed to you under the Apache License, Version 2.0 (the |
|
* "License"); you may not use this file except in compliance with the |
|
* License. You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
*/ |
|
|
|
import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
|
|
|
/** |
|
* A general resource synchronizer to use among processes. A typical use case is |
|
* distributing the limited number of GPUs on a computer to the processes |
|
* requiring them. Processes working on the same computer should discover the |
|
* resources on the hardware using the same way thus having the same list of |
|
* resource names. Processes working on the same computer should also supply the |
|
* same pool in order to have correct synchronization. That pool often consists |
|
* of the hostname following a general application name. |
|
* <p> |
|
* For an example application like myapp a viable {@code pool} is |
|
* <b>/myapp/tt21</b> |
|
* <p> |
|
* for processes working on computer 'tt21'. If there are 3 GPUs on 'tt21', |
|
* using basic /dev discovery will yield GPU names ({@code resources}) as |
|
* <b>["nvidia0", "nvidia1", "nvidia2"]</b>. When, say, 4 processes on tt21 |
|
* requested resources with these parameters, first 3 will get the available |
|
* ones and the fourth process will wait until one of them is freed. |
|
* <p> |
|
* Processes may and should use {@link #release()} after they're done with the |
|
* given resource. |
|
* |
|
* @author erdem |
|
*/ |
|
public class ResourceSynchronizer implements |
|
Watcher, |
|
StatCallback, |
|
StringCallback, |
|
VoidCallback, |
|
ChildrenCallback { |
|
|
|
private static final int REUSE = 1; |
|
private int round = 1; |
|
|
|
private final static Log log = LogFactory.getLog(ResourceSynchronizer.class); |
|
|
|
private final ZooKeeper zk; |
|
private final String[] resources; |
|
|
|
private String resourceToUse; |
|
private boolean resourceReleased = false; |
|
|
|
private int resourceIndexToTry = 0; |
|
|
|
private String resourceZNode; |
|
private boolean resourceZNodeCreated; |
|
|
|
/** |
|
* Create a {@link ResourceSynchronizer} |
|
* |
|
* @param configuration |
|
* Object will read ZK connection params from this configuration |
|
* object |
|
* @param pool |
|
* An absolute path like common identifier for given resources |
|
* @param resources |
|
* A list of resource to be shared among processes |
|
*/ |
|
public ResourceSynchronizer(ZooKeeper zk, String pool, String[] resources) { |
|
this.zk = zk; |
|
this.zk.register(this); |
|
|
|
if (!pool.startsWith("/")) { |
|
throw new IllegalArgumentException("Pools should start with a '/'"); |
|
} |
|
|
|
this.resources = new String[resources.length]; |
|
for (int i = 0; i < resources.length; i++) { |
|
this.resources[i] = pool + "/" + resources[i]; |
|
if (resources[i].contains("/")) { |
|
throw new IllegalArgumentException("Resources should not contain '/'"); |
|
} |
|
} |
|
} |
|
|
|
/** |
|
* Request a free resource from the list of resources given while |
|
* constructing. Method blocks until a resource has been freed. |
|
* |
|
* @return a free resource |
|
* @throws IOException |
|
*/ |
|
public String request() throws IOException { |
|
connect(); |
|
for (String resource : resources) { |
|
ensureResourceZNodeExists(resource); |
|
} |
|
locateResource(); |
|
|
|
return resourceToUse.substring(resourceToUse.lastIndexOf("/") + 1); |
|
} |
|
|
|
private void connect() throws IOException { |
|
synchronized (this) { |
|
while (!zk.getState().equals(States.CONNECTED)) { |
|
try { |
|
wait(); |
|
} catch (InterruptedException e) { |
|
} |
|
} |
|
} |
|
log.info("ZK ensemble connected"); |
|
} |
|
|
|
private void ensureResourceZNodeExists(String resource) { |
|
resourceZNode = resource; |
|
resourceZNodeCreated = false; |
|
zk.exists(resource, false, this, null); |
|
synchronized (this) { |
|
while (!resourceZNodeCreated) { |
|
try { |
|
wait(); |
|
} catch (InterruptedException e) { |
|
} |
|
} |
|
} |
|
if (log.isDebugEnabled()) { |
|
log.debug("Resource ZNode '" + resource + "' exists"); |
|
} |
|
} |
|
|
|
private void locateResource() { |
|
findResourceToUse(); |
|
synchronized (this) { |
|
while (resourceToUse == null) { |
|
try { |
|
wait(); |
|
} catch (InterruptedException e) { |
|
} |
|
} |
|
} |
|
log.info("Resource '" + resourceToUse + "' allocated"); |
|
} |
|
|
|
private void findResourceToUse() { |
|
if (resourceIndexToTry == resources.length) { |
|
if (round < REUSE) { |
|
round++; |
|
resourceIndexToTry = 0; |
|
} else { |
|
log.info("No available resource, waiting..."); |
|
for (String resource : resources) { |
|
try { |
|
zk.getChildren(resource, true); |
|
} catch (Exception e) { |
|
} |
|
} |
|
return; |
|
} |
|
} |
|
if (log.isDebugEnabled()) { |
|
log.debug("Asking status for " + resources[resourceIndexToTry]); |
|
} |
|
zk.getChildren(resources[resourceIndexToTry], false, this, null); |
|
} |
|
|
|
/** |
|
* Release the resources used by this object. Should be called after the |
|
* requested resource has been used or resource may not get freed. |
|
*/ |
|
public void release() { |
|
if (zk != null) { |
|
releaseResource(); |
|
synchronized (this) { |
|
while (!resourceReleased) { |
|
try { |
|
wait(); |
|
} catch (InterruptedException e) { |
|
} |
|
} |
|
} |
|
log.info("Resource released"); |
|
} |
|
} |
|
|
|
/** |
|
* May be called to release resources and close the ZK connection. |
|
*/ |
|
public void close() { |
|
release(); |
|
if (zk != null) { |
|
try { |
|
zk.close(); |
|
} catch (InterruptedException e) { |
|
} |
|
log.info("ZK connection closed"); |
|
} |
|
} |
|
|
|
private void releaseResource() { |
|
zk.delete(resourceToUse + "/" + round, -1, this, null); |
|
} |
|
|
|
/** |
|
* General watcher |
|
*/ |
|
public void process(WatchedEvent event) { |
|
if (event.getType() == EventType.None |
|
&& event.getState() == KeeperState.SyncConnected) { |
|
synchronized (this) { |
|
notify(); |
|
} |
|
} |
|
if (event.getType() == EventType.NodeChildrenChanged) { |
|
if (resourceToUse == null) { |
|
log.info("Retrying to get another resource"); |
|
round = 1; |
|
resourceIndexToTry = 0; |
|
findResourceToUse(); |
|
} |
|
} |
|
} |
|
|
|
/** |
|
* Called on zk.exists |
|
*/ |
|
@Override |
|
public void processResult(int rc, String path, Object ctx, Stat stat) { |
|
switch (Code.get(rc)) { |
|
case OK: |
|
synchronized (this) { |
|
resourceZNodeCreated = true; |
|
notify(); |
|
} |
|
break; |
|
case NONODE: |
|
createResourceZNode(path); |
|
return; |
|
default: |
|
break; |
|
} |
|
} |
|
|
|
/** |
|
* Called on zk.create |
|
*/ |
|
@Override |
|
public void processResult(int rc, String path, Object ctx, String name) { |
|
Code returnCode = Code.get(rc); |
|
// root znode created |
|
if (resourceZNode.equals(path)) { |
|
if (returnCode == Code.OK || returnCode == Code.NODEEXISTS) { |
|
synchronized (this) { |
|
resourceZNodeCreated = true; |
|
notify(); |
|
} |
|
return; |
|
} |
|
} |
|
|
|
// Recursively create parent znodes |
|
if (resourceZNode.startsWith(path)) { |
|
if (returnCode == Code.OK) { |
|
createResourceZNode(path); |
|
} else if (returnCode == Code.NONODE) { |
|
zk.create(path.substring(0, path.lastIndexOf("/")), new byte[0], |
|
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, this, null); |
|
} |
|
return; |
|
} |
|
|
|
// status znodes |
|
if (returnCode == Code.OK) { |
|
if (log.isDebugEnabled()) { |
|
log.debug("Locked " + path); |
|
} |
|
synchronized (this) { |
|
resourceToUse = path.substring(0, path.lastIndexOf("/")); |
|
notify(); |
|
} |
|
} else if (returnCode == Code.NODEEXISTS) { |
|
if (log.isDebugEnabled()) { |
|
log.debug("Could not lock " + path); |
|
} |
|
resourceIndexToTry++; |
|
findResourceToUse(); |
|
} |
|
} |
|
|
|
private void createResourceZNode(String resource) { |
|
zk.create(resource, new byte[0], Ids.OPEN_ACL_UNSAFE, |
|
CreateMode.PERSISTENT, this, null); |
|
} |
|
|
|
/** |
|
* Called on zk.delete |
|
*/ |
|
@Override |
|
public void processResult(int rc, String path, Object ctx) { |
|
if (Code.get(rc) == Code.OK) { |
|
synchronized (this) { |
|
resourceReleased = true; |
|
notify(); |
|
} |
|
} |
|
} |
|
|
|
/** |
|
* Called on zk.getChildren |
|
*/ |
|
@Override |
|
public void processResult( |
|
int rc, |
|
String path, |
|
Object ctx, |
|
List<String> children) { |
|
if (log.isDebugEnabled()) { |
|
log.debug("Received status of " + path + " : has " + children.size() |
|
+ " children"); |
|
} |
|
if (children.size() < REUSE) { |
|
if (log.isDebugEnabled()) { |
|
log.debug("Trying to lock " + resources[resourceIndexToTry] + "/" + round); |
|
} |
|
zk.create(resources[resourceIndexToTry] + "/" + round, new byte[0], |
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null); |
|
} else { |
|
resourceIndexToTry++; |
|
findResourceToUse(); |
|
} |
|
} |
|
} |