Skip to content

Instantly share code, notes, and snippets.

@hishidama
Created November 27, 2011 03:45
Show Gist options
  • Save hishidama/1396921 to your computer and use it in GitHub Desktop.
Save hishidama/1396921 to your computer and use it in GitHub Desktop.
Hadoop0.23 YARN 円周率算出サンプル(中途半端版)
package sample.yarn;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
/**
 * Class-related utilities.
 * <p>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >Explanation</a>
 * </p>
 *
 * @author hishidama
 */
public class ClassUtil {
    /**
     * Finds the jar file that contains the given class.
     * <p>
     * The class file is looked up through the class's own class loader; the
     * first hit whose URL uses the {@code jar} protocol is converted into a
     * plain file path (the {@code file:} prefix and the {@code !...} entry
     * suffix are stripped and the path is URL-decoded).
     * </p>
     *
     * @param my_class
     *            class to locate
     * @return path of the containing jar, or {@code null} when the class has
     *         no class loader (bootstrap classes) or was not loaded from a jar
     * @throws RuntimeException
     *             wrapping the {@link IOException} raised while enumerating
     *             resources
     * @see org.apache.hadoop.mapred.JobConf#findContainingJar()
     */
    public static String findContainingJar(Class<?> my_class) {
        ClassLoader loader = my_class.getClassLoader();
        if (loader == null) {
            // Class.getClassLoader() returns null for classes loaded by the
            // bootstrap loader; there is nothing to search in that case.
            return null;
        }
        String class_file = my_class.getName().replace('.', '/') + ".class";
        try {
            Enumeration<URL> itr = loader.getResources(class_file);
            while (itr.hasMoreElements()) {
                URL url = itr.nextElement();
                if (!"jar".equals(url.getProtocol())) {
                    continue;
                }
                String toReturn = url.getPath();
                if (toReturn.startsWith("file:")) {
                    toReturn = toReturn.substring("file:".length());
                }
                // URLDecoder turns '+' into a space, so protect literal plus
                // signs in the path before decoding.
                toReturn = toReturn.replaceAll("\\+", "%2B");
                toReturn = URLDecoder.decode(toReturn, "UTF-8");
                // Drop the "!/pkg/Name.class" entry part of the jar URL.
                return toReturn.replaceAll("!.*$", "");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }
}
package sample.yarn.sample2;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import sample.yarn.YarnAppMstr;
/**
 * Hadoop 0.23.0 YARN pi-estimation sample (half-finished version):
 * application master.
 * <p>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >Explanation</a>
 * </p>
 *
 * @author hishidama
 */
public class Sample2AppMstr extends YarnAppMstr {
    /**
     * Entry point; exits the JVM with the code returned by the application
     * master.
     *
     * @param args
     *            {@code <nContainers> <nSamples>}
     * @throws Exception
     *             if the application master fails
     */
    public static void main(String[] args) throws Exception {
        int r = run(new Sample2AppMstr(), args);
        System.exit(r);
    }

    /** Number of containers to launch. */
    private int nContainers;
    /** Number of samples each container computes. */
    private long nSamples;

    /**
     * Parses {@code <nContainers> <nSamples>} from the command line.
     *
     * @param args
     *            arguments passed to main()
     * @throws IllegalArgumentException
     *             if the argument count is not 2, a value is not numeric, or
     *             nContainers is smaller than 1
     */
    @Override
    protected void init(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: " + getClass().getName()
                    + " <nContainers> <nSamples>");
            throw new IllegalArgumentException("args.length=" + args.length);
        }
        int containers = Integer.parseInt(args[0]);
        if (containers < 1) {
            // ArrayBlockingQueue rejects a capacity below 1; fail here, before
            // anything else is done with the value, with a clearer message.
            throw new IllegalArgumentException("nContainers must be >= 1: "
                    + containers);
        }
        nContainers = containers;
        nSamples = Long.parseLong(args[1]);
    }

    /**
     * Creates one launcher per container; container {@code i} starts at
     * offset {@code i * nSamples} and computes {@code nSamples} samples.
     *
     * @return queue holding {@code nContainers} launchers
     */
    @Override
    protected Queue<ContainerLauncher> getContainerQueue() {
        Queue<ContainerLauncher> queue = new ArrayBlockingQueue<ContainerLauncher>(
                nContainers);
        for (int i = 0; i < nContainers; i++) {
            Sample2Launcher launcher = new Sample2Launcher(i * nSamples,
                    nSamples);
            queue.add(launcher);
        }
        return queue;
    }

    /** Launcher that remembers the sample range assigned to one container. */
    class Sample2Launcher implements ContainerLauncher {
        private final long offset;
        private final long samples;

        public Sample2Launcher(long offset, long samples) {
            this.offset = offset;
            this.samples = samples;
        }

        @Override
        public LaunchThread create() {
            return new Sample2LaunchThread();
        }

        /** Launch thread that starts {@link Sample2Container}. */
        class Sample2LaunchThread extends JavaLaunchThread {
            public Sample2LaunchThread() {
                super(Sample2Container.class);
            }

            /** @return the offset and sample count as command-line arguments */
            @Override
            protected List<Object> getCommandArgs() {
                return Arrays.<Object> asList(offset, samples);
            }
        }
    }
}
package sample.yarn.sample2;
import java.util.Arrays;
import java.util.List;
import sample.yarn.YarnClient;
/**
 * Hadoop 0.23.0 YARN pi-estimation sample (half-finished version): client.
 * <p>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >Explanation</a>
 * </p>
 *
 * @author hishidama
 */
public class Sample2Client extends YarnClient {
    /** Number of containers to launch. */
    private int nContainers;
    /** Number of samples to compute. */
    private long nSamples;

    /** Constructor: application name "sample2", master class {@link Sample2AppMstr}. */
    public Sample2Client() {
        super("sample2", Sample2AppMstr.class);
    }

    /**
     * Entry point; exits the JVM with the code returned by the client run.
     *
     * @param args
     *            {@code <nContainers> <nSamples>}
     * @throws Exception
     *             if the client run fails
     */
    public static void main(String[] args) throws Exception {
        final int exitCode = run(new Sample2Client(), args);
        System.exit(exitCode);
    }

    /**
     * Reads the container count and sample count from the command line.
     *
     * @param args
     *            arguments passed to main()
     * @throws IllegalArgumentException
     *             if the argument count is not 2 or a value is not numeric
     */
    @Override
    protected void init(String[] args) {
        if (args.length == 2) {
            nContainers = Integer.parseInt(args[0]);
            nSamples = Long.parseLong(args[1]);
            return;
        }
        System.err.println("Usage: " + getClass().getName()
                + " <nContainers> <nSamples>");
        throw new IllegalArgumentException("args.length=" + args.length);
    }

    /** @return the two parsed values, forwarded as command-line arguments */
    @Override
    protected List<Object> getCommandArgs() {
        final Object[] forwarded = { nContainers, nSamples };
        return Arrays.asList(forwarded);
    }
}
package sample.yarn.sample2;
/**
 * Hadoop 0.23.0 YARN pi-estimation sample (half-finished version): container.
 * <p>
 * Uses the Monte Carlo method to produce the counts needed to estimate pi,
 * but does not return the result; it only writes it to the log.
 * </p>
 * <p>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >Explanation</a>
 * </p>
 *
 * @see org.apache.hadoop.examples.QuasiMonteCarlo
 * @author hishidama
 */
public class Sample2Container {
    /**
     * Entry point; exits the JVM with the code returned by {@link #run()}.
     *
     * @param args
     *            {@code <offset> <samples>}
     */
    public static void main(String[] args) {
        Sample2Container container = new Sample2Container();
        container.init(args);
        int r = container.run();
        System.exit(r);
    }

    /** Index of the Halton sequence at which sampling starts. */
    private long offset;
    /** Number of points to generate. */
    private long samples;

    /**
     * Parses the offset and sample count and echoes them to standard output.
     *
     * @param args
     *            at least two values: offset and number of samples
     * @throws IllegalArgumentException
     *             if fewer than two arguments are given or a value is not
     *             numeric
     */
    public void init(String[] args) {
        if (args == null || args.length < 2) {
            System.err.println("Usage: " + getClass().getName()
                    + " <offset> <samples>");
            throw new IllegalArgumentException("args.length="
                    + (args == null ? 0 : args.length));
        }
        offset = Long.parseLong(args[0]);
        samples = Long.parseLong(args[1]);
        System.out.println("offset= " + offset);
        System.out.println("samples=" + samples);
    }

    /**
     * Generates {@code samples} points in the unit square and counts how many
     * fall inside / outside the inscribed circle. Both counts are printed to
     * standard output; progress is reported on standard error every 1000
     * points.
     *
     * @return always 0
     */
    public int run() {
        final HaltonSequence haltonsequence = new HaltonSequence(offset);
        long numInside = 0L;
        long numOutside = 0L;
        for (long i = 0; i < samples;) {
            // generate points in a unit square
            final double[] point = haltonsequence.nextPoint();
            // count points inside/outside of the inscribed circle of the square
            final double x = point[0] - 0.5;
            final double y = point[1] - 0.5;
            if (x * x + y * y > 0.25) {
                numOutside++;
            } else {
                numInside++;
            }
            // report status
            i++;
            if (i % 1000 == 0) {
                System.err.println("Generated " + i + " samples.");
            }
        }
        // Ideally the result would be written to HDFS or similar; for now it
        // is only printed.
        System.out.println("numInside= " + numInside);
        System.out.println("numOutside=" + numOutside);
        return 0;
    }

    /**
     * Two-dimensional Halton sequence generator.
     *
     * @see org.apache.hadoop.examples.QuasiMonteCarlo.HaltonSequence
     */
    private static class HaltonSequence {
        /** Bases */
        static final int[] P = { 2, 3 };
        /** Maximum number of digits allowed */
        static final int[] K = { 63, 40 };
        /** Index of the current point. */
        private long index;
        /** Current point, one coordinate per base; updated in place. */
        private final double[] x;
        /** q[i][j] = P[i]^-(j+1): weight of digit j in base P[i]. */
        private final double[][] q;
        /** d[i][j]: digit j of the current index written in base P[i]. */
        private final int[][] d;

        /**
         * Initialize to H(startindex), so the sequence begins with
         * H(startindex+1).
         */
        HaltonSequence(long startindex) {
            index = startindex;
            x = new double[K.length];
            q = new double[K.length][];
            d = new int[K.length][];
            for (int i = 0; i < K.length; i++) {
                q[i] = new double[K[i]];
                d[i] = new int[K[i]];
            }
            for (int i = 0; i < K.length; i++) {
                long k = index;
                x[i] = 0;
                for (int j = 0; j < K[i]; j++) {
                    q[i][j] = (j == 0 ? 1.0 : q[i][j - 1]) / P[i];
                    d[i][j] = (int) (k % P[i]);
                    k = (k - d[i][j]) / P[i];
                    x[i] += d[i][j] * q[i][j];
                }
            }
        }

        /**
         * Compute next point. Assume the current point is H(index). Compute
         * H(index+1).
         * <p>
         * The returned array is the generator's internal state and is
         * overwritten by the next call.
         * </p>
         *
         * @return a 2-dimensional point with coordinates in [0,1)^2
         */
        double[] nextPoint() {
            index++;
            for (int i = 0; i < K.length; i++) {
                for (int j = 0; j < K[i]; j++) {
                    // Increment digit j; stop unless it overflows the base.
                    d[i][j]++;
                    x[i] += q[i][j];
                    if (d[i][j] < P[i]) {
                        break;
                    }
                    // Overflow: reset the digit, undo its contribution and
                    // carry into the next digit.
                    d[i][j] = 0;
                    x[i] -= (j == 0 ? 1.0 : q[i][j - 1]);
                }
            }
            return x;
        }
    }
}
package sample.yarn;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Hadoop0.23.0 YARN ApplicationMaster.
* <p>
* →<a href=
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
* >参考</a><br>
* →<a href=
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
* >説明</a>
* </p>
*
* @see org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster
* @author hishidama
*/
public abstract class YarnAppMstr {
private static final Log LOG = LogFactory.getLog(YarnAppMstr.class);
/**
 * Runs the given application master.
 *
 * @param app
 *            application master to execute
 * @param args
 *            arguments passed to main()
 * @return exit code
 * @throws Exception
 *             if the run fails
 */
protected static int run(YarnAppMstr app, String[] args) throws Exception {
    final int exitCode = app.run(args);
    return exitCode;
}
/** Hadoop configuration; created in the constructor. */
protected Configuration conf;
/** YARN RPC instance created from {@link #conf} in the constructor. */
protected YarnRPC rpc;
/** Application attempt ID; assigned in run() from getAppAttemptID(). */
protected ApplicationAttemptId appAttemptID;
/** ResourceManager proxy; assigned in run() from connectToRM(). */
protected AMRMProtocol resourceManager;
/** Priority of the containers. */
private int requestPriority = 0;
/** Memory size required (requested) to run a container. */
protected int containerMemory = 1 * 1024;
/** Number of containers to launch. */
private int numTotalContainers;
/** Number of completed containers. */
private AtomicInteger numCompletedContainers = new AtomicInteger();
/** Number of allocated containers. */
private AtomicInteger numAllocatedContainers = new AtomicInteger();
/** Number of failed containers. */
private AtomicInteger numFailedContainers = new AtomicInteger();
/** Counter incremented on every RPC call to the ResourceManager. */
private AtomicInteger rmRequestID = new AtomicInteger();
/** Containers to be released. */
private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
/** Launcher used for each container ID. */
private Map<ContainerId, ContainerLauncher> launcherMap = new HashMap<ContainerId, ContainerLauncher>();
/** Threads that have been started. */
private List<Thread> launchThreads = new ArrayList<Thread>();
/** Constructor: creates the configuration and the {@link YarnRPC} instance built from it. */
protected YarnAppMstr() {
    this.conf = new Configuration();
    this.rpc = YarnRPC.create(this.conf);
}
/**
 * Runs the application master: initializes from the arguments, connects to
 * and registers with the ResourceManager, adjusts the container memory,
 * runs the launch/monitor loop and finally reports completion.
 *
 * @param args
 *            arguments passed to main()
 * @return exit code computed by {@link #finish()} (previously always 0,
 *         which hid container failures from the caller)
 * @throws YarnRemoteException
 *             if a call to the ResourceManager fails
 */
public int run(String[] args) throws YarnRemoteException {
    init(args);
    appAttemptID = getAppAttemptID();
    resourceManager = connectToRM();
    RegisterApplicationMasterResponse amResponse = registerToRM();
    initResource(amResponse);
    loop();
    // finish() derives the exit code from the failed-container count;
    // propagate it instead of discarding it.
    return finish();
}
/**
 * Initialization hook, called first by run(String[]). Does nothing by
 * default.
 *
 * @param args
 *            arguments passed to main()
 */
protected void init(String[] args) {
// Subclasses override this.
}
/**
 * Obtains the application attempt ID from the container ID stored in the
 * environment variable named by
 * {@link ApplicationConstants#AM_CONTAINER_ID_ENV}.
 *
 * @return application attempt ID of this application master
 * @throws IllegalArgumentException
 *             if the environment variable is not set
 */
protected ApplicationAttemptId getAppAttemptID() {
    final String rawContainerId = System.getenv().get(
            ApplicationConstants.AM_CONTAINER_ID_ENV);
    if (rawContainerId == null) {
        throw new IllegalArgumentException(
                "ContainerId not set in the environment");
    }
    final ContainerId parsed = ConverterUtils.toContainerId(rawContainerId);
    return parsed.getApplicationAttemptId();
}
/**
 * Connects to the ResourceManager at the configured scheduler address.
 *
 * @return proxy implementing {@link AMRMProtocol}
 */
protected AMRMProtocol connectToRM() {
    final YarnConfiguration yarnConf = new YarnConfiguration(conf);
    final String schedulerAddress = yarnConf.get(
            YarnConfiguration.RM_SCHEDULER_ADDRESS,
            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
    final InetSocketAddress rmAddress = NetUtils
            .createSocketAddr(schedulerAddress);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    final Object proxy = rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
    return (AMRMProtocol) proxy;
}
/**
 * Registers this application master with the ResourceManager. Only the
 * application attempt ID is set on the request; host, RPC port and tracking
 * URL are left unset.
 *
 * @return registration response from the ResourceManager
 * @throws YarnRemoteException
 *             if the registration call fails
 */
protected RegisterApplicationMasterResponse registerToRM()
        throws YarnRemoteException {
    final RegisterApplicationMasterRequest registration = Records
            .newRecord(RegisterApplicationMasterRequest.class);
    registration.setApplicationAttemptId(appAttemptID);
    final RegisterApplicationMasterResponse reply = resourceManager
            .registerApplicationMaster(registration);
    return reply;
}
/**
 * Clamps {@link #containerMemory} into the memory range reported by the
 * ResourceManager at registration.
 *
 * @param response
 *            registration response carrying the minimum and maximum
 *            resource capability
 */
protected void initResource(RegisterApplicationMasterResponse response) {
    final int lowerBound = response.getMinimumResourceCapability().getMemory();
    final int upperBound = response.getMaximumResourceCapability().getMemory();
    LOG.info("Min mem capabililty of resources in this cluster " + lowerBound);
    LOG.info("Max mem capabililty of resources in this cluster " + upperBound);
    if (containerMemory >= lowerBound && containerMemory <= upperBound) {
        // Already inside the allowed range.
        return;
    }
    if (containerMemory < lowerBound) {
        LOG.info("Container memory specified below min threshold of cluster. Using min value."
                + ", specified=" + containerMemory + ", min=" + lowerBound);
        containerMemory = lowerBound;
    } else {
        LOG.info("Container memory specified above max threshold of cluster. Using max value."
                + ", specified=" + containerMemory + ", max=" + upperBound);
        containerMemory = upperBound;
    }
}
/**
 * Container launch and monitoring loop. Repeats, once per second, a
 * request/launch step followed by a completion check until the number of
 * completed containers reaches the number of queued launchers; then waits up
 * to 10 seconds for each launch thread.
 *
 * @throws YarnRemoteException
 *             if a call to the ResourceManager fails
 */
protected void loop() throws YarnRemoteException {
    final Queue<ContainerLauncher> pending = getContainerQueue();
    numTotalContainers = pending.size();
    for (;;) {
        if (numCompletedContainers.get() >= numTotalContainers) {
            break;
        }
        final AMResponse response = kickContainer(pending);
        final Resource free = response.getAvailableResources();
        LOG.info("Current available resources in the cluster " + free);
        checkComplete(response, pending);
        if (numCompletedContainers.get() == numTotalContainers) {
            break; // everything finished; skip the sleep
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOG.debug("Thread sleep in monitoring loop interrupted");
        }
    }
    // Wait for the launch threads to finish.
    for (Thread worker : launchThreads) {
        try {
            worker.join(10 * 1000);
        } catch (InterruptedException e) {
            LOG.info("Exception thrown in thread join: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
/**
 * Creates the container launch information.
 *
 * @return queue of launchers; loop() takes its size as the total number of
 *         containers and hands one launcher to each allocated container
 */
protected abstract Queue<ContainerLauncher> getContainerQueue();
/**
 * Asks the ResourceManager for as many containers as there are queued
 * launchers and starts a launch thread for every container allocated in the
 * response. Allocated containers for which no launcher is left are ignored.
 *
 * @param queue
 *            launchers still waiting for a container
 * @return response of the allocate call
 * @throws YarnRemoteException
 *             if the allocate call fails
 */
protected AMResponse kickContainer(Queue<ContainerLauncher> queue)
        throws YarnRemoteException {
    final int waiting = queue.size();
    final List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
    if (waiting > 0) {
        asks.add(setupContainerAskForRM(waiting));
    }
    final AMResponse response = sendContainerAskToRM(asks);
    final List<Container> granted = response.getAllocatedContainers();
    LOG.info("Got response from RM for container ask, allocatedCnt="
            + granted.size());
    numAllocatedContainers.addAndGet(granted.size());
    for (Container slot : granted) {
        final ContainerLauncher launcher = queue.poll();
        if (launcher == null) {
            continue;
        }
        final LaunchThread worker = launcher.create();
        worker.init(slot);
        // Remember the launcher so a retry can re-queue it.
        launcherMap.put(slot.getId(), launcher);
        launchThreads.add(worker);
        worker.start();
    }
    return response;
}
/**
 * Processes the completed-container statuses of an allocate response and
 * updates the counters according to checkExitStatus(): success and failure
 * count as completed, a retry puts the container's launcher back on the
 * queue.
 *
 * @param amResp
 *            response of the allocate call
 * @param queue
 *            launcher queue that receives launchers to retry
 */
protected void checkComplete(AMResponse amResp,
        Queue<ContainerLauncher> queue) {
    final List<ContainerStatus> finished = amResp
            .getCompletedContainersStatuses();
    LOG.info("Got response from RM for container ask, completedCnt="
            + finished.size());
    for (ContainerStatus status : finished) {
        LOG.info("Got container status for containerID= "
                + status.getContainerId() + ", state=" + status.getState()
                + ", exitStatus=" + status.getExitStatus()
                + ", diagnostics=" + status.getDiagnostics());
        assert (status.getState() == ContainerState.COMPLETE);
        // Adjust the counters depending on how the container ended.
        final ExitStatusAction action = checkExitStatus(status);
        switch (action) {
        case RETRY: {
            numAllocatedContainers.decrementAndGet();
            final ContainerLauncher again = launcherMap.get(status
                    .getContainerId());
            if (again != null) {
                queue.add(again);
            }
            break;
        }
        case FAILED:
            numCompletedContainers.incrementAndGet();
            numFailedContainers.incrementAndGet();
            break;
        case SUCCEEDED:
            numCompletedContainers.incrementAndGet();
            LOG.info("Container completed successfully." + ", containerId="
                    + status.getContainerId());
            break;
        default:
            throw new Error("exitStatus=" + action);
        }
    }
}
/**
 * Factory for {@link LaunchThread} instances. kickContainer() calls
 * create() once for each allocated container it hands to a launcher.
 */
public static interface ContainerLauncher {
/** @return a new launch thread */
public LaunchThread create();
}
/**
* コンテナーを起動するスレッド.
*/
public abstract class LaunchThread extends Thread {
/** アロケートされたコンテナー */
private Container container;
/** ContainerManager */
private ContainerManager cm;
/**
* 初期化.
*
* @param container
*/
public void init(Container container) {
this.container = container;
}
@Override
public void run() {
try {
LOG.info("Connecting to container manager for containerid="
+ container.getId());
this.cm = connectToCM();
ContainerLaunchContext ctx = getContainerLaunchContext();
startContainer(ctx);
} catch (Exception e) {
numCompletedContainers.incrementAndGet();
numFailedContainers.incrementAndGet();
e.printStackTrace();
}
}
/**
* コンテナーマネージャーへ接続する.
*
* @return
*/
protected ContainerManager connectToCM() {
String cmIpPortStr = container.getNodeId().getHost() + ":"
+ container.getNodeId().getPort();
InetSocketAddress cmAddress = NetUtils
.createSocketAddr(cmIpPortStr);
LOG.info("Connecting to ResourceManager at " + cmIpPortStr);
return (ContainerManager) rpc.getProxy(ContainerManager.class,
cmAddress, conf);
}
/**
* コンテナーの実行情報を生成する.
*
* @return
* @throws IOException
*/
protected ContainerLaunchContext getContainerLaunchContext()
throws IOException {
ContainerLaunchContext ctx = Records
.newRecord(ContainerLaunchContext.class);
ctx.setContainerId(container.getId());
ctx.setResource(container.getResource());
try {
ctx.setUser(UserGroupInformation.getCurrentUser()
.getShortUserName());
} catch (IOException e) {
LOG.info("Getting current user info failed when trying to launch the container"
+ e.getMessage());
}
Map<String, LocalResource> localResources = getLocalResources();
ctx.setLocalResources(localResources);
Map<String, String> env = getEnvironment();
ctx.setEnvironment(env);
String command = getCommand();
List<String> commands = Collections.singletonList(command);
ctx.setCommands(commands);
return ctx;
}
/**
* コンテナー用のローカルリソースを指定する.
*
* @return
* @throws IOException
*/
protected Map<String, LocalResource> getLocalResources()
throws IOException {
return Collections.emptyMap();
}
/**
* コンテナー用の環境変数を指定する.
*
* @return
*/
protected Map<String, String> getEnvironment() {
return Collections.emptyMap();
}
/**
* コンテナープログラムを実行するコマンドを指定する.
*
* @return コマンド
*/
protected abstract String getCommand();
/**
* コンテナープログラムを実行開始する.
*
* @param ctx
* @throws YarnRemoteException
*/
protected void startContainer(ContainerLaunchContext ctx)
throws YarnRemoteException {
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);
startReq.setContainerLaunchContext(ctx);
cm.startContainer(startReq);
}
}
/**
 * Thread that launches a container running a Java program.
 */
public class JavaLaunchThread extends LaunchThread {
    /** Class whose main() the container runs. */
    private Class<?> containerClass;
    /** Jar file that contains {@link #containerClass}. */
    private File containerJar;

    /**
     * @param containerClass
     *            main class of the container; must have been loaded from a
     *            jar
     * @throws IllegalArgumentException
     *             if no jar containing the class can be found
     */
    public JavaLaunchThread(Class<?> containerClass) {
        String jar = ClassUtil.findContainingJar(containerClass);
        if (jar == null) {
            // new File(null) would fail with an uninformative
            // NullPointerException.
            throw new IllegalArgumentException(
                    "No jar file found containing "
                            + containerClass.getName());
        }
        this.containerClass = containerClass;
        this.containerJar = new File(jar);
    }

    /**
     * Registers the container jar as the single local resource
     * "Container.jar".
     *
     * @return local resources keyed by name
     * @throws IOException
     *             if the jar's file status cannot be read
     */
    @Override
    protected Map<String, LocalResource> getLocalResources()
            throws IOException {
        Path jarPath = new Path(containerJar.toURI());
        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
        // Resolve the file system from the path itself: the jar path is a
        // file: URI, which the configured default file system rejects when
        // it is not the local one.
        FileSystem fs = jarPath.getFileSystem(conf);
        FileStatus destStatus = fs.getFileStatus(jarPath);
        LocalResource cmJarRsrc = Records.newRecord(LocalResource.class);
        cmJarRsrc.setType(LocalResourceType.FILE);
        cmJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
        cmJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
        cmJarRsrc.setTimestamp(destStatus.getModificationTime());
        cmJarRsrc.setSize(destStatus.getLen());
        localResources.put("Container.jar", cmJarRsrc);
        return localResources;
    }

    /**
     * @return environment containing only the CLASSPATH for the container
     */
    @Override
    protected Map<String, String> getEnvironment() {
        Map<String, String> env = new HashMap<String, String>();
        String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR"
                + ":$HADOOP_COMMON_HOME/share/hadoop/common/*"
                + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*"
                + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*"
                + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*"
                + ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*"
                + ":./log4j.properties:";
        env.put("CLASSPATH", classPathEnv);
        return env;
    }

    /**
     * Builds the java command line: the container class, its arguments, and
     * redirection of stdout/stderr into the container log directory.
     *
     * @return command line
     */
    @Override
    protected String getCommand() {
        StringBuilder sb = new StringBuilder(128);
        sb.append("${JAVA_HOME}/bin/java ");
        sb.append(containerClass.getName());
        for (Object arg : getCommandArgs()) {
            sb.append(" ");
            sb.append(arg);
        }
        sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                + "/container.stdout.log");
        sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
                + "/container.stderr.log");
        return sb.toString();
    }

    /**
     * Returns the arguments passed to the container class's main(). Empty by
     * default.
     *
     * @return list of arguments
     */
    protected List<Object> getCommandArgs() {
        return Arrays.<Object> asList();
    }
}
/**
 * Builds the resource request sent to the ResourceManager: any host
 * ("*"), the configured priority and the configured container memory.
 *
 * @param numContainers
 *            number of containers to ask for
 * @return resource request
 */
protected ResourceRequest setupContainerAskForRM(int numContainers) {
    final Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(requestPriority);

    final Resource memory = Records.newRecord(Resource.class);
    memory.setMemory(containerMemory);

    final ResourceRequest ask = Records.newRecord(ResourceRequest.class);
    ask.setHostName("*");
    ask.setNumContainers(numContainers);
    ask.setPriority(priority);
    ask.setCapability(memory);
    return ask;
}
/**
 * Sends an allocate request to the ResourceManager carrying the given asks,
 * the containers to release and the current progress (completed / total).
 *
 * @param requestedContainers
 *            resource requests to send
 * @return the AMResponse contained in the allocate response
 * @throws YarnRemoteException
 *             if the allocate call fails
 */
protected AMResponse sendContainerAskToRM(
        List<ResourceRequest> requestedContainers)
        throws YarnRemoteException {
    final AllocateRequest allocation = Records
            .newRecord(AllocateRequest.class);
    allocation.setResponseId(rmRequestID.incrementAndGet());
    allocation.setApplicationAttemptId(appAttemptID);
    allocation.addAllAsks(requestedContainers);
    allocation.addAllReleases(releasedContainers);
    final float done = (float) numCompletedContainers.get()
            / numTotalContainers;
    allocation.setProgress(done);

    LOG.info("Sending request to RM for containers" + ", requestedSet="
            + requestedContainers.size() + ", releasedSet="
            + releasedContainers.size() + ", progress="
            + allocation.getProgress());
    for (ResourceRequest ask : requestedContainers) {
        LOG.info("Requested container ask: " + ask.toString());
    }
    for (ContainerId released : releasedContainers) {
        LOG.info("Released container, id=" + released.getId());
    }

    final AllocateResponse reply = resourceManager.allocate(allocation);
    return reply.getAMResponse();
}
/**
 * How a container ended, as classified by checkExitStatus(); checkComplete()
 * counts SUCCEEDED and FAILED as completed and re-queues the launcher on
 * RETRY.
 */
public static enum ExitStatusAction {
SUCCEEDED, FAILED, RETRY
}
/**
 * Classifies the exit status of a completed container: 0 is success, -100
 * requests a retry, anything else is a failure.
 * <p>
 * NOTE(review): -100 presumably marks a container aborted by the framework
 * rather than by the program itself — confirm against the YARN constants.
 * </p>
 *
 * @param containerStatus
 *            status of the completed container
 * @return action to take
 */
protected ExitStatusAction checkExitStatus(ContainerStatus containerStatus) {
    final int code = containerStatus.getExitStatus();
    if (code == 0) {
        return ExitStatusAction.SUCCEEDED;
    }
    if (code == -100) {
        return ExitStatusAction.RETRY;
    }
    return ExitStatusAction.FAILED;
}
/**
 * Notifies the ResourceManager that the application has finished.
 *
 * @return exit code determined by initFinishRequest()
 * @throws YarnRemoteException
 *             if the finish call fails
 */
protected int finish() throws YarnRemoteException {
    LOG.info("Application completed. Signalling finish to RM");
    final FinishApplicationMasterRequest farewell = Records
            .newRecord(FinishApplicationMasterRequest.class);
    farewell.setAppAttemptId(appAttemptID);
    final int status = initFinishRequest(farewell);
    resourceManager.finishApplicationMaster(farewell);
    return status;
}
/**
 * Fills in the final status of the finish request: SUCCEEDED when no
 * container failed, otherwise FAILED together with a diagnostics string
 * listing the container counters.
 *
 * @param finishReq
 *            request to fill in
 * @return exit code: 0 on success, 1 when at least one container failed
 */
protected int initFinishRequest(FinishApplicationMasterRequest finishReq) {
    if (numFailedContainers.get() != 0) {
        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
        final StringBuilder summary = new StringBuilder();
        summary.append("Diagnostics.");
        summary.append(", total=").append(numTotalContainers);
        summary.append(", completed=").append(numCompletedContainers.get());
        summary.append(", allocated=").append(numAllocatedContainers.get());
        summary.append(", failed=").append(numFailedContainers.get());
        finishReq.setDiagnostics(summary.toString());
        return 1;
    }
    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
    return 0;
}
}
package sample.yarn;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Hadoop0.23.0 YARN Client.
* <p>
* →<a href=
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
* >参考</a><br>
* →<a href=
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
* >説明</a>
* </p>
*
* @see org.apache.hadoop.yarn.applications.distributedshell.Client
* @author hishidama
*/
public abstract class YarnClient extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(YarnClient.class);
/**
 * Runs the given client through {@link ToolRunner}.
 *
 * @param client
 *            client to execute
 * @param args
 *            arguments passed to main()
 * @return exit code returned by ToolRunner
 * @throws Exception
 *             if the run fails
 */
protected static int run(YarnClient client, String[] args) throws Exception {
    final int exitCode = ToolRunner.run(client, args);
    return exitCode;
}
/** YARN RPC instance; created in run(String[]). */
private YarnRPC rpc;
/** Proxy returned by connectToASM(). */
private ClientRMProtocol applicationsManager; // Resource Manager
/** Application name. */
protected String appName;
/** Class of the AppMstr. */
protected Class<?> appMasterClass;
/** Path of the jar file that contains the AppMstr. */
protected File appMasterJar;
/** Priority of the AppMstr. */
protected int amPriority = 0;
/** Queue of the AppMstr. */
protected String amQueue = "";
/** User that runs the AppMstr. */
protected String amUser = "";
/** Memory size required (requested) to run the AppMstr. */
protected int amMemory = 1 * 1024;
/** Start time of this client. */
private final long clientStartTime = System.currentTimeMillis();
/** Timeout of the AppMstr (per the original note, it is killed once this time has passed). */
private long clientTimeout = TimeUnit.MINUTES.toMillis(10);
/**
 * Constructor.
 *
 * @param appName
 *            application name
 * @param appMasterClass
 *            class of the AppMstr; must have been loaded from a jar
 * @throws IllegalArgumentException
 *             if no jar containing the AppMstr class can be found
 */
protected YarnClient(String appName, Class<?> appMasterClass) {
    String jar = ClassUtil.findContainingJar(appMasterClass);
    if (jar == null) {
        // new File(null) would fail with an uninformative
        // NullPointerException.
        throw new IllegalArgumentException("No jar file found containing "
                + appMasterClass.getName());
    }
    this.appName = appName;
    this.appMasterClass = appMasterClass;
    this.appMasterJar = new File(jar);
}
/**
 * Runs the client: initializes from the arguments, connects to the
 * ApplicationsManager, obtains a new application, submits it and monitors it.
 *
 * @param args
 *            arguments passed to main()
 * @return 0 when monitoring reports success, 1 otherwise
 * @throws Exception
 *             if any step fails
 */
@Override
public int run(String[] args) throws Exception {
    init(args);
    final Configuration current = getConf();
    rpc = YarnRPC.create(current);
    applicationsManager = connectToASM();
    final GetNewApplicationResponse created = getApplication();
    submit(getAppContext(created));
    if (monitorApplication(created.getApplicationId())) {
        return 0;
    }
    return 1;
}
/**
 * Initialization hook, called first by run(String[]). Does nothing by
 * default.
 *
 * @param args
 *            arguments passed to main()
 */
protected void init(String[] args) {
// Subclasses override this.
}
/**
 * Connects to the ApplicationsManager (ASM) at the configured
 * ResourceManager address.
 *
 * @return proxy implementing {@link ClientRMProtocol}
 */
protected ClientRMProtocol connectToASM() {
    final Configuration current = getConf();
    final YarnConfiguration yarnConf = new YarnConfiguration(current);
    final String configuredAddress = yarnConf.get(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS);
    final InetSocketAddress rmAddress = NetUtils
            .createSocketAddr(configuredAddress);
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    final Object proxy = rpc.getProxy(ClientRMProtocol.class, rmAddress,
            current);
    return (ClientRMProtocol) proxy;
}
/**
 * Obtains the ApplicationId of a new application from the ASM.
 * <p>
 * Per the original note, the response also carries the usable resources
 * (minimum and maximum available memory).
 * </p>
 *
 * @return response containing the new application ID
 * @throws YarnRemoteException
 *             if the call fails
 */
protected GetNewApplicationResponse getApplication()
        throws YarnRemoteException {
    final GetNewApplicationRequest query = Records
            .newRecord(GetNewApplicationRequest.class);
    final GetNewApplicationResponse created = applicationsManager
            .getNewApplication(query);
    LOG.info("Got new application id=" + created.getApplicationId());
    return created;
}
/**
 * Builds the submission context of the AppMstr: application ID and name,
 * container launch context, priority, queue and user.
 *
 * @param newApp
 *            response obtained from getApplication()
 * @return submission context
 * @throws IOException
 *             if the container launch context cannot be built
 */
protected ApplicationSubmissionContext getAppContext(
        GetNewApplicationResponse newApp) throws IOException {
    LOG.info("Setting up application submission context for ASM");
    final ApplicationSubmissionContext submission = Records
            .newRecord(ApplicationSubmissionContext.class);
    final ApplicationId id = newApp.getApplicationId();
    submission.setApplicationId(id);
    submission.setApplicationName(appName);
    submission.setAMContainerSpec(getContainerLaunchContext(newApp, id));

    final Priority priority = Records.newRecord(Priority.class);
    priority.setPriority(amPriority);
    submission.setPriority(priority);

    submission.setQueue(amQueue);
    submission.setUser(amUser);
    return submission;
}
/**
 * Builds the container launch context of the AppMstr: local resources,
 * environment, command and resource capability.
 *
 * @param newApp
 *            response obtained from getApplication()
 * @param appId
 *            application ID (not used by this implementation)
 * @return launch context
 * @throws IOException
 *             if the local resources cannot be determined
 */
protected ContainerLaunchContext getContainerLaunchContext(
        GetNewApplicationResponse newApp, ApplicationId appId)
        throws IOException {
    final ContainerLaunchContext spec = Records
            .newRecord(ContainerLaunchContext.class);
    spec.setLocalResources(getLocalResources());
    spec.setEnvironment(getEnvironment());
    spec.setCommands(Collections.singletonList(getCommand()));
    spec.setResource(getCapability(newApp));
    return spec;
}
/**
 * Specifies the local resources for the AppMstr: the AppMstr jar is
 * registered under the name "AppMaster.jar".
 *
 * @return local resources keyed by name
 * @throws IOException
 *             if the jar's file status cannot be read
 */
protected Map<String, LocalResource> getLocalResources() throws IOException {
    Path jarPath = new Path(appMasterJar.toURI());
    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
    LOG.info("Copy App Master jar from local filesystem and add to local environment");
    Configuration conf = getConf();
    // Resolve the file system from the path itself: the jar path is a file:
    // URI, which the configured default file system rejects when it is not
    // the local one.
    FileSystem fs = jarPath.getFileSystem(conf);
    FileStatus destStatus = fs.getFileStatus(jarPath);
    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
    amJarRsrc.setType(LocalResourceType.FILE);
    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
    amJarRsrc.setTimestamp(destStatus.getModificationTime());
    amJarRsrc.setSize(destStatus.getLen());
    localResources.put("AppMaster.jar", amJarRsrc);
    return localResources;
}
/**
 * Returns the environment variables for the AppMstr container.
 * <p>
 * Only {@code CLASSPATH} is defined. Its value starts with the existing
 * {@code ${CLASSPATH}} reference, followed by a fixed list of
 * colon-separated entries, and ends with a trailing colon.
 * </p>
 *
 * @return a new mutable map holding the single {@code CLASSPATH} entry
 */
protected Map<String, String> getEnvironment() {
    // Entries appended after ${CLASSPATH}, in order.
    final String[] extraEntries = {
        "./*",
        "$HADOOP_CONF_DIR",
        "$HADOOP_COMMON_HOME/share/hadoop/common/*",
        "$HADOOP_COMMON_HOME/share/hadoop/common/lib/*",
        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/*",
        "$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*",
        "$YARN_HOME/modules/*",
        "$YARN_HOME/lib/*",
        "./log4j.properties"
    };

    final StringBuilder classPath = new StringBuilder("${CLASSPATH}");
    for (String entry : extraEntries) {
        classPath.append(':').append(entry);
    }
    // The value ends with a separator, exactly as the original literal did.
    classPath.append(':');

    final Map<String, String> variables = new HashMap<String, String>();
    variables.put("CLASSPATH", classPath.toString());
    return variables;
}
/**
 * Builds the shell command line that starts the AppMstr.
 * <p>
 * The command consists of the java launcher under {@code ${JAVA_HOME}},
 * the name of {@code appMasterClass}, each value from
 * {@link #getCommandArgs()} separated by single spaces, and redirections
 * of stdout and stderr into {@code stdout.log} and {@code stderr.log}
 * under the log directory expansion variable.
 * </p>
 *
 * @return the complete command line
 */
protected String getCommand() {
    final StringBuilder cmd = new StringBuilder(128);

    // Launcher and main class.
    cmd.append("${JAVA_HOME}/bin/java ").append(appMasterClass.getName());

    // Arguments are appended as-is; no quoting or escaping is applied.
    for (Object argument : getCommandArgs()) {
        cmd.append(" ").append(argument);
    }

    // Redirect both output streams into the container's log directory.
    final String logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
    cmd.append(" 1>").append(logDir).append("/stdout.log");
    cmd.append(" 2>").append(logDir).append("/stderr.log");

    return cmd.toString();
}
/**
 * Returns the arguments passed to the AppMstr class's {@code main()}.
 * <p>
 * This implementation supplies no arguments. {@link #getCommand()} appends
 * each returned element to the launch command.
 * </p>
 *
 * @return the list of arguments; empty in this implementation
 */
protected List<Object> getCommandArgs() {
    final List<Object> noArguments = Collections.emptyList();
    return noArguments;
}
/**
 * Determines the resources requested for the AppMstr container.
 * <p>
 * The minimum and maximum memory reported in {@code newApp} are logged.
 * If {@code amMemory} is below the minimum it is raised to the minimum;
 * otherwise, if it is above the maximum it is lowered to the maximum. The
 * adjusted value is written back to the {@code amMemory} field.
 * </p>
 *
 * @param newApp
 *            response carrying the minimum and maximum resource capability
 * @return a resource record whose memory is the (possibly adjusted)
 *         {@code amMemory}
 */
protected Resource getCapability(GetNewApplicationResponse newApp) {
    final int clusterMin = newApp.getMinimumResourceCapability().getMemory();
    final int clusterMax = newApp.getMaximumResourceCapability().getMemory();
    LOG.info("Min mem capabililty of resources in this cluster "
            + clusterMin);
    LOG.info("Max mem capabililty of resources in this cluster "
            + clusterMax);

    // Bring the requested memory within the limits reported above.
    if (amMemory < clusterMin) {
        LOG.info("AM memory specified below min threshold of cluster. Using min value."
                + ", specified=" + amMemory + ", min=" + clusterMin);
        amMemory = clusterMin;
    } else if (amMemory > clusterMax) {
        LOG.info("AM memory specified above max threshold of cluster. Using max value."
                + ", specified=" + amMemory + ", max=" + clusterMax);
        amMemory = clusterMax;
    }

    // In Hadoop 0.23.0, memory size is the only resource that can be specified.
    final Resource requested = Records.newRecord(Resource.class);
    requested.setMemory(amMemory);
    return requested;
}
/**
 * Starts execution of the AppMstr.
 * <p>
 * Wraps {@code appContext} in a submit request and sends it through
 * {@code applicationsManager} (per the original author's note, this asks
 * the Resource Manager to run the application).
 * </p>
 *
 * @param appContext
 *            submission context describing the application
 * @throws YarnRemoteException
 *             if the submit call fails
 */
protected void submit(ApplicationSubmissionContext appContext)
        throws YarnRemoteException {
    final SubmitApplicationRequest submitRequest = Records
            .newRecord(SubmitApplicationRequest.class);
    submitRequest.setApplicationSubmissionContext(appContext);

    // Log the full context right before it is handed over.
    LOG.info("Submitting application to ASM: " + appContext);
    applicationsManager.submitApplication(submitRequest);
}
/**
 * Monitors the execution of the AppMstr until it ends or the client
 * timeout is exceeded.
 * <p>
 * Once per second the application report is fetched through
 * {@code applicationsManager} and its state is logged. When the current
 * time exceeds {@code clientStartTime + clientTimeout} and the application
 * has not reached one of the states handled below,
 * {@link #killApplication} is called.
 * </p>
 * <p>
 * If the thread is interrupted while sleeping, monitoring continues, and
 * the interrupt status is restored before this method exits so that
 * callers can still observe it.
 * </p>
 *
 * @param appId
 *            id of the application to monitor
 * @return {@code true} if the application reached {@code FINISHED} with
 *         final status {@code SUCCEEDED}; {@code false} if it finished with
 *         any other final status, was {@code KILLED} or {@code FAILED}, or
 *         was killed here because of the timeout
 * @throws YarnRemoteException
 *             if fetching the report or killing the application fails
 */
protected boolean monitorApplication(ApplicationId appId)
        throws YarnRemoteException {
    // Set when a sleep is interrupted; the flag is restored in the
    // finally block instead of being silently dropped.
    boolean interrupted = false;
    try {
        while (true) {
            // Check at one-second intervals.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep cleared the interrupt status; remember it
                // and keep polling, as this loop always has.
                interrupted = true;
                LOG.debug("Thread sleep in monitoring loop interrupted");
            }

            // Fetch the current execution status (report) of the AppMstr.
            GetApplicationReportRequest reportRequest = Records
                    .newRecord(GetApplicationReportRequest.class);
            reportRequest.setApplicationId(appId);
            GetApplicationReportResponse reportResponse = applicationsManager
                    .getApplicationReport(reportRequest);
            ApplicationReport report = reportResponse.getApplicationReport();

            YarnApplicationState state = report.getYarnApplicationState();
            FinalApplicationStatus dsStatus = report
                    .getFinalApplicationStatus();
            LOG.info("appId:" + report.getApplicationId() + " state=" + state
                    + ", dsStatus=" + dsStatus);

            // Plain concatenation is used for dsStatus below so that a
            // missing final status is logged as "null" instead of raising
            // a NullPointerException.
            switch (state) {
            case FINISHED:
                if (dsStatus == FinalApplicationStatus.SUCCEEDED) {
                    LOG.info("Application has completed successfully. Breaking monitoring loop");
                    return true;
                } else {
                    LOG.info("Application did finished unsuccessfully."
                            + " YarnState=" + state
                            + ", DSFinalStatus=" + dsStatus
                            + ". Breaking monitoring loop");
                    return false;
                }
            case KILLED:
            case FAILED:
                LOG.info("Application did not finish." + " YarnState="
                        + state + ", DSFinalStatus="
                        + dsStatus + ". Breaking monitoring loop");
                return false;
            }

            // Any other state: keep waiting unless the timeout has passed.
            if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
                LOG.info("Reached client specified timeout for application. Killing application");
                killApplication(appId);
                return false;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Requests a forced kill of the given application.
 * <p>
 * Builds a kill request for {@code appId} and sends it through
 * {@code applicationsManager}.
 * </p>
 *
 * @param appId
 *            id of the application to kill
 * @throws YarnRemoteException
 *             if the kill call fails
 */
protected void killApplication(ApplicationId appId)
        throws YarnRemoteException {
    final KillApplicationRequest killRequest = Records
            .newRecord(KillApplicationRequest.class);
    killRequest.setApplicationId(appId);
    applicationsManager.forceKillApplication(killRequest);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment