Skip to content

Instantly share code, notes, and snippets.

@pdtran3k6
Created November 18, 2018 08:43
Show Gist options
  • Save pdtran3k6/0cd3eef0e49958dac1fa778f41b22138 to your computer and use it in GitHub Desktop.
Save pdtran3k6/0cd3eef0e49958dac1fa778f41b22138 to your computer and use it in GitHub Desktop.
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
index 8e35fda..687f567 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
@@ -313,39 +313,57 @@ public class TonyApplicationMaster {
* @param args the args from user inputs
*/
public static void main(String[] args) {
- boolean succeeded = false;
- long started;
- long completed;
- TonyApplicationMaster am = new TonyApplicationMaster();
- boolean sanityCheck = am.init(args);
- if (!sanityCheck) {
+ long started = System.currentTimeMillis();
+ boolean succeeded = runTonyAM(args);
+ long completed = System.currentTimeMillis();
+
+ // By this time jobDir should have been set
+ HistoryFileUtils.createHistoryFile(fs,
+ TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir);
+
+ if (succeeded) {
+ LOG.info("Application Master completed successfully. exiting");
+ System.exit(0);
+ } else {
+ LOG.info("Application Master failed. exiting");
System.exit(-1);
}
+ }
+
+ @VisibleForTesting
+ static boolean runTonyAM(String[] args) {
+ TonyApplicationMaster am = new TonyApplicationMaster();
+ if (!am.init(args)) {
+ return false;
+ }
+
if (!am.prepare()) {
- System.exit(-1);
+ return false;
}
+
am.mainThread = Thread.currentThread();
- boolean exitOnError = false;
- started = System.currentTimeMillis();
+ boolean succeeded;
do {
// Crash AM on purpose during AM crash tests.
String shouldCrash = System.getenv(Constants.TEST_AM_CRASH);
if (shouldCrash != null && shouldCrash.equals("true")) {
- exitOnError = true;
- break;
+ LOG.fatal("Error running TonyApplicationMaster !!");
+ return false;
}
try {
am.start();
} catch (Exception e) {
LOG.error("Exception when we're starting TonyAM", e);
- System.exit(-1);
+ return false;
}
+
succeeded = am.monitor();
if (succeeded || am.amRetryCount == 0) {
LOG.info("Result: " + succeeded + ", retry count: " + am.amRetryCount);
break;
}
+
// Prepare for retryCount.
am.reset();
LOG.info("Retrying, remaining retry count" + am.amRetryCount);
@@ -353,24 +371,8 @@ public class TonyApplicationMaster {
} while (!am.singleNode); // We don't retry on single node training.
// Wait for the worker nodes to finish (The interval between registering the exit code to final exit)
am.stop();
- completed = System.currentTimeMillis();
am.printTaskUrls();
- if (exitOnError) {
- LOG.fatal("Error running TonyApplicationMaster !!");
- System.exit(-1);
- }
-
- // By this time jobDir should have been set
- HistoryFileUtils.createHistoryFile(fs,
- TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir);
-
- if (succeeded) {
- LOG.info("Application Master completed successfully. exiting");
- System.exit(0);
- } else {
- LOG.info("Application Master failed. exiting");
- System.exit(-1);
- }
+ return succeeded;
}
/**
@@ -546,63 +548,60 @@ public class TonyApplicationMaster {
// Checking timeout
if (System.currentTimeMillis() > expireTime) {
LOG.error("Application times out.");
- return false;
+ break;
}
// Check if client signals we should exit.
if (clientSignalToStop) {
LOG.info("Client signals AM to exit.");
- return true;
+ break;
}
if (session.isTrainingFinished()) {
LOG.info("Training has finished.");
- return session.getFinalStatus() == FinalApplicationStatus.SUCCEEDED;
+ break;
}
if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
- return false;
+ break;
}
if (singleNode && preprocessFinished) {
LOG.info("Single node training finished with exit code: " + preprocessExitCode);
- return preprocessExitCode == 0;
+ break;
}
- if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) {
- LOG.info("Completed jobs: " + numCompletedWorkerTasks.get() + " total jobs: " + numTotalWorkerTasks);
+ if (this.taskHasMissesHB) {
+ LOG.info("Application failed due to missed heartbeats");
break;
}
- LOG.info("Completed worker tasks: " + numCompletedWorkerTasks.get()
- + ", total worker tasks: " + numTotalWorkerTasks);
- if (this.taskHasMissesHB) {
+ if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) {
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
break;
}
+
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
+
+ // Pause before refresh job status
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.error("Thread interrupted", e);
}
}
- if (this.taskHasMissesHB) {
- session.setFinalStatus(FinalApplicationStatus.FAILED,
- "Application failed due to missed heartbeats");
- } else {
- session.updateSessionStatus();
- }
- LOG.info("Total completed worker tasks: " + numCompletedWorkerTasks.get()
- + ", total worker tasks: " + numTotalWorkerTasks);
- boolean success = true;
+ session.updateSessionStatus();
+
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
+
FinalApplicationStatus status = session.getFinalStatus();
String appMessage = session.getFinalMessage();
if (status != FinalApplicationStatus.SUCCEEDED) {
LOG.info("Tony session failed: " + appMessage);
- success = false;
}
- return success;
+ return status == FinalApplicationStatus.SUCCEEDED;
}
/**
@@ -1105,8 +1104,10 @@ public class TonyApplicationMaster {
+ " [" + maxConsecutiveHBMiss + "] heartbeats.. Ending application !!");
// TODO: figure out what is the right thing to do here..
// TODO: For the time being, we just kill the job..
- LOG.error("Task with id [" + task.getId() + "] deemed dead!!");
+ String msg = "Task with id [" + task.getId() + "] deemed dead!!";
+ LOG.error(msg);
taskHasMissesHB = true;
+ session.setFinalStatus(FinalApplicationStatus.FAILED, msg);
mainThread.interrupt();
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyClient.java b/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
index 3146a08..21968cd 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
@@ -681,11 +681,12 @@ public class TonyClient implements AutoCloseable {
@VisibleForTesting
public int start() {
- boolean result = true;
+ boolean result;
try {
result = run();
} catch (IOException | InterruptedException | URISyntaxException | YarnException e) {
LOG.fatal("Failed to run TonyClient", e);
+ result = false;
}
if (result) {
LOG.info("Application completed successfully");
diff --git a/tony-core/src/main/java/com/linkedin/tony/Utils.java b/tony-core/src/main/java/com/linkedin/tony/Utils.java
index 6b7d552..e09d9dd 100644
--- a/tony-core/src/main/java/com/linkedin/tony/Utils.java
+++ b/tony-core/src/main/java/com/linkedin/tony/Utils.java
@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -407,5 +408,13 @@ public class Utils {
return "http://" + yarnConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS) + "/cluster/app/" + appId;
}
+ public static void printWorkerTasksCompleted(AtomicInteger completedWTasks, long totalWTasks) {
+ if (completedWTasks.get() == totalWTasks) {
+ LOG.info("Completed all " + totalWTasks + " worker tasks.");
+ return;
+ }
+ LOG.info("Completed worker tasks: " + completedWTasks.get() + " out of " + totalWTasks + " worker tasks." );
+ }
+
private Utils() { }
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
index 69bf5cc..24ce41f 100644
--- a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
+++ b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
@@ -297,6 +297,7 @@ public class TonySession {
*/
public void updateSessionStatus() {
int failureCount = 0;
+ if (getFinalStatus() == FinalApplicationStatus.FAILED) return;
for (Map.Entry<String, TonyTask[]> entry : jobTasks.entrySet()) {
String jobName = entry.getKey();
TonyTask[] tasks = entry.getValue();
@pdtran3k6
Copy link
Author

  • I think `HistoryFileUtils.createHistoryFile` should be called from `runTonyAM()`, since creating the history file is part of the AM lifecycle.
  • Nice refactoring of `monitor()` — it definitely looks cleaner now.

if (getFinalStatus() == FinalApplicationStatus.FAILED) return;

Please use braces even for single-line `if` statements.

OK, great! I've updated it accordingly, along with a few additional minor changes: tony-framework/TonY@c2fe468.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment