Created
April 8, 2016 18:44
-
-
Save hatred/c0be071125f6715e9403f5673de1860f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp | |
index 7bcc5bd..eee0182 100644 | |
--- a/src/launcher/http_command_executor.cpp | |
+++ b/src/launcher/http_command_executor.cpp | |
@@ -74,8 +74,14 @@ using std::queue; | |
using std::string; | |
using std::vector; | |
-namespace mesos { | |
-namespace internal { | |
+using process::Clock; | |
+using process::Future; | |
+using process::Owned; | |
+using process::Subprocess; | |
+using process::Timer; | |
+ | |
+using mesos::internal::evolve; | |
+using mesos::internal::TaskHealthStatus; | |
using mesos::v1::CommandInfo; | |
using mesos::v1::ExecutorID; | |
@@ -91,7 +97,9 @@ using mesos::v1::executor::Call; | |
using mesos::v1::executor::Event; | |
using mesos::v1::executor::Mesos; | |
-using namespace process; | |
+namespace mesos { | |
+namespace v1 { | |
+namespace internal { | |
class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor> | |
{ | |
@@ -145,6 +153,8 @@ public: | |
Event event = events.front(); | |
events.pop(); | |
+ cout << "Received " << event.type() << " event" << endl; | |
+ | |
switch (event.type()) { | |
case Event::SUBSCRIBED: { | |
cout << "Subscribed executor on " | |
@@ -179,9 +189,16 @@ public: | |
break; | |
} | |
- case Event::MESSAGE: | |
- case Event::ERROR: | |
+ case Event::MESSAGE: { | |
break; | |
+ } | |
+ | |
+ case Event::ERROR: { | |
+ // Report the error, then leave the switch explicitly: without this | |
+ // `break` control falls through into `default` and hits UNREACHABLE(). | |
+ cerr << "Error: " << event.error().message() << endl; | |
+ break; | |
+ } | |
default: { | |
UNREACHABLE(); | |
@@ -219,7 +233,7 @@ protected: | |
cout << "Received task health update, healthy: " | |
<< stringify(healthy) << endl; | |
- sendStatusUpdate(evolve(taskID), TaskState::TASK_RUNNING, healthy); | |
+ update(evolve(taskID), TaskState::TASK_RUNNING, healthy); | |
if (initiateTaskKill) { | |
killedByHealthCheck = true; | |
@@ -261,7 +275,7 @@ protected: | |
CHECK_EQ(SUBSCRIBED, state); | |
if (launched) { | |
- sendStatusUpdate( | |
+ update( | |
_task.task_id(), | |
TaskState::TASK_FAILED, | |
None(), | |
@@ -373,7 +387,7 @@ protected: | |
} | |
rootfs = path::join( | |
- os::getcwd(), slave::COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH); | |
+ os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH); | |
string sandbox = path::join(rootfs.get(), sandboxDirectory.get()); | |
if (!os::exists(sandbox)) { | |
@@ -408,7 +422,7 @@ protected: | |
// the recursive mount. | |
Try<Nothing> unmountAll = fs::unmountAll(path::join( | |
sandbox, | |
- slave::COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH)); | |
+ COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH)); | |
if (unmountAll.isError()) { | |
cerr << "Unable to unmount rootfs under mounted sandbox: " | |
<< unmountAll.error() << endl; | |
@@ -579,7 +593,7 @@ protected: | |
process::reap(pid) | |
.onAny(defer(self(), &Self::reaped, pid, lambda::_1)); | |
- sendStatusUpdate(task->task_id(), TaskState::TASK_RUNNING); | |
+ update(task->task_id(), TaskState::TASK_RUNNING); | |
launched = true; | |
} | |
@@ -638,7 +652,7 @@ private: | |
foreach (const FrameworkInfo::Capability& c, | |
frameworkInfo->capabilities()) { | |
if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) { | |
- sendStatusUpdate(taskId.get(), TaskState::TASK_KILLING); | |
+ update(taskId.get(), TaskState::TASK_KILLING); | |
break; | |
} | |
} | |
@@ -717,9 +731,9 @@ private: | |
CHECK_SOME(taskId); | |
if (killed && killedByHealthCheck) { | |
- sendStatusUpdate(taskId.get(), taskState, false, message); | |
+ update(taskId.get(), taskState, false, message); | |
} else { | |
- sendStatusUpdate(taskId.get(), taskState, None(), message); | |
+ update(taskId.get(), taskState, None(), message); | |
} | |
// This is a hack to ensure the message is sent to the | |
@@ -795,7 +809,7 @@ private: | |
<< stringify(healthPid) << endl; | |
} | |
- void sendStatusUpdate( | |
+ void update( | |
const TaskID& taskID, | |
const TaskState& state, | |
const Option<bool>& healthy = None(), | |
@@ -864,6 +878,7 @@ private: | |
}; | |
} // namespace internal { | |
+} // namespace v1 { | |
} // namespace mesos { | |
@@ -917,8 +932,8 @@ public: | |
int main(int argc, char** argv) | |
{ | |
Flags flags; | |
- mesos::v1::FrameworkID frameworkId; | |
- mesos::v1::ExecutorID executorId; | |
+ FrameworkID frameworkId; | |
+ ExecutorID executorId; | |
// Load flags from command line. | |
Try<Nothing> load = flags.load(None(), &argc, &argv); | |
@@ -984,8 +999,8 @@ int main(int argc, char** argv) | |
shutdownGracePeriod = parse.get(); | |
} | |
- process::Owned<mesos::internal::HttpCommandExecutor> executor( | |
- new mesos::internal::HttpCommandExecutor( | |
+ Owned<mesos::v1::internal::HttpCommandExecutor> executor( | |
+ new mesos::v1::internal::HttpCommandExecutor( | |
override, | |
path, | |
flags.sandbox_directory, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment