Skip to content

Instantly share code, notes, and snippets.

@pdtran3k6
Created November 18, 2018 08:43
Show Gist options
  • Save pdtran3k6/0cd3eef0e49958dac1fa778f41b22138 to your computer and use it in GitHub Desktop.
Save pdtran3k6/0cd3eef0e49958dac1fa778f41b22138 to your computer and use it in GitHub Desktop.
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
index 8e35fda..687f567 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyApplicationMaster.java
@@ -313,39 +313,57 @@ public class TonyApplicationMaster {
* @param args the args from user inputs
*/
public static void main(String[] args) {
- boolean succeeded = false;
- long started;
- long completed;
- TonyApplicationMaster am = new TonyApplicationMaster();
- boolean sanityCheck = am.init(args);
- if (!sanityCheck) {
+ long started = System.currentTimeMillis();
+ boolean succeeded = runTonyAM(args);
+ long completed = System.currentTimeMillis();
+
+ // By this time jobDir should have been set
+ HistoryFileUtils.createHistoryFile(fs,
+ TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir);
+
+ if (succeeded) {
+ LOG.info("Application Master completed successfully. exiting");
+ System.exit(0);
+ } else {
+ LOG.info("Application Master failed. exiting");
System.exit(-1);
}
+ }
+
+ @VisibleForTesting
+ static boolean runTonyAM(String[] args) {
+ TonyApplicationMaster am = new TonyApplicationMaster();
+ if (!am.init(args)) {
+ return false;
+ }
+
if (!am.prepare()) {
- System.exit(-1);
+ return false;
}
+
am.mainThread = Thread.currentThread();
- boolean exitOnError = false;
- started = System.currentTimeMillis();
+ boolean succeeded;
do {
// Crash AM on purpose during AM crash tests.
String shouldCrash = System.getenv(Constants.TEST_AM_CRASH);
if (shouldCrash != null && shouldCrash.equals("true")) {
- exitOnError = true;
- break;
+ LOG.fatal("Error running TonyApplicationMaster !!");
+ return false;
}
try {
am.start();
} catch (Exception e) {
LOG.error("Exception when we're starting TonyAM", e);
- System.exit(-1);
+ return false;
}
+
succeeded = am.monitor();
if (succeeded || am.amRetryCount == 0) {
LOG.info("Result: " + succeeded + ", retry count: " + am.amRetryCount);
break;
}
+
// Prepare for retryCount.
am.reset();
LOG.info("Retrying, remaining retry count" + am.amRetryCount);
@@ -353,24 +371,8 @@ public class TonyApplicationMaster {
} while (!am.singleNode); // We don't retry on single node training.
// Wait for the worker nodes to finish (The interval between registering the exit code to final exit)
am.stop();
- completed = System.currentTimeMillis();
am.printTaskUrls();
- if (exitOnError) {
- LOG.fatal("Error running TonyApplicationMaster !!");
- System.exit(-1);
- }
-
- // By this time jobDir should have been set
- HistoryFileUtils.createHistoryFile(fs,
- TonyJobMetadata.newInstance(yarnConf, appIdString, started, completed, succeeded), jobDir);
-
- if (succeeded) {
- LOG.info("Application Master completed successfully. exiting");
- System.exit(0);
- } else {
- LOG.info("Application Master failed. exiting");
- System.exit(-1);
- }
+ return succeeded;
}
/**
@@ -546,63 +548,60 @@ public class TonyApplicationMaster {
// Checking timeout
if (System.currentTimeMillis() > expireTime) {
LOG.error("Application times out.");
- return false;
+ break;
}
// Check if client signals we should exit.
if (clientSignalToStop) {
LOG.info("Client signals AM to exit.");
- return true;
+ break;
}
if (session.isTrainingFinished()) {
LOG.info("Training has finished.");
- return session.getFinalStatus() == FinalApplicationStatus.SUCCEEDED;
+ break;
}
if (preprocessExitCode != 0) {
LOG.info("Preprocess failed with exit code: " + preprocessExitCode);
- return false;
+ break;
}
if (singleNode && preprocessFinished) {
LOG.info("Single node training finished with exit code: " + preprocessExitCode);
- return preprocessExitCode == 0;
+ break;
}
- if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) {
- LOG.info("Completed jobs: " + numCompletedWorkerTasks.get() + " total jobs: " + numTotalWorkerTasks);
+ if (this.taskHasMissesHB) {
+ LOG.info("Application failed due to missed heartbeats");
break;
}
- LOG.info("Completed worker tasks: " + numCompletedWorkerTasks.get()
- + ", total worker tasks: " + numTotalWorkerTasks);
- if (this.taskHasMissesHB) {
+ if (numCompletedWorkerTasks.get() == numTotalWorkerTasks) {
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
break;
}
+
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
+
+ // Pause before refresh job status
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.error("Thread interrupted", e);
}
}
- if (this.taskHasMissesHB) {
- session.setFinalStatus(FinalApplicationStatus.FAILED,
- "Application failed due to missed heartbeats");
- } else {
- session.updateSessionStatus();
- }
- LOG.info("Total completed worker tasks: " + numCompletedWorkerTasks.get()
- + ", total worker tasks: " + numTotalWorkerTasks);
- boolean success = true;
+ session.updateSessionStatus();
+
+ Utils.printWorkerTasksCompleted(numCompletedWorkerTasks, numTotalWorkerTasks);
+
FinalApplicationStatus status = session.getFinalStatus();
String appMessage = session.getFinalMessage();
if (status != FinalApplicationStatus.SUCCEEDED) {
LOG.info("Tony session failed: " + appMessage);
- success = false;
}
- return success;
+ return status == FinalApplicationStatus.SUCCEEDED;
}
/**
@@ -1105,8 +1104,10 @@ public class TonyApplicationMaster {
+ " [" + maxConsecutiveHBMiss + "] heartbeats.. Ending application !!");
// TODO: figure out what is the right thing to do here..
// TODO: For the time being, we just kill the job..
- LOG.error("Task with id [" + task.getId() + "] deemed dead!!");
+ String msg = "Task with id [" + task.getId() + "] deemed dead!!";
+ LOG.error(msg);
taskHasMissesHB = true;
+ session.setFinalStatus(FinalApplicationStatus.FAILED, msg);
mainThread.interrupt();
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/TonyClient.java b/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
index 3146a08..21968cd 100644
--- a/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
+++ b/tony-core/src/main/java/com/linkedin/tony/TonyClient.java
@@ -681,11 +681,12 @@ public class TonyClient implements AutoCloseable {
@VisibleForTesting
public int start() {
- boolean result = true;
+ boolean result;
try {
result = run();
} catch (IOException | InterruptedException | URISyntaxException | YarnException e) {
LOG.fatal("Failed to run TonyClient", e);
+ result = false;
}
if (result) {
LOG.info("Application completed successfully");
diff --git a/tony-core/src/main/java/com/linkedin/tony/Utils.java b/tony-core/src/main/java/com/linkedin/tony/Utils.java
index 6b7d552..e09d9dd 100644
--- a/tony-core/src/main/java/com/linkedin/tony/Utils.java
+++ b/tony-core/src/main/java/com/linkedin/tony/Utils.java
@@ -19,6 +19,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -407,5 +408,13 @@ public class Utils {
return "http://" + yarnConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS) + "/cluster/app/" + appId;
}
+ public static void printWorkerTasksCompleted(AtomicInteger completedWTasks, long totalWTasks) {
+ if (completedWTasks.get() == totalWTasks) {
+ LOG.info("Completed all " + totalWTasks + " worker tasks.");
+ return;
+ }
+ LOG.info("Completed worker tasks: " + completedWTasks.get() + " out of " + totalWTasks + " worker tasks." );
+ }
+
private Utils() { }
}
diff --git a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
index 69bf5cc..24ce41f 100644
--- a/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
+++ b/tony-core/src/main/java/com/linkedin/tony/tensorflow/TonySession.java
@@ -297,6 +297,7 @@ public class TonySession {
*/
public void updateSessionStatus() {
int failureCount = 0;
+ if (getFinalStatus() == FinalApplicationStatus.FAILED) return;
for (Map.Entry<String, TonyTask[]> entry : jobTasks.entrySet()) {
String jobName = entry.getKey();
TonyTask[] tasks = entry.getValue();
@oliverhu
Copy link

Can we make runTonyAM non-static? Static methods are harder to test/mock (unless they are utility methods).

@pdtran3k6
Copy link
Author

With this implementation, no, because...

Can we make runTonyAM non-static? Static methods are harder to test/mock (unless they are utility methods).

With this implementation, I don't think so, because main is static, so runTonyAM needs to be static as well. If anything, we can just do black-box testing for runTonyAM, because if you want to unit test it, you have to make many other methods non-private (monitor, start, init, etc.).

@pdtran3k6
Copy link
Author

  • I think HistoryFileUtils.createHistoryFile should be part of runTonyAM() since creating the history files is part of the AM lifecycle.
  • Nice refactoring of monitor(). Definitely looks cleaner now.

if (getFinalStatus() == FinalApplicationStatus.FAILED) return;

Please use braces even for one-line if statements.

Ok great! I've updated accordingly with additional minor changes: tony-framework/TonY@c2fe468.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment