Created
November 27, 2011 03:45
-
-
Save hishidama/1396921 to your computer and use it in GitHub Desktop.
Hadoop0.23 YARN 円周率算出サンプル(中途半端版)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn; | |
import java.io.IOException; | |
import java.net.URL; | |
import java.net.URLDecoder; | |
import java.util.Enumeration; | |
/**
 * Class-related utilities.
 * <p>
 * See the <a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >explanation</a>.
 * </p>
 *
 * @author hishidama
 */
public class ClassUtil {

    /**
     * Locates the jar file that contains the given class.
     * <p>
     * The class file is looked up as a resource and the first hit that is
     * served through the {@code jar} protocol is used.
     * </p>
     *
     * @param my_class
     *            class whose jar is wanted
     * @return decoded path of the jar file, or {@code null} when the class is
     *         not found inside any jar
     * @throws RuntimeException
     *             wrapping the {@link IOException} raised by the resource
     *             lookup or the URL decoding
     * @see org.apache.hadoop.mapred.JobConf#findContainingJar()
     */
    public static String findContainingJar(Class<?> my_class) {
        ClassLoader loader = my_class.getClassLoader();
        if (loader == null) {
            // getClassLoader() may return null for classes defined by the
            // bootstrap loader; fall back to the system loader instead of
            // failing with a NullPointerException below.
            loader = ClassLoader.getSystemClassLoader();
        }
        String class_file = my_class.getName().replace('.', '/') + ".class";
        try {
            Enumeration<URL> itr = loader.getResources(class_file);
            while (itr.hasMoreElements()) {
                URL url = itr.nextElement();
                if (!"jar".equals(url.getProtocol())) {
                    continue;
                }
                String toReturn = url.getPath();
                if (toReturn.startsWith("file:")) {
                    toReturn = toReturn.substring("file:".length());
                }
                // URLDecoder turns '+' into a space, so protect literal plus
                // signs before decoding.
                toReturn = toReturn.replace("+", "%2B");
                toReturn = URLDecoder.decode(toReturn, "UTF-8");
                // Strip the "!/entry/inside/the.jar" suffix.
                return toReturn.replaceAll("!.*$", "");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn.sample2; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.Queue; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import sample.yarn.YarnAppMstr; | |
/** | |
* Hadoop0.23.0 YARN 円周率算出サンプル(中途半端版)アプリケーションマスター. | |
* <p> | |
* →<a href= | |
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html" | |
* >説明</a> | |
* </p> | |
* | |
* @author hishidama | |
*/ | |
public class Sample2AppMstr extends YarnAppMstr { | |
public static void main(String[] args) throws Exception { | |
int r = run(new Sample2AppMstr(), args); | |
System.exit(r); | |
} | |
/** 起動するコンテナー数 */ | |
private int nContainers; | |
/** 演算(サンプリング)する回数 */ | |
private long nSamples; | |
@Override | |
protected void init(String[] args) { | |
if (args.length != 2) { | |
System.err.println("Usage: " + getClass().getName() | |
+ " <nContainers> <nSamples>"); | |
throw new IllegalArgumentException("args.length=" + args.length); | |
} | |
nContainers = Integer.parseInt(args[0]); | |
nSamples = Long.parseLong(args[1]); | |
} | |
@Override | |
protected Queue<ContainerLauncher> getContainerQueue() { | |
Queue<ContainerLauncher> queue = new ArrayBlockingQueue<ContainerLauncher>( | |
nContainers); | |
for (int i = 0; i < nContainers; i++) { | |
Sample2Launcher launcher = new Sample2Launcher(i * nSamples, | |
nSamples); | |
queue.add(launcher); | |
} | |
return queue; | |
} | |
class Sample2Launcher implements ContainerLauncher { | |
private long offset; | |
private long samples; | |
public Sample2Launcher(long offset, long samples) { | |
this.offset = offset; | |
this.samples = samples; | |
} | |
@Override | |
public LaunchThread create() { | |
return new Sample2LaunchThread(); | |
} | |
class Sample2LaunchThread extends JavaLaunchThread { | |
public Sample2LaunchThread() { | |
super(Sample2Container.class); | |
} | |
@Override | |
protected List<Object> getCommandArgs() { | |
return Arrays.<Object> asList(offset, samples); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn.sample2; | |
import java.util.Arrays; | |
import java.util.List; | |
import sample.yarn.YarnClient; | |
/** | |
* Hadoop0.23.0 YARN 円周率算出サンプル(中途半端版)クライアント. | |
* <p> | |
* →<a href= | |
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html" | |
* >説明</a> | |
* </p> | |
* | |
* @author hishidama | |
*/ | |
public class Sample2Client extends YarnClient { | |
public static void main(String[] args) throws Exception { | |
int r = run(new Sample2Client(), args); | |
System.exit(r); | |
} | |
/** 起動するコンテナー数 */ | |
private int nContainers; | |
/** 演算(サンプリング)する回数 */ | |
private long nSamples; | |
/** コンストラクター */ | |
public Sample2Client() { | |
super("sample2", Sample2AppMstr.class); | |
} | |
@Override | |
protected void init(String[] args) { | |
if (args.length != 2) { | |
System.err.println("Usage: " + getClass().getName() | |
+ " <nContainers> <nSamples>"); | |
throw new IllegalArgumentException("args.length=" + args.length); | |
} | |
nContainers = Integer.parseInt(args[0]); | |
nSamples = Long.parseLong(args[1]); | |
} | |
@Override | |
protected List<Object> getCommandArgs() { | |
return Arrays.<Object> asList(nContainers, nSamples); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn.sample2; | |
/**
 * Hadoop 0.23.0 YARN pi-calculation sample (half-finished version): container.
 * <p>
 * Uses the Monte Carlo method to compute the counts needed to estimate pi,
 * but does not return the result; it only writes it to the log.
 * </p>
 * <p>
 * See the <a href=
 * "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html"
 * >explanation</a>.
 * </p>
 *
 * @see org.apache.hadoop.examples.QuasiMonteCarlo
 * @author hishidama
 */
public class Sample2Container {

    public static void main(String[] args) {
        Sample2Container container = new Sample2Container();
        container.init(args);
        int r = container.run();
        System.exit(r);
    }

    /** Index of the Halton sequence element preceding the first sample. */
    private long offset;
    /** Number of points to generate. */
    private long samples;

    /**
     * Parses {@code <offset> <samples>} and echoes both to standard output.
     *
     * @param args
     *            arguments passed to main()
     * @throws IllegalArgumentException
     *             if fewer than two arguments are given or a value is not
     *             numeric
     */
    public void init(String[] args) {
        if (args.length < 2) {
            // Report a usage error like the sibling classes do instead of
            // failing with an ArrayIndexOutOfBoundsException.
            System.err.println("Usage: " + getClass().getName()
                    + " <offset> <samples>");
            throw new IllegalArgumentException("args.length=" + args.length);
        }
        offset = Long.parseLong(args[0]);
        samples = Long.parseLong(args[1]);
        System.out.println("offset= " + offset);
        System.out.println("samples=" + samples);
    }

    /**
     * Generates {@code samples} points in the unit square and counts how many
     * fall inside or outside the inscribed circle. The counts are printed to
     * standard output; progress goes to standard error every 1000 samples.
     *
     * @return always 0
     */
    public int run() {
        final HaltonSequence haltonsequence = new HaltonSequence(offset);
        long numInside = 0L;
        long numOutside = 0L;

        for (long i = 0; i < samples;) {
            // generate points in a unit square
            final double[] point = haltonsequence.nextPoint();

            // count points inside/outside of the inscribed circle of the square
            final double x = point[0] - 0.5;
            final double y = point[1] - 0.5;
            if (x * x + y * y > 0.25) {
                numOutside++;
            } else {
                numInside++;
            }

            // report status
            i++;
            if (i % 1000 == 0) {
                System.err.println("Generated " + i + " samples.");
            }
        }

        // Ideally this would be written to HDFS or similar, but for now it
        // is only printed.
        System.out.println("numInside= " + numInside);
        System.out.println("numOutside=" + numOutside);
        return 0;
    }

    /**
     * 2-dimensional Halton sequence over the bases in {@link #P}.
     *
     * @see org.apache.hadoop.examples.QuasiMonteCarlo.HaltonSequence
     */
    private static class HaltonSequence {
        /** Bases */
        static final int[] P = { 2, 3 };
        /** Maximum number of digits allowed */
        static final int[] K = { 63, 40 };

        private long index;
        /** Current point; updated in place by {@link #nextPoint()}. */
        private double[] x;
        /** q[i][j] = P[i]^-(j+1), the weight of digit j in dimension i. */
        private double[][] q;
        /** d[i][j] = digit j of the current index written in base P[i]. */
        private int[][] d;

        /**
         * Initialize to H(startindex), so the sequence begins with
         * H(startindex+1).
         */
        HaltonSequence(long startindex) {
            index = startindex;
            x = new double[K.length];
            q = new double[K.length][];
            d = new int[K.length][];
            for (int i = 0; i < K.length; i++) {
                q[i] = new double[K[i]];
                d[i] = new int[K[i]];
            }

            for (int i = 0; i < K.length; i++) {
                long k = index;
                x[i] = 0;

                for (int j = 0; j < K[i]; j++) {
                    q[i][j] = (j == 0 ? 1.0 : q[i][j - 1]) / P[i];
                    d[i][j] = (int) (k % P[i]);
                    k = (k - d[i][j]) / P[i];
                    x[i] += d[i][j] * q[i][j];
                }
            }
        }

        /**
         * Compute next point. Assume the current point is H(index). Compute
         * H(index+1).
         *
         * @return a 2-dimensional point with coordinates in [0,1)^2; the
         *         returned array is the internal one and is overwritten by
         *         the next call
         */
        double[] nextPoint() {
            index++;
            for (int i = 0; i < K.length; i++) {
                for (int j = 0; j < K[i]; j++) {
                    // Add one to digit j; stop unless it overflows the base.
                    d[i][j]++;
                    x[i] += q[i][j];
                    if (d[i][j] < P[i]) {
                        break;
                    }
                    // Carry: reset the digit and take back its contribution.
                    d[i][j] = 0;
                    x[i] -= (j == 0 ? 1.0 : q[i][j - 1]);
                }
            }
            return x;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn; | |
import java.io.File; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Queue; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.net.NetUtils; | |
import org.apache.hadoop.security.UserGroupInformation; | |
import org.apache.hadoop.yarn.api.AMRMProtocol; | |
import org.apache.hadoop.yarn.api.ApplicationConstants; | |
import org.apache.hadoop.yarn.api.ContainerManager; | |
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; | |
import org.apache.hadoop.yarn.api.records.AMResponse; | |
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; | |
import org.apache.hadoop.yarn.api.records.Container; | |
import org.apache.hadoop.yarn.api.records.ContainerId; | |
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; | |
import org.apache.hadoop.yarn.api.records.ContainerState; | |
import org.apache.hadoop.yarn.api.records.ContainerStatus; | |
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; | |
import org.apache.hadoop.yarn.api.records.LocalResource; | |
import org.apache.hadoop.yarn.api.records.LocalResourceType; | |
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; | |
import org.apache.hadoop.yarn.api.records.Priority; | |
import org.apache.hadoop.yarn.api.records.Resource; | |
import org.apache.hadoop.yarn.api.records.ResourceRequest; | |
import org.apache.hadoop.yarn.conf.YarnConfiguration; | |
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; | |
import org.apache.hadoop.yarn.ipc.YarnRPC; | |
import org.apache.hadoop.yarn.util.ConverterUtils; | |
import org.apache.hadoop.yarn.util.Records; | |
/** | |
* Hadoop0.23.0 YARN ApplicationMaster. | |
* <p> | |
* →<a href= | |
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html" | |
* >参考</a><br> | |
* →<a href= | |
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html" | |
* >説明</a> | |
* </p> | |
* | |
* @see org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster | |
* @author hishidama | |
*/ | |
public abstract class YarnAppMstr { | |
private static final Log LOG = LogFactory.getLog(YarnAppMstr.class); | |
/** | |
* 処理実行. | |
* | |
* @param app | |
* 実行対象 | |
* @param args | |
* main()の引数 | |
* @return 終了コード | |
* @throws Exception | |
*/ | |
protected static int run(YarnAppMstr app, String[] args) throws Exception { | |
return app.run(args); | |
} | |
/** Hadoop configuration, created by the constructor. */
protected Configuration conf;
/** RPC factory created from {@link #conf}. */
protected YarnRPC rpc;
/** Attempt id of this application master; set by {@link #run(String[])}. */
protected ApplicationAttemptId appAttemptID;
/** Proxy to the resource manager; set by {@link #run(String[])}. */
protected AMRMProtocol resourceManager;

/** Priority of the requested containers. */
private final int requestPriority = 0;
/** Memory size required (requested) to run a container. */
protected int containerMemory = 1 * 1024;

/** Number of containers to launch. */
private int numTotalContainers;
/** Number of completed containers. */
private final AtomicInteger numCompletedContainers = new AtomicInteger();
/** Number of allocated containers. */
private final AtomicInteger numAllocatedContainers = new AtomicInteger();
/** Number of failed containers. */
private final AtomicInteger numFailedContainers = new AtomicInteger();
/** Counter incremented on every RPC call to the resource manager. */
private final AtomicInteger rmRequestID = new AtomicInteger();

/** Containers to be released. */
private final CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
/** Launcher used for each container id. */
private final Map<ContainerId, ContainerLauncher> launcherMap = new HashMap<ContainerId, ContainerLauncher>();
/** Launch threads that have been started. */
private final List<Thread> launchThreads = new ArrayList<Thread>();

/** Constructor. */
protected YarnAppMstr() {
    this.conf = new Configuration();
    this.rpc = YarnRPC.create(this.conf);
}
/** | |
* 処理実行. | |
* | |
* @param args | |
* main()の引数 | |
* @throws YarnRemoteException | |
*/ | |
public int run(String[] args) throws YarnRemoteException { | |
init(args); | |
appAttemptID = getAppAttemptID(); | |
resourceManager = connectToRM(); | |
RegisterApplicationMasterResponse amResponse = registerToRM(); | |
initResource(amResponse); | |
loop(); | |
finish(); | |
return 0; | |
} | |
/** | |
* 初期化. | |
* | |
* @param args | |
* main()の引数 | |
*/ | |
protected void init(String[] args) { | |
// 各クラスでオーバーライドする | |
} | |
/** | |
* appAttemptIDを取得する. | |
* | |
* @return | |
*/ | |
protected ApplicationAttemptId getAppAttemptID() { | |
Map<String, String> envs = System.getenv(); | |
String containerIdString = envs | |
.get(ApplicationConstants.AM_CONTAINER_ID_ENV); | |
if (containerIdString == null) { | |
throw new IllegalArgumentException( | |
"ContainerId not set in the environment"); | |
} | |
ContainerId containerId = ConverterUtils | |
.toContainerId(containerIdString); | |
ApplicationAttemptId appAttemptID = containerId | |
.getApplicationAttemptId(); | |
return appAttemptID; | |
} | |
/** | |
* リソースマネージャーへ接続する. | |
* | |
* @return | |
*/ | |
protected AMRMProtocol connectToRM() { | |
YarnConfiguration yarnConf = new YarnConfiguration(conf); | |
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( | |
YarnConfiguration.RM_SCHEDULER_ADDRESS, | |
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)); | |
LOG.info("Connecting to ResourceManager at " + rmAddress); | |
return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf); | |
} | |
/** | |
* リソースマネージャーへ自分を登録する. | |
* | |
* @return | |
* @throws YarnRemoteException | |
*/ | |
protected RegisterApplicationMasterResponse registerToRM() | |
throws YarnRemoteException { | |
RegisterApplicationMasterRequest appMasterRequest = Records | |
.newRecord(RegisterApplicationMasterRequest.class); | |
appMasterRequest.setApplicationAttemptId(appAttemptID); | |
// appMasterRequest.setHost(appMasterHostname); | |
// appMasterRequest.setRpcPort(appMasterRpcPort); | |
// appMasterRequest.setTrackingUrl(appMasterTrackingUrl); | |
return resourceManager.registerApplicationMaster(appMasterRequest); | |
} | |
/** | |
* コンテナーのリソースの値を初期化する. | |
* | |
* @param response | |
*/ | |
protected void initResource(RegisterApplicationMasterResponse response) { | |
int minMem = response.getMinimumResourceCapability().getMemory(); | |
int maxMem = response.getMaximumResourceCapability().getMemory(); | |
LOG.info("Min mem capabililty of resources in this cluster " + minMem); | |
LOG.info("Max mem capabililty of resources in this cluster " + maxMem); | |
if (containerMemory < minMem) { | |
LOG.info("Container memory specified below min threshold of cluster. Using min value." | |
+ ", specified=" + containerMemory + ", min=" + minMem); | |
containerMemory = minMem; | |
} else if (containerMemory > maxMem) { | |
LOG.info("Container memory specified above max threshold of cluster. Using max value." | |
+ ", specified=" + containerMemory + ", max=" + maxMem); | |
containerMemory = maxMem; | |
} | |
} | |
/** | |
* コンテナー起動・監視ループ. | |
* | |
* @throws YarnRemoteException | |
*/ | |
protected void loop() throws YarnRemoteException { | |
Queue<ContainerLauncher> queue = getContainerQueue(); | |
numTotalContainers = queue.size(); | |
while (numCompletedContainers.get() < numTotalContainers) { | |
AMResponse amResp = kickContainer(queue); | |
Resource availableResources = amResp.getAvailableResources(); | |
LOG.info("Current available resources in the cluster " | |
+ availableResources); | |
checkComplete(amResp, queue); | |
if (numCompletedContainers.get() == numTotalContainers) { | |
break; // ループ終了 | |
} | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
LOG.debug("Thread sleep in monitoring loop interrupted"); | |
} | |
} | |
// スレッドの終了待ち | |
for (Thread launchThread : launchThreads) { | |
try { | |
launchThread.join(10 * 1000); | |
} catch (InterruptedException e) { | |
LOG.info("Exception thrown in thread join: " + e.getMessage()); | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** | |
* コンテナー起動情報を生成する. | |
* | |
* @return | |
*/ | |
protected abstract Queue<ContainerLauncher> getContainerQueue(); | |
/** | |
* コンテナーを起動する. | |
* | |
* @param queue | |
* @return | |
* @throws YarnRemoteException | |
*/ | |
protected AMResponse kickContainer(Queue<ContainerLauncher> queue) | |
throws YarnRemoteException { | |
int askCount = queue.size(); | |
List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>(); | |
if (askCount > 0) { | |
ResourceRequest containerAsk = setupContainerAskForRM(askCount); | |
resourceReq.add(containerAsk); | |
} | |
AMResponse amResp = sendContainerAskToRM(resourceReq); | |
List<Container> allocatedContainers = amResp.getAllocatedContainers(); | |
LOG.info("Got response from RM for container ask, allocatedCnt=" | |
+ allocatedContainers.size()); | |
numAllocatedContainers.addAndGet(allocatedContainers.size()); | |
for (Container allocatedContainer : allocatedContainers) { | |
ContainerLauncher launcher = queue.poll(); | |
if (launcher != null) { | |
LaunchThread thread = launcher.create(); | |
thread.init(allocatedContainer); | |
launcherMap.put(allocatedContainer.getId(), launcher); | |
launchThreads.add(thread); | |
thread.start(); | |
} | |
} | |
return amResp; | |
} | |
/** | |
* コンテナーの完了状態をチェックする. | |
* | |
* @param amResp | |
* @param queue | |
*/ | |
protected void checkComplete(AMResponse amResp, | |
Queue<ContainerLauncher> queue) { | |
List<ContainerStatus> completedContainers = amResp | |
.getCompletedContainersStatuses(); | |
LOG.info("Got response from RM for container ask, completedCnt=" | |
+ completedContainers.size()); | |
for (ContainerStatus containerStatus : completedContainers) { | |
LOG.info("Got container status for containerID= " | |
+ containerStatus.getContainerId() + ", state=" | |
+ containerStatus.getState() + ", exitStatus=" | |
+ containerStatus.getExitStatus() + ", diagnostics=" | |
+ containerStatus.getDiagnostics()); | |
assert (containerStatus.getState() == ContainerState.COMPLETE); | |
// コンテナーの完了/失敗に応じてカウンターを増減させる | |
ExitStatusAction exitStatus = checkExitStatus(containerStatus); | |
switch (exitStatus) { | |
case SUCCEEDED: | |
numCompletedContainers.incrementAndGet(); | |
LOG.info("Container completed successfully." + ", containerId=" | |
+ containerStatus.getContainerId()); | |
break; | |
case FAILED: | |
numCompletedContainers.incrementAndGet(); | |
numFailedContainers.incrementAndGet(); | |
break; | |
case RETRY: | |
numAllocatedContainers.decrementAndGet(); | |
ContainerLauncher launcher = launcherMap.get(containerStatus | |
.getContainerId()); | |
if (launcher != null) { | |
queue.add(launcher); | |
} | |
break; | |
default: | |
throw new Error("exitStatus=" + exitStatus); | |
} | |
} | |
} | |
/** | |
* {@link LaunchThread}を生成する. | |
*/ | |
public static interface ContainerLauncher { | |
public LaunchThread create(); | |
} | |
/** | |
* コンテナーを起動するスレッド. | |
*/ | |
public abstract class LaunchThread extends Thread { | |
/** アロケートされたコンテナー */ | |
private Container container; | |
/** ContainerManager */ | |
private ContainerManager cm; | |
/** | |
* 初期化. | |
* | |
* @param container | |
*/ | |
public void init(Container container) { | |
this.container = container; | |
} | |
@Override | |
public void run() { | |
try { | |
LOG.info("Connecting to container manager for containerid=" | |
+ container.getId()); | |
this.cm = connectToCM(); | |
ContainerLaunchContext ctx = getContainerLaunchContext(); | |
startContainer(ctx); | |
} catch (Exception e) { | |
numCompletedContainers.incrementAndGet(); | |
numFailedContainers.incrementAndGet(); | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* コンテナーマネージャーへ接続する. | |
* | |
* @return | |
*/ | |
protected ContainerManager connectToCM() { | |
String cmIpPortStr = container.getNodeId().getHost() + ":" | |
+ container.getNodeId().getPort(); | |
InetSocketAddress cmAddress = NetUtils | |
.createSocketAddr(cmIpPortStr); | |
LOG.info("Connecting to ResourceManager at " + cmIpPortStr); | |
return (ContainerManager) rpc.getProxy(ContainerManager.class, | |
cmAddress, conf); | |
} | |
/** | |
* コンテナーの実行情報を生成する. | |
* | |
* @return | |
* @throws IOException | |
*/ | |
protected ContainerLaunchContext getContainerLaunchContext() | |
throws IOException { | |
ContainerLaunchContext ctx = Records | |
.newRecord(ContainerLaunchContext.class); | |
ctx.setContainerId(container.getId()); | |
ctx.setResource(container.getResource()); | |
try { | |
ctx.setUser(UserGroupInformation.getCurrentUser() | |
.getShortUserName()); | |
} catch (IOException e) { | |
LOG.info("Getting current user info failed when trying to launch the container" | |
+ e.getMessage()); | |
} | |
Map<String, LocalResource> localResources = getLocalResources(); | |
ctx.setLocalResources(localResources); | |
Map<String, String> env = getEnvironment(); | |
ctx.setEnvironment(env); | |
String command = getCommand(); | |
List<String> commands = Collections.singletonList(command); | |
ctx.setCommands(commands); | |
return ctx; | |
} | |
/** | |
* コンテナー用のローカルリソースを指定する. | |
* | |
* @return | |
* @throws IOException | |
*/ | |
protected Map<String, LocalResource> getLocalResources() | |
throws IOException { | |
return Collections.emptyMap(); | |
} | |
/** | |
* コンテナー用の環境変数を指定する. | |
* | |
* @return | |
*/ | |
protected Map<String, String> getEnvironment() { | |
return Collections.emptyMap(); | |
} | |
/** | |
* コンテナープログラムを実行するコマンドを指定する. | |
* | |
* @return コマンド | |
*/ | |
protected abstract String getCommand(); | |
/** | |
* コンテナープログラムを実行開始する. | |
* | |
* @param ctx | |
* @throws YarnRemoteException | |
*/ | |
protected void startContainer(ContainerLaunchContext ctx) | |
throws YarnRemoteException { | |
StartContainerRequest startReq = Records | |
.newRecord(StartContainerRequest.class); | |
startReq.setContainerLaunchContext(ctx); | |
cm.startContainer(startReq); | |
} | |
} | |
/** | |
* Javaプログラム用コンテナーを起動させるスレッド. | |
*/ | |
public class JavaLaunchThread extends LaunchThread { | |
private Class<?> containerClass; | |
private File containerJar; | |
public JavaLaunchThread(Class<?> containerClass) { | |
this.containerClass = containerClass; | |
this.containerJar = new File( | |
ClassUtil.findContainingJar(containerClass)); | |
} | |
@Override | |
protected Map<String, LocalResource> getLocalResources() | |
throws IOException { | |
Path jarPath = new Path(containerJar.toURI()); | |
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); | |
FileSystem fs = FileSystem.get(conf); | |
FileStatus destStatus = fs.getFileStatus(jarPath); | |
LocalResource cmJarRsrc = Records.newRecord(LocalResource.class); | |
cmJarRsrc.setType(LocalResourceType.FILE); | |
cmJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); | |
cmJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); | |
cmJarRsrc.setTimestamp(destStatus.getModificationTime()); | |
cmJarRsrc.setSize(destStatus.getLen()); | |
localResources.put("Container.jar", cmJarRsrc); | |
return localResources; | |
} | |
@Override | |
protected Map<String, String> getEnvironment() { | |
Map<String, String> env = new HashMap<String, String>(); | |
String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/*" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" | |
+ ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*" | |
+ ":./log4j.properties:"; | |
env.put("CLASSPATH", classPathEnv); | |
return env; | |
} | |
@Override | |
protected String getCommand() { | |
StringBuilder sb = new StringBuilder(128); | |
sb.append("${JAVA_HOME}/bin/java "); | |
sb.append(containerClass.getName()); | |
for (Object arg : getCommandArgs()) { | |
sb.append(" "); | |
sb.append(arg); | |
} | |
sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/container.stdout.log"); | |
sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/container.stderr.log"); | |
return sb.toString(); | |
} | |
/** | |
* コンテナークラスのmain()に渡す引数を返す. | |
* | |
* @return 引数のリスト | |
*/ | |
protected List<Object> getCommandArgs() { | |
return Arrays.<Object> asList(); | |
} | |
} | |
/** | |
* コンテナーについて問い合わせる情報を生成する. | |
* | |
* @param numContainers | |
* コンテナー数 | |
* @return | |
*/ | |
protected ResourceRequest setupContainerAskForRM(int numContainers) { | |
ResourceRequest request = Records.newRecord(ResourceRequest.class); | |
request.setHostName("*"); | |
request.setNumContainers(numContainers); | |
Priority pri = Records.newRecord(Priority.class); | |
pri.setPriority(requestPriority); | |
request.setPriority(pri); | |
Resource capability = Records.newRecord(Resource.class); | |
capability.setMemory(containerMemory); | |
request.setCapability(capability); | |
return request; | |
} | |
/** | |
* コンテナーのアロケートを行う. | |
* | |
* @param requestedContainers | |
* @return | |
* @throws YarnRemoteException | |
*/ | |
protected AMResponse sendContainerAskToRM( | |
List<ResourceRequest> requestedContainers) | |
throws YarnRemoteException { | |
AllocateRequest req = Records.newRecord(AllocateRequest.class); | |
req.setResponseId(rmRequestID.incrementAndGet()); | |
req.setApplicationAttemptId(appAttemptID); | |
req.addAllAsks(requestedContainers); | |
req.addAllReleases(releasedContainers); | |
req.setProgress((float) numCompletedContainers.get() | |
/ numTotalContainers); | |
LOG.info("Sending request to RM for containers" + ", requestedSet=" | |
+ requestedContainers.size() + ", releasedSet=" | |
+ releasedContainers.size() + ", progress=" + req.getProgress()); | |
for (ResourceRequest rsrcReq : requestedContainers) { | |
LOG.info("Requested container ask: " + rsrcReq.toString()); | |
} | |
for (ContainerId id : releasedContainers) { | |
LOG.info("Released container, id=" + id.getId()); | |
} | |
AllocateResponse resp = resourceManager.allocate(req); | |
return resp.getAMResponse(); | |
} | |
/**
 * How a finished container is to be treated.
 */
public enum ExitStatusAction {
    /** Counted as completed. */
    SUCCEEDED,
    /** Counted as completed and failed. */
    FAILED,
    /** Its launcher is queued again. */
    RETRY
}
/** | |
* コンテナーの終了状態を判定する. | |
* | |
* @param containerStatus | |
* @return | |
*/ | |
protected ExitStatusAction checkExitStatus(ContainerStatus containerStatus) { | |
int exitStatus = containerStatus.getExitStatus(); | |
switch (exitStatus) { | |
case 0: | |
return ExitStatusAction.SUCCEEDED; | |
case -100: | |
return ExitStatusAction.RETRY; | |
default: | |
return ExitStatusAction.FAILED; | |
} | |
} | |
/** | |
* クライアントに対して完了を通知する. | |
* | |
* @return 終了コード | |
* @throws YarnRemoteException | |
*/ | |
protected int finish() throws YarnRemoteException { | |
LOG.info("Application completed. Signalling finish to RM"); | |
FinishApplicationMasterRequest finishReq = Records | |
.newRecord(FinishApplicationMasterRequest.class); | |
finishReq.setAppAttemptId(appAttemptID); | |
int exitCode = initFinishRequest(finishReq); | |
resourceManager.finishApplicationMaster(finishReq); | |
return exitCode; | |
} | |
/** | |
* 完了通知用の初期化を行う. | |
* | |
* @param finishReq | |
* @return 終了コード | |
*/ | |
protected int initFinishRequest(FinishApplicationMasterRequest finishReq) { | |
if (numFailedContainers.get() == 0) { | |
finishReq | |
.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED); | |
return 0; | |
} else { | |
finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED); | |
String diagnostics = "Diagnostics." + ", total=" | |
+ numTotalContainers + ", completed=" | |
+ numCompletedContainers.get() + ", allocated=" | |
+ numAllocatedContainers.get() + ", failed=" | |
+ numFailedContainers.get(); | |
finishReq.setDiagnostics(diagnostics); | |
return 1; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sample.yarn; | |
import java.io.File; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.Collections; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.FileStatus; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.net.NetUtils; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import org.apache.hadoop.yarn.api.ApplicationConstants; | |
import org.apache.hadoop.yarn.api.ClientRMProtocol; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; | |
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; | |
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; | |
import org.apache.hadoop.yarn.api.records.ApplicationId; | |
import org.apache.hadoop.yarn.api.records.ApplicationReport; | |
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; | |
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; | |
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; | |
import org.apache.hadoop.yarn.api.records.LocalResource; | |
import org.apache.hadoop.yarn.api.records.LocalResourceType; | |
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; | |
import org.apache.hadoop.yarn.api.records.Priority; | |
import org.apache.hadoop.yarn.api.records.Resource; | |
import org.apache.hadoop.yarn.api.records.YarnApplicationState; | |
import org.apache.hadoop.yarn.conf.YarnConfiguration; | |
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; | |
import org.apache.hadoop.yarn.ipc.YarnRPC; | |
import org.apache.hadoop.yarn.util.ConverterUtils; | |
import org.apache.hadoop.yarn.util.Records; | |
/** | |
* Hadoop0.23.0 YARN Client. | |
* <p> | |
* →<a href= | |
* "http://hadoop.apache.org/common/docs/r0.23.0/hadoop-yarn/hadoop-yarn-site/WritingYarnApplications.html" | |
* >参考</a><br> | |
* →<a href= | |
* "http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/0.23/yarn2.html" | |
* >説明</a> | |
* </p> | |
* | |
* @see org.apache.hadoop.yarn.applications.distributedshell.Client | |
* @author hishidama | |
*/ | |
public abstract class YarnClient extends Configured implements Tool { | |
private static final Log LOG = LogFactory.getLog(YarnClient.class); | |
protected static int run(YarnClient client, String[] args) throws Exception { | |
return ToolRunner.run(client, args); | |
} | |
private YarnRPC rpc; | |
private ClientRMProtocol applicationsManager; // Resource Manager | |
/** アプリケーション名 */ | |
protected String appName; | |
/** AppMstrのクラス名 */ | |
protected Class<?> appMasterClass; | |
/** AppMstrが入っているjarファイルのパス */ | |
protected File appMasterJar; | |
/** AppMstrの優先度 */ | |
protected int amPriority = 0; | |
/** AppMstrのキュー */ | |
protected String amQueue = ""; | |
/** AppMstrを実行するユーザー */ | |
protected String amUser = ""; | |
/** AppMstrを実行するのに必要な(要求する)メモリーサイズ */ | |
protected int amMemory = 1 * 1024; | |
/** 当Clientの開始時刻 */ | |
private final long clientStartTime = System.currentTimeMillis(); | |
/** AppMstrのタイムアウト時間(この時間を過ぎるとkillする) */ | |
private long clientTimeout = TimeUnit.MINUTES.toMillis(10); | |
/** | |
* コンストラクター. | |
* | |
* @param appName | |
* | |
* @param appMasterClass | |
* AppMstrのクラス名 | |
*/ | |
protected YarnClient(String appName, Class<?> appMasterClass) { | |
this.appName = appName; | |
this.appMasterClass = appMasterClass; | |
this.appMasterJar = new File( | |
ClassUtil.findContainingJar(appMasterClass)); | |
} | |
@Override | |
public int run(String[] args) throws Exception { | |
init(args); | |
Configuration conf = getConf(); | |
rpc = YarnRPC.create(conf); | |
applicationsManager = connectToASM(); | |
GetNewApplicationResponse newApp = getApplication(); | |
ApplicationSubmissionContext appContext = getAppContext(newApp); | |
submit(appContext); | |
boolean succeeded = monitorApplication(newApp.getApplicationId()); | |
return succeeded ? 0 : 1; | |
} | |
/** | |
* 初期化. | |
* | |
* @param args | |
* main()の引数 | |
*/ | |
protected void init(String[] args) { | |
// 各クラスでオーバーライドする | |
} | |
/** | |
* ApplicationsManager(AsM)へ接続する. | |
* | |
* @return | |
*/ | |
protected ClientRMProtocol connectToASM() { | |
Configuration conf = getConf(); | |
YarnConfiguration yarnConf = new YarnConfiguration(conf); | |
InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get( | |
YarnConfiguration.RM_ADDRESS, | |
YarnConfiguration.DEFAULT_RM_ADDRESS)); | |
LOG.info("Connecting to ResourceManager at " + rmAddress); | |
return (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, | |
rmAddress, conf); | |
} | |
/** | |
* ASMから新しいアプリケーションのApplicationIdを取得する. | |
* <p> | |
* responseには、使用できるリソース(使用可能メモリーの最小値・最大値)が返ってくる。 | |
* </p> | |
* | |
* @return | |
* @throws YarnRemoteException | |
*/ | |
protected GetNewApplicationResponse getApplication() | |
throws YarnRemoteException { | |
GetNewApplicationRequest request = Records | |
.newRecord(GetNewApplicationRequest.class); | |
GetNewApplicationResponse response = applicationsManager | |
.getNewApplication(request); | |
LOG.info("Got new application id=" + response.getApplicationId()); | |
return response; | |
} | |
/** | |
* AppMstrの実行情報を生成する. | |
* | |
* @param newApp | |
* @return | |
* @throws IOException | |
*/ | |
protected ApplicationSubmissionContext getAppContext( | |
GetNewApplicationResponse newApp) throws IOException { | |
LOG.info("Setting up application submission context for ASM"); | |
ApplicationSubmissionContext appContext = Records | |
.newRecord(ApplicationSubmissionContext.class); | |
ApplicationId appId = newApp.getApplicationId(); | |
appContext.setApplicationId(appId); | |
appContext.setApplicationName(appName); | |
ContainerLaunchContext amContainer = getContainerLaunchContext(newApp, | |
appId); | |
appContext.setAMContainerSpec(amContainer); | |
Priority pri = Records.newRecord(Priority.class); | |
pri.setPriority(amPriority); | |
appContext.setPriority(pri); | |
appContext.setQueue(amQueue); | |
appContext.setUser(amUser); | |
return appContext; | |
} | |
/** | |
* AppMstrの実行情報を生成する. | |
* | |
* @param newApp | |
* @param appId | |
* @return | |
* @throws IOException | |
*/ | |
protected ContainerLaunchContext getContainerLaunchContext( | |
GetNewApplicationResponse newApp, ApplicationId appId) | |
throws IOException { | |
ContainerLaunchContext amContainer = Records | |
.newRecord(ContainerLaunchContext.class); | |
Map<String, LocalResource> localResources = getLocalResources(); | |
amContainer.setLocalResources(localResources); | |
Map<String, String> env = getEnvironment(); | |
amContainer.setEnvironment(env); | |
String command = getCommand(); | |
List<String> commands = Collections.singletonList(command); | |
amContainer.setCommands(commands); | |
Resource capability = getCapability(newApp); | |
amContainer.setResource(capability); | |
return amContainer; | |
} | |
/** | |
* AppMstr用のローカルリソースを指定する. | |
* | |
* @return | |
* @throws IOException | |
*/ | |
protected Map<String, LocalResource> getLocalResources() throws IOException { | |
Path jarPath = new Path(appMasterJar.toURI()); | |
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); | |
LOG.info("Copy App Master jar from local filesystem and add to local environment"); | |
Configuration conf = getConf(); | |
FileSystem fs = FileSystem.get(conf); | |
FileStatus destStatus = fs.getFileStatus(jarPath); | |
LocalResource amJarRsrc = Records.newRecord(LocalResource.class); | |
amJarRsrc.setType(LocalResourceType.FILE); | |
amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); | |
amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); | |
amJarRsrc.setTimestamp(destStatus.getModificationTime()); | |
amJarRsrc.setSize(destStatus.getLen()); | |
localResources.put("AppMaster.jar", amJarRsrc); | |
return localResources; | |
} | |
/** | |
* AppMstr用の環境変数を指定する. | |
* | |
* @return | |
*/ | |
protected Map<String, String> getEnvironment() { | |
Map<String, String> env = new HashMap<String, String>(); | |
String classPathEnv = "${CLASSPATH}" + ":./*" + ":$HADOOP_CONF_DIR" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/*" | |
+ ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*" | |
+ ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*" | |
+ ":$YARN_HOME/modules/*" + ":$YARN_HOME/lib/*" | |
+ ":./log4j.properties:"; | |
env.put("CLASSPATH", classPathEnv); | |
return env; | |
} | |
/** | |
* AppMstrを実行するコマンドを指定する. | |
* | |
* @return | |
*/ | |
protected String getCommand() { | |
StringBuilder sb = new StringBuilder(128); | |
sb.append("${JAVA_HOME}/bin/java "); | |
sb.append(appMasterClass.getName()); | |
for (Object arg : getCommandArgs()) { | |
sb.append(" "); | |
sb.append(arg); | |
} | |
sb.append(" 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/stdout.log"); | |
sb.append(" 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR | |
+ "/stderr.log"); | |
return sb.toString(); | |
} | |
/** | |
* AppMstrクラスのmain()に渡す引数を返す. | |
* | |
* @return 引数のリスト | |
*/ | |
protected List<Object> getCommandArgs() { | |
return Collections.emptyList(); | |
} | |
/** | |
* AppMstrに必要なリソースを指定する. | |
* | |
* @param newApp | |
* @return | |
*/ | |
protected Resource getCapability(GetNewApplicationResponse newApp) { | |
{ | |
int minMem = newApp.getMinimumResourceCapability().getMemory(); | |
int maxMem = newApp.getMaximumResourceCapability().getMemory(); | |
LOG.info("Min mem capabililty of resources in this cluster " | |
+ minMem); | |
LOG.info("Max mem capabililty of resources in this cluster " | |
+ maxMem); | |
if (amMemory < minMem) { | |
LOG.info("AM memory specified below min threshold of cluster. Using min value." | |
+ ", specified=" + amMemory + ", min=" + minMem); | |
amMemory = minMem; | |
} else if (amMemory > maxMem) { | |
LOG.info("AM memory specified above max threshold of cluster. Using max value." | |
+ ", specified=" + amMemory + ", max=" + maxMem); | |
amMemory = maxMem; | |
} | |
} | |
// Hadoop0.23.0では、指定できるリソースはメモリーサイズのみ | |
Resource capability = Records.newRecord(Resource.class); | |
capability.setMemory(amMemory); | |
return capability; | |
} | |
/** | |
* AppMstrを実行開始する. | |
* <p> | |
* Resourece Managerに実行を依頼する。 | |
* </p> | |
* | |
* @param appContext | |
* @throws YarnRemoteException | |
*/ | |
protected void submit(ApplicationSubmissionContext appContext) | |
throws YarnRemoteException { | |
SubmitApplicationRequest appRequest = Records | |
.newRecord(SubmitApplicationRequest.class); | |
appRequest.setApplicationSubmissionContext(appContext); | |
LOG.info("Submitting application to ASM: " + appContext); | |
applicationsManager.submitApplication(appRequest); | |
} | |
/** | |
* AppMstrの実行状況を監視する. | |
* | |
* @param appId | |
* @return 正常に終了した場合、true | |
* @throws YarnRemoteException | |
*/ | |
protected boolean monitorApplication(ApplicationId appId) | |
throws YarnRemoteException { | |
while (true) { | |
// 1秒間隔でチェック | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
LOG.debug("Thread sleep in monitoring loop interrupted"); | |
} | |
// AppMstrの実行状況(レポート)を取得する | |
GetApplicationReportRequest reportRequest = Records | |
.newRecord(GetApplicationReportRequest.class); | |
reportRequest.setApplicationId(appId); | |
GetApplicationReportResponse reportResponse = applicationsManager | |
.getApplicationReport(reportRequest); | |
ApplicationReport report = reportResponse.getApplicationReport(); | |
YarnApplicationState state = report.getYarnApplicationState(); | |
FinalApplicationStatus dsStatus = report | |
.getFinalApplicationStatus(); | |
LOG.info("appId:" + report.getApplicationId() + " state=" + state | |
+ ", dsStatus=" + dsStatus); | |
// System.out.println("report+++" + report); | |
switch (state) { | |
case FINISHED: | |
if (dsStatus == FinalApplicationStatus.SUCCEEDED) { | |
LOG.info("Application has completed successfully. Breaking monitoring loop"); | |
return true; | |
} else { | |
LOG.info("Application did finished unsuccessfully." | |
+ " YarnState=" + state.toString() | |
+ ", DSFinalStatus=" + dsStatus.toString() | |
+ ". Breaking monitoring loop"); | |
return false; | |
} | |
case KILLED: | |
case FAILED: | |
LOG.info("Application did not finish." + " YarnState=" | |
+ state.toString() + ", DSFinalStatus=" | |
+ dsStatus.toString() + ". Breaking monitoring loop"); | |
return false; | |
} | |
if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { | |
LOG.info("Reached client specified timeout for application. Killing application"); | |
killApplication(appId); | |
return false; | |
} | |
} | |
} | |
protected void killApplication(ApplicationId appId) | |
throws YarnRemoteException { | |
KillApplicationRequest request = Records | |
.newRecord(KillApplicationRequest.class); | |
request.setApplicationId(appId); | |
applicationsManager.forceKillApplication(request); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment