Skip to content

Instantly share code, notes, and snippets.

@pdtran3k6
Created November 17, 2018 02:27
Show Gist options
  • Save pdtran3k6/1d68c7cab2e294b7c873ce0ef940a315 to your computer and use it in GitHub Desktop.
Save pdtran3k6/1d68c7cab2e294b7c873ce0ef940a315 to your computer and use it in GitHub Desktop.
Refactor suggestion for testability and readability
...
/**
 * Entry point: runs the ApplicationMaster, records a job history file, then exits the JVM with
 * status 0 on success or -1 on failure.
 *
 * @param args command-line arguments forwarded to {@link #runTonyAM(String[])}
 */
public static void main(String[] args) {
  long started = System.currentTimeMillis();
  boolean succeeded = runTonyAM(args);
  long completed = System.currentTimeMillis();
  // jobDir is expected to be set by now, but runTonyAM() can return early (e.g. when init()
  // fails), in which case it may not be. Writing history is best-effort: an exception here must
  // not skip the System.exit below, otherwise the outcome is never reported and the JVM may stay
  // alive if any non-daemon threads are still running.
  try {
    HistoryFileUtils.createHistoryFile(fs,
        TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir);
  } catch (RuntimeException e) {
    LOG.error("Failed to write history file to jobDir: " + jobDir, e);
  }
  if (succeeded) {
    LOG.info("Application Master completed successfully. exiting");
    System.exit(0);
  } else {
    LOG.info("Application Master failed. exiting");
    System.exit(-1);
  }
}
/**
 * Runs the TonY ApplicationMaster: initializes and prepares it, then starts and monitors the
 * training job. If an attempt fails while retries remain, the AM is reset and tried again,
 * except on single-node training, which is never retried.
 *
 * @param args command-line arguments forwarded to {@code init}
 * @return {@code true} if the job finished successfully; {@code false} if init/prepare failed,
 *     starting the AM threw, the AM-crash test hook is enabled, or the final attempt failed
 */
public static boolean runTonyAM(String[] args) {
  TonyApplicationMaster am = new TonyApplicationMaster();
  if (!am.init(args)) {
    return false;
  }
  if (!am.prepare()) {
    return false;
  }
  // monitor() runs (and sleeps) on this thread; onTaskDeemedDead() interrupts it to wake it up.
  am.mainThread = Thread.currentThread();
  boolean succeeded;
  while (true) {
    // Crash AM on purpose during AM crash tests.
    if ("true".equals(System.getenv(Constants.TEST_AM_CRASH))) {
      LOG.fatal("Error running TonyApplicationMaster !!");
      return false;
    }
    try {
      am.start();
    } catch (Exception e) {
      LOG.error("Exception when we're starting TonyAM", e);
      return false;
    }
    succeeded = am.monitor();
    // Stop on success, when retries are exhausted, or on single node training (never retried).
    // Checking singleNode here avoids resetting the AM and logging a retry that never happens.
    if (succeeded || am.amRetryCount == 0 || am.singleNode) {
      LOG.info("Result: " + succeeded + ", retry count: " + am.amRetryCount);
      break;
    }
    // Prepare for the next attempt; log the count that remains after consuming this retry.
    am.reset();
    am.amRetryCount -= 1;
    LOG.info("Retrying, remaining retry count: " + am.amRetryCount);
  }
  // Wait for the worker nodes to finish (The interval between registering the exit code to final exit)
  am.stop();
  am.printTaskUrls();
  return succeeded;
}
....
/**
* Monitor the TensorFlow training job.
*
* Polls every 5 seconds until one of these happens: the application times out, the client
* signals a stop, the session reports training finished, preprocessing exits non-zero (or
* finishes in single-node mode), a task is deemed dead for missed heartbeats, or all worker
* tasks complete. It then calls session.updateSessionStatus() and reads the final status.
* NOTE(review): part of the body is elided ("...") in this view; setup before the loop
* (e.g. where expireTime is computed) is not shown.
* @return true iff the session's final status is SUCCEEDED.
*/
private boolean monitor() {
...
while (true) {
/*
All of these will be checked in session.updateSessionStatus(),
so we can just break the loop without having
to set FinalStatus or return right away.
*/
if (System.currentTimeMillis() > expireTime) {
LOG.error("Application times out.");
break;
}
// Check if client signals we should exit.
if (clientSignalToStop) {
LOG.info("Client signals AM to exit.");
break;
}
if (session.isTrainingFinished()) {
LOG.info("Training has finished.");
break;
}
if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
break;
}
// Because of the check above, preprocessExitCode is always 0 when this branch logs it.
if (singleNode && preprocessFinished) {
LOG.info("Single node training finished with exit code: " + preprocessExitCode);
break;
}
/*
This has to be set in onTaskDeemedDead(TonyTask task) since
it won't be checked by session.updateSessionStatus()
onTaskDeemedDead() also sets the session's final status to FAILED, and
updateSessionStatus() returns early when the status is already FAILED.
NOTE(review): this flag is written by whichever thread calls onTaskDeemedDead();
confirm it is declared volatile so this loop is guaranteed to see the update.
*/
if (this.taskHasMissesHB) {
LOG.info("Application failed due to missed heartbeats");
break;
}
if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) {
// minor refactoring of logging num tasks finished.
// Progress is printed again after the loop, so on this path it is logged twice.
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
break;
}
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
// Pause before refresh job status
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// Expected when onTaskDeemedDead() interrupts mainThread (set to this thread in
// runTonyAM) to wake the loop early; the next iteration then sees taskHasMissesHB.
// NOTE(review): that expected wake-up is logged at error level.
LOG.error("Thread interrupted", e);
}
}
session.updateSessionStatus(); // this will set FinalStatus accordingly
Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
FinalApplicationStatus status = session.getFinalStatus();
String appMessage = session.getFinalMessage();
if (status != FinalApplicationStatus.SUCCEEDED) {
LOG.info("Tony session failed: " + appMessage);
}
return status == FinalApplicationStatus.SUCCEEDED;
}
....
/**
 * Fails the whole application because {@code task} has missed too many consecutive heartbeats.
 *
 * <p>The FAILED final status is recorded before {@code taskHasMissesHB} is raised. When
 * {@code monitor()} observes the flag and calls {@code session.updateSessionStatus()}, the status
 * is therefore already FAILED, and that method's short-circuit keeps it from being recomputed.
 * NOTE(review): this ordering is only guaranteed to be visible across threads if
 * {@code taskHasMissesHB} is volatile (or the session's status accessors synchronize); confirm.
 *
 * @param task the task that has been deemed dead
 */
private void onTaskDeemedDead(TonyTask task) {
  LOG.info("Task with id [" + task.getId() + "] has missed"
      + " [" + maxConsecutiveHBMiss + "] heartbeats.. Ending application !!");
  String msg = "Task with id [" + task.getId() + "] deemed dead!!";
  LOG.error(msg);
  // Publish the failure first: monitor() breaks as soon as it sees the flag below, and
  // updateSessionStatus() only skips recomputation if the status is already FAILED.
  session.setFinalStatus(FinalApplicationStatus.FAILED, msg);
  taskHasMissesHB = true;
  // Wake the monitor loop out of its sleep so it notices the flag immediately.
  mainThread.interrupt();
}
...
/**
* Updates the session's final status from the state of its job tasks.
* The per-job computation is elided ("...") in this view.
* Returns immediately if the final status is already FAILED, for example after a task was
* deemed dead for missed heartbeats, so an earlier failure is never overwritten.
* Tasks belonging to the PS job are skipped.
*/
public void updateSessionStatus() {
int failureCount = 0; // NOTE(review): its use is in the elided part of the loop
if (getFinalStatus() == FinalApplicationStatus.FAILED) return; // short circuit in case FinalStatus was already set
for (Map.Entry<String, TonyTask[]> entry : jobTasks.entrySet()) {
String jobName = entry.getKey();
TonyTask[] tasks = entry.getValue();
if (jobName.equals(PS_JOB_NAME)) {
// ignore PS job
continue;
}
...
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment