Created
November 21, 2011 22:21
-
-
Save hishidama/1384151 to your computer and use it in GitHub Desktop.
Hadoop0.23 YARN 最小限サンプル
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample1; | |
import java.net.InetSocketAddress; | |
import java.util.Map; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.net.NetUtils; | |
import org.apache.hadoop.yarn.api.AMRMProtocol; | |
import org.apache.hadoop.yarn.api.ApplicationConstants; | |
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; | |
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; | |
import org.apache.hadoop.yarn.api.records.ContainerId; | |
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; | |
import org.apache.hadoop.yarn.conf.YarnConfiguration; | |
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; | |
import org.apache.hadoop.yarn.ipc.YarnRPC; | |
import org.apache.hadoop.yarn.util.ConverterUtils; | |
import org.apache.hadoop.yarn.util.Records; | |
/**
 * Minimal sample of a Hadoop 0.23.0 YARN ApplicationMaster.
 * <p>
 * →<a href=
 * "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
 * >Reference</a><br>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn.html"
 * >Explanation</a>
 * </p>
 *
 * @see org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster
 * @author hishidama
 */
public class Sample1AppMstr { | |
private static final Log LOG = LogFactory.getLog(Sample1AppMstr.class); | |
public static void main(String[] args) throws YarnRemoteException { | |
System.out.println("sample1-appMstr start"); | |
try { | |
for (String arg : args) { | |
System.err.println("arg=" + arg); | |
} | |
Sample1AppMstr app = new Sample1AppMstr(); | |
app.run(); | |
} finally { | |
System.out.println("sample1-appMstr end"); | |
} | |
} | |
private Configuration conf; | |
private YarnRPC rpc; | |
private ApplicationAttemptId appAttemptID; | |
private AMRMProtocol resourceManager; | |
/** コンストラクター */ | |
public Sample1AppMstr() { | |
conf = new Configuration(); | |
rpc = YarnRPC.create(conf); | |
} | |
/** | |
* 処理実行. | |
* | |
* @throws YarnRemoteException | |
*/ | |
public void run() throws YarnRemoteException { | |
LOG.info("Starting " + getClass().getSimpleName()); | |
appAttemptID = getAppAttemptID(); | |
resourceManager = connectToRM(); | |
registerToRM(); | |
finish(); | |
LOG.info("Finish " + getClass().getSimpleName()); | |
} | |
ApplicationAttemptId getAppAttemptID() { | |
Map<String, String> envs = System.getenv(); | |
String containerIdString = envs | |
.get(ApplicationConstants.AM_CONTAINER_ID_ENV); | |
if (containerIdString == null) { | |
throw new IllegalArgumentException( | |
"ContainerId not set in the environment"); | |
} | |
ContainerId containerId = ConverterUtils | |
.toContainerId(containerIdString); | |
ApplicationAttemptId appAttemptID = containerId | |
.getApplicationAttemptId(); | |
System.out.println("appAttemptID+++" + appAttemptID); | |
return appAttemptID; | |
} | |
AMRMProtocol connectToRM() { | |
YarnConfiguration yarnConf = new YarnConfiguration(conf); | |
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( | |
YarnConfiguration.RM_SCHEDULER_ADDRESS, | |
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); | |
LOG.info("Connecting to ResourceManager at " + rmAddress); | |
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf); | |
} | |
RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException { | |
RegisterApplicationMasterRequest appMasterRequest = Records | |
.newRecord(RegisterApplicationMasterRequest.class); | |
appMasterRequest.setApplicationAttemptId(appAttemptID); | |
// appMasterRequest.setHost(appMasterHostname); | |
// appMasterRequest.setRpcPort(appMasterRpcPort); | |
// appMasterRequest.setTrackingUrl(appMasterTrackingUrl); | |
return resourceManager.registerApplicationMaster(appMasterRequest); | |
} | |
void finish() throws YarnRemoteException { | |
LOG.info("Application completed. Signalling finish to RM"); | |
FinishApplicationMasterRequest finishReq = Records | |
.newRecord(FinishApplicationMasterRequest.class); | |
finishReq.setAppAttemptId(appAttemptID); | |
finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); | |
resourceManager.finishApplicationMaster(finishReq); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample1; | |
import java.io.File; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.net.NetUtils; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import org.apache.hadoop.yarn.api.ApplicationConstants; | |
import org.apache.hadoop.yarn.api.ClientRMProtocol; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; | |
import org.apache.hadoop.yarn.api.records.ApplicationId; | |
import org.apache.hadoop.yarn.api.records.ApplicationReport; | |
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; | |
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; | |
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; | |
import org.apache.hadoop.yarn.api.records.LocalResource; | |
import org.apache.hadoop.yarn.api.records.LocalResourceType; | |
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; | |
import org.apache.hadoop.yarn.api.records.Priority; | |
import org.apache.hadoop.yarn.api.records.Resource; | |
import org.apache.hadoop.yarn.api.records.YarnApplicationState; | |
import org.apache.hadoop.yarn.conf.YarnConfiguration; | |
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; | |
import org.apache.hadoop.yarn.ipc.YarnRPC; | |
import org.apache.hadoop.yarn.util.ConverterUtils; | |
import org.apache.hadoop.yarn.util.Records; | |
/**
 * Minimal sample of a Hadoop 0.23.0 YARN client.
 * <p>
 * →<a href=
 * "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html"
 * >Reference</a><br>
 * →<a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn.html"
 * >Explanation</a>
 * </p>
 *
 * @see org.apache.hadoop.yarn.applications.distributedshell.Client
 * @author hishidama
 */
public class Sample1Client extends Configured implements Tool { | |
private static final Log LOG = LogFactory.getLog(Sample1Client.class); | |
public static void main(String[] args) throws Exception { | |
int r = ToolRunner.run(new Sample1Client(), args); | |
System.exit(r); | |
} | |
private YarnRPC rpc; | |
private ClientRMProtocol applicationsManager; // Resource Manager | |
/** アプリケーション名 */ | |
private String appName = "sample1"; | |
/** AppMstrの優先度 */ | |
private int amPriority = 0; | |
/** AppMstrのキュー */ | |
private String amQueue = ""; | |
/** AppMstrを実行するユーザー */ | |
private String amUser = ""; | |
/** AppMstrを実行するのに必要な(要求する)メモリーサイズ */ | |
private int amMemory = 1 * 1024; | |
/** AppMstrが入っているjarファイルのパス */ | |
private String appMasterJar; | |
/** 当Clientの開始時刻 */ | |
private final long clientStartTime = System.currentTimeMillis(); | |
/** AppMstrのタイムアウト時間(この時間を過ぎるとkillする) */ | |
private long clientTimeout = TimeUnit.MINUTES.toMillis(10); | |
@Override | |
public int run(String[] args) throws Exception { | |
appMasterJar = args[0]; // 実行するAppMstrのjarファイルの場所 | |
Configuration conf = getConf(); | |
rpc = YarnRPC.create(conf); | |
applicationsManager = connectToASM(); | |
GetNewApplicationResponse newApp = getApplication(); | |
ApplicationSubmissionContext appContext = getAppContext(newApp); | |
submit(appContext); | |
boolean succeeded = monitorApplication(newApp.getApplicationId()); | |
return succeeded ? 0 : 1; | |
} | |
/** | |
* ApplicationsManager(AsM)へ接続する. | |
* | |
* @return | |
*/ | |
ClientRMProtocol connectToASM() { | |
Configuration conf = getConf(); | |
YarnConfiguration yarnConf = new YarnConfiguration(conf); | |
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( | |
YarnConfiguration.RM_ADDRESS, | |
YarnConfiguration.DEFAULT_RM_ADDRESS)); | |
LOG.info("Connecting to ResourceManager at " + rmAddress); | |
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, | |
rmAddress, conf); | |
} | |
/** | |
* ASMから新しいアプリケーションのApplicationIdを取得する. | |
* <p> | |
* responseには、使用できるリソース(使用可能メモリーの最小値・最大値)が返ってくる。 | |
* </p> | |
* | |
* @return | |
* @throws YarnRemoteException | |
*/ | |
GetNewApplicationResponse getApplication() throws YarnRemoteException { | |
GetNewApplicationRequest request = Records | |
.newRecord(GetNewApplicationRequest.class); | |
GetNewApplicationResponse response = applicationsManager | |
.getNewApplication(request); | |
LOG.info("Got new application id=" + response.getApplicationId()); | |
return response; | |
} | |
/** | |
* AppMstrの実行情報を生成する. | |
* | |
* @param newApp | |
* @return | |
* @throws IOException | |
*/ | |
ApplicationSubmissionContext getAppContext(GetNewApplicationResponse newApp) | |
throws IOException { | |
LOG.info("Setting up application submission context for ASM"); | |
ApplicationSubmissionContext appContext = Records | |
.newRecord(ApplicationSubmissionContext.class); | |
ApplicationId appId = newApp.getApplicationId(); | |
appContext.setApplicationId(appId); | |
appContext.setApplicationName(appName); | |
ContainerLaunchContext amContainer = getContainerLaunchContext(newApp, | |
appId); | |
appContext.setAMContainerSpec(amContainer); | |
Priority pri = Records.newRecord(Priority.class); | |
pri.setPriority(amPriority); | |
appContext.setPriority(pri); | |
appContext.setQueue(amQueue); | |
appContext.setUser(amUser); | |
return appContext; | |
} | |
/** | |
* AppMstrの情報を生成する. | |
* | |
* @param newApp | |
* @param appId | |
* @return | |
* @throws IOException | |
*/ | |
ContainerLaunchContext getContainerLaunchContext( | |
GetNewApplicationResponse newApp, ApplicationId appId) | |
throws IOException { | |
ContainerLaunchContext amContainer = Records | |
.newRecord(ContainerLaunchContext.class); | |
Map<String, LocalResource> localResources = getLocalResources(appId); | |
amContainer.setLocalResources(localResources); | |
Map<String, String> env = getEnvironment(); | |
amContainer.setEnvironment(env); | |
String command = getCommand(); | |
List<String> commands = Collections.singletonList(command); | |
amContainer.setCommands(commands); | |
Resource capability = getCapability(newApp); | |
amContainer.setResource(capability); | |
return amContainer; | |
} | |
/** | |
* AppMstr用のローカルリソースを指定する. | |
* | |
* @param appId | |
* @return | |
* @throws IOException | |
*/ | |
Map<String, LocalResource> getLocalResources(ApplicationId appId) | |
throws IOException { | |
Path jarPath = new Path(new File(appMasterJar).toURI()); | |
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); | |
LOG.info("Copy App Master jar from local filesystem and add to local environment"); | |
Configuration conf = getConf(); | |
FileSystem fs = FileSystem.get(conf); | |
FileStatus destStatus = fs.getFileStatus(jarPath); | |
LocalResource amJarRsrc = Records.newRecord(LocalResource.class); | |
amJarRsrc.setType(LocalResourceType.FILE); | |
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); | |
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); | |
amJarRsrc.setTimestamp(destStatus.getModificationTime()); | |
amJarRsrc.setSize(destStatus.getLen()); | |
localResources.put("AppMaster.jar", amJarRsrc); | |
return localResources; | |
} | |
/** | |
* AppMstr用の環境変数を指定する. | |
* | |
* @return | |
*/ | |
Map<String, String> getEnvironment() { | |
Map<String, String> env = new HashMap<String, String>(); | |
String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/*" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" | |
+ ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*" | |
+ ":./log4j.properties:"; | |
env.put("CLASSPATH", classPathEnv); | |
return env; | |
} | |
/** | |
* AppMstrを実行するコマンドを指定する. | |
* | |
* @return | |
*/ | |
String getCommand() { | |
StringBuilder sb = new StringBuilder(128); | |
sb.append("${JAVA_HOME}/bin/java "); | |
sb.append(Sample1AppMstr.class.getName()); | |
sb.append(" arg1 arg2 arg3"); | |
sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/SampleAppMstr.stdout.log"); | |
sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/SampleAppMstr.stderr.log"); | |
return sb.toString(); | |
} | |
/** | |
* AppMstrに必要なリソースを指定する. | |
* | |
* @param newApp | |
* @return | |
*/ | |
Resource getCapability(GetNewApplicationResponse newApp) { | |
{ | |
int minMem = newApp.getMinimumResourceCapability().getMemory(); | |
int maxMem = newApp.getMaximumResourceCapability().getMemory(); | |
LOG.info("Min mem capabililty of resources in this cluster " | |
+ minMem); | |
LOG.info("Max mem capabililty of resources in this cluster " | |
+ maxMem); | |
if (amMemory < minMem) { | |
LOG.info("AM memory specified below min threshold of cluster. Using min value." | |
+ ", specified=" + amMemory + ", min=" + minMem); | |
amMemory = minMem; | |
} else if (amMemory > maxMem) { | |
LOG.info("AM memory specified above max threshold of cluster. Using max value." | |
+ ", specified=" + amMemory + ", max=" + maxMem); | |
amMemory = maxMem; | |
} | |
} | |
// Hadoop0.23.0では、指定できるリソースはメモリーサイズのみ | |
Resource capability = Records.newRecord(Resource.class); | |
capability.setMemory(amMemory); | |
return capability; | |
} | |
/** | |
* AppMstrを実行開始する. | |
* <p> | |
* Resourece Managerに実行を依頼する。 | |
* </p> | |
* | |
* @param appContext | |
* @throws YarnRemoteException | |
*/ | |
void submit(ApplicationSubmissionContext appContext) | |
throws YarnRemoteException { | |
SubmitApplicationRequest appRequest = Records | |
.newRecord(SubmitApplicationRequest.class); | |
appRequest.setApplicationSubmissionContext(appContext); | |
LOG.info("Submitting application to ASM"); | |
applicationsManager.submitApplication(appRequest); | |
} | |
/** | |
* AppMstrの実行状況を監視する. | |
* | |
* @param appId | |
* @return 正常に終了した場合、true | |
* @throws YarnRemoteException | |
*/ | |
boolean monitorApplication(ApplicationId appId) throws YarnRemoteException { | |
while (true) { | |
// 1秒間隔でチェック | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
LOG.debug("Thread sleep in monitoring loop interrupted"); | |
} | |
// AppMstrの実行状況(レポート)を取得する | |
GetApplicationReportRequest reportRequest = Records | |
.newRecord(GetApplicationReportRequest.class); | |
reportRequest.setApplicationId(appId); | |
GetApplicationReportResponse reportResponse = applicationsManager | |
.getApplicationReport(reportRequest); | |
ApplicationReport report = reportResponse.getApplicationReport(); | |
YarnApplicationState state = report.getYarnApplicationState(); | |
FinalApplicationStatus dsStatus = report | |
.getFinalApplicationStatus(); | |
System.out.println("report+++" + report); | |
switch (state) { | |
case FINISHED: | |
if (dsStatus == FinalApplicationStatus.SUCCEEDED) { | |
LOG.info("Application has completed successfully. Breaking monitoring loop"); | |
return true; | |
} else { | |
LOG.info("Application did finished unsuccessfully." | |
+ " YarnState=" + state.toString() | |
+ ", DSFinalStatus=" + dsStatus.toString() | |
+ ". Breaking monitoring loop"); | |
return false; | |
} | |
case KILLED: | |
case FAILED: | |
LOG.info("Application did not finish." + " YarnState=" | |
+ state.toString() + ", DSFinalStatus=" | |
+ dsStatus.toString() + ". Breaking monitoring loop"); | |
return false; | |
} | |
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { | |
LOG.info("Reached client specified timeout for application. Killing application"); | |
killApplication(appId); | |
return false; | |
} | |
} | |
} | |
void killApplication(ApplicationId appId) throws YarnRemoteException { | |
KillApplicationRequest request = Records | |
.newRecord(KillApplicationRequest.class); | |
request.setApplicationId(appId); | |
applicationsManager.forceKillApplication(request); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment