@hishidama
Created November 21, 2011 22:21
Hadoop 0.23 YARN minimal sample
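The gist contains two classes: Sample1AppMstr, a minimal ApplicationMaster that merely registers with the ResourceManager and immediately reports successful completion, and Sample1Client, a minimal client that submits the ApplicationMaster to the ResourceManager and monitors the application until it finishes (killing it after a timeout).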
package sample1;
import java.net.InetSocketAddress;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Minimal Hadoop 0.23.0 YARN ApplicationMaster sample.
* <p>
* →<a href=
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
* >Reference</a><br>
* →<a href=
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn.html"
* >Explanation</a>
* </p>
*
* @see org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster
* @author hishidama
*/
public class Sample1AppMstr {
private static final Log LOG = LogFactory.getLog(Sample1AppMstr.class);
public static void main(String[] args) throws YarnRemoteException {
System.out.println("sample1-appMstr start");
try {
for (String arg : args) {
System.err.println("arg=" + arg);
}
Sample1AppMstr app = new Sample1AppMstr();
app.run();
} finally {
System.out.println("sample1-appMstr end");
}
}
private Configuration conf;
private YarnRPC rpc;
private ApplicationAttemptId appAttemptID;
private AMRMProtocol resourceManager;
/** Constructor */
public Sample1AppMstr() {
conf = new Configuration();
rpc = YarnRPC.create(conf);
}
/**
* Runs the main processing.
*
* @throws YarnRemoteException
*/
public void run() throws YarnRemoteException {
LOG.info("Starting " + getClass().getSimpleName());
appAttemptID = getAppAttemptID();
resourceManager = connectToRM();
registerToRM();
finish();
LOG.info("Finish " + getClass().getSimpleName());
}
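/**
* Gets the ApplicationAttemptId of this AM.
* <p>
* The id of the AM container is passed in the environment variable named by
* ApplicationConstants.AM_CONTAINER_ID_ENV; the ApplicationAttemptId is
* extracted from that container id.
* </p>
*/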
ApplicationAttemptId getAppAttemptID() {
Map<String, String> envs = System.getenv();
String containerIdString = envs
.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
throw new IllegalArgumentException(
"ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils
.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId
.getApplicationAttemptId();
System.out.println("appAttemptID+++" + appAttemptID);
return appAttemptID;
}
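/**
* Connects to the ResourceManager's scheduler interface (AMRMProtocol)
* at the address given by YarnConfiguration.RM_SCHEDULER_ADDRESS.
*
* @return
*/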
AMRMProtocol connectToRM() {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
}
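/**
* Registers this ApplicationMaster with the Resource Manager.
* In this minimal sample the host, RPC port and tracking URL are left unset.
*
* @return
* @throws YarnRemoteException
*/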
RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {
RegisterApplicationMasterRequest appMasterRequest = Records
.newRecord(RegisterApplicationMasterRequest.class);
appMasterRequest.setApplicationAttemptId(appAttemptID);
// appMasterRequest.setHost(appMasterHostname);
// appMasterRequest.setRpcPort(appMasterRpcPort);
// appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
return resourceManager.registerApplicationMaster(appMasterRequest);
}
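/**
* Notifies the Resource Manager that the application finished with SUCCEEDED status.
*
* @throws YarnRemoteException
*/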
void finish() throws YarnRemoteException {
LOG.info("Application completed. Signalling finish to RM");
FinishApplicationMasterRequest finishReq = Records
.newRecord(FinishApplicationMasterRequest.class);
finishReq.setAppAttemptId(appAttemptID);
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
resourceManager.finishApplicationMaster(finishReq);
}
}
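The client below submits the ApplicationMaster above as a YARN application.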
package sample1;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
* Minimal Hadoop 0.23.0 YARN client sample.
* <p>
* →<a href=
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
* >Reference</a><br>
* →<a href=
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn.html"
* >Explanation</a>
* </p>
*
* @see org.apache.hadoop.yarn.applications.distributedshell.Client
* @author hishidama
*/
public class Sample1Client extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(Sample1Client.class);
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new Sample1Client(), args);
System.exit(r);
}
private YarnRPC rpc;
private ClientRMProtocol applicationsManager; // Resource Manager
/** Application name */
private String appName = "sample1";
/** Priority of the AppMstr */
private int amPriority = 0;
/** Queue to submit the AppMstr to */
private String amQueue = "";
/** User who runs the AppMstr */
private String amUser = "";
/** Memory size required (requested) to run the AppMstr */
private int amMemory = 1 * 1024;
/** Path of the jar file containing the AppMstr */
private String appMasterJar;
/** Start time of this client */
private final long clientStartTime = System.currentTimeMillis();
/** Timeout for the AppMstr (it is killed once this time has elapsed) */
private long clientTimeout = TimeUnit.MINUTES.toMillis(10);
@Override
public int run(String[] args) throws Exception {
appMasterJar = args[0]; // location of the jar file of the AppMstr to run
Configuration conf = getConf();
rpc = YarnRPC.create(conf);
applicationsManager = connectToASM();
GetNewApplicationResponse newApp = getApplication();
ApplicationSubmissionContext appContext = getAppContext(newApp);
submit(appContext);
boolean succeeded = monitorApplication(newApp.getApplicationId());
return succeeded ? 0 : 1;
}
/**
* Connects to the ApplicationsManager (ASM).
*
* @return
*/
ClientRMProtocol connectToASM() {
Configuration conf = getConf();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS));
LOG.info("Connecting to ResourceManager at " + rmAddress);
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, conf);
}
/**
* Gets the ApplicationId of a new application from the ASM.
* <p>
* The response also contains the resources that can be used (minimum and maximum usable memory).
* </p>
*
* @return
* @throws YarnRemoteException
*/
GetNewApplicationResponse getApplication() throws YarnRemoteException {
GetNewApplicationRequest request = Records
.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager
.getNewApplication(request);
LOG.info("Got new application id=" + response.getApplicationId());
return response;
}
/**
* Creates the submission context for the AppMstr.
*
* @param newApp
* @return
* @throws IOException
*/
ApplicationSubmissionContext getAppContext(GetNewApplicationResponse newApp)
throws IOException {
LOG.info("Setting up application submission context for ASM");
ApplicationSubmissionContext appContext = Records
.newRecord(ApplicationSubmissionContext.class);
ApplicationId appId = newApp.getApplicationId();
appContext.setApplicationId(appId);
appContext.setApplicationName(appName);
ContainerLaunchContext amContainer = getContainerLaunchContext(newApp,
appId);
appContext.setAMContainerSpec(amContainer);
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(amPriority);
appContext.setPriority(pri);
appContext.setQueue(amQueue);
appContext.setUser(amUser);
return appContext;
}
/**
* Creates the container launch information for the AppMstr.
*
* @param newApp
* @param appId
* @return
* @throws IOException
*/
ContainerLaunchContext getContainerLaunchContext(
GetNewApplicationResponse newApp, ApplicationId appId)
throws IOException {
ContainerLaunchContext amContainer = Records
.newRecord(ContainerLaunchContext.class);
Map<String, LocalResource> localResources = getLocalResources(appId);
amContainer.setLocalResources(localResources);
Map<String, String> env = getEnvironment();
amContainer.setEnvironment(env);
String command = getCommand();
List<String> commands = Collections.singletonList(command);
amContainer.setCommands(commands);
Resource capability = getCapability(newApp);
amContainer.setResource(capability);
return amContainer;
}
/**
* Specifies the local resources for the AppMstr.
*
* @param appId
* @return
* @throws IOException
*/
Map<String, LocalResource> getLocalResources(ApplicationId appId)
throws IOException {
Path jarPath = new Path(new File(appMasterJar).toURI());
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LOG.info("Copy App Master jar from local filesystem and add to local environment");
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
FileStatus destStatus = fs.getFileStatus(jarPath);
LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
amJarRsrc.setType(LocalResourceType.FILE);
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
amJarRsrc.setTimestamp(destStatus.getModificationTime());
amJarRsrc.setSize(destStatus.getLen());
localResources.put("AppMaster.jar", amJarRsrc);
return localResources;
}
/**
* Specifies the environment variables for the AppMstr.
*
* @return
*/
Map<String, String> getEnvironment() {
Map<String, String> env = new HashMap<String, String>();
String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR"
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/*"
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*"
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*"
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*"
+ ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*"
+ ":./log4j.properties:";
env.put("CLASSPATH", classPathEnv);
return env;
}
/**
* Specifies the command that launches the AppMstr.
*
* @return
*/
String getCommand() {
StringBuilder sb = new StringBuilder(128);
sb.append("${JAVA_HOME}/bin/java ");
sb.append(Sample1AppMstr.class.getName());
sb.append(" arg1 arg2 arg3");
sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/SampleAppMstr.stdout.log");
sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ "/SampleAppMstr.stderr.log");
return sb.toString();
}
/**
* Specifies the resources required by the AppMstr.
*
* @param newApp
* @return
*/
Resource getCapability(GetNewApplicationResponse newApp) {
{
int minMem = newApp.getMinimumResourceCapability().getMemory();
int maxMem = newApp.getMaximumResourceCapability().getMemory();
LOG.info("Min mem capabililty of resources in this cluster "
+ minMem);
LOG.info("Max mem capabililty of resources in this cluster "
+ maxMem);
if (amMemory < minMem) {
LOG.info("AM memory specified below min threshold of cluster. Using min value."
+ ", specified=" + amMemory + ", min=" + minMem);
amMemory = minMem;
} else if (amMemory > maxMem) {
LOG.info("AM memory specified above max threshold of cluster. Using max value."
+ ", specified=" + amMemory + ", max=" + maxMem);
amMemory = maxMem;
}
}
// In Hadoop 0.23.0, the only resource that can be specified is the memory size
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(amMemory);
return capability;
}
/**
* Starts execution of the AppMstr.
* <p>
* Asks the Resource Manager to run it.
* </p>
*
* @param appContext
* @throws YarnRemoteException
*/
void submit(ApplicationSubmissionContext appContext)
throws YarnRemoteException {
SubmitApplicationRequest appRequest = Records
.newRecord(SubmitApplicationRequest.class);
appRequest.setApplicationSubmissionContext(appContext);
LOG.info("Submitting application to ASM");
applicationsManager.submitApplication(appRequest);
}
/**
* Monitors the execution status of the AppMstr.
*
* @param appId
* @return true if the application finished successfully
* @throws YarnRemoteException
*/
boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
while (true) {
// check once a second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.debug("Thread sleep in monitoring loop interrupted");
}
// get the execution status (report) of the AppMstr
GetApplicationReportRequest reportRequest = Records
.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse = applicationsManager
.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report
.getFinalApplicationStatus();
System.out.println("report+++" + report);
switch (state) {
case FINISHED:
if (dsStatus == FinalApplicationStatus.SUCCEEDED) {
LOG.info("Application has completed successfully. Breaking monitoring loop");
return true;
} else {
LOG.info("Application did finished unsuccessfully."
+ " YarnState=" + state.toString()
+ ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
}
case KILLED:
case FAILED:
LOG.info("Application did not finish." + " YarnState="
+ state.toString() + ", DSFinalStatus="
+ dsStatus.toString() + ". Breaking monitoring loop");
return false;
}
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
LOG.info("Reached client specified timeout for application. Killing application");
killApplication(appId);
return false;
}
}
}
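/**
* Asks the ASM to forcibly kill the application.
*
* @param appId
* @throws YarnRemoteException
*/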
void killApplication(ApplicationId appId) throws YarnRemoteException {
KillApplicationRequest request = Records
.newRecord(KillApplicationRequest.class);
request.setApplicationId(appId);
applicationsManager.forceKillApplication(request);
}
}
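To run the sample, both classes would typically be packaged into one jar; the client is then launched through ToolRunner (for example via the hadoop jar command), with the path of that same jar passed as the first argument so that it can be shipped to the ApplicationMaster's container as AppMaster.jar. The exact jar name and build procedure are assumed here and depend on how the sample is packaged.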