Skip to content

Instantly share code, notes, and snippets.

@hatred
Created April 8, 2016 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hatred/c0be071125f6715e9403f5673de1860f to your computer and use it in GitHub Desktop.
Save hatred/c0be071125f6715e9403f5673de1860f to your computer and use it in GitHub Desktop.
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
index 7bcc5bd..eee0182 100644
--- a/src/launcher/http_command_executor.cpp
+++ b/src/launcher/http_command_executor.cpp
@@ -74,8 +74,14 @@ using std::queue;
using std::string;
using std::vector;
-namespace mesos {
-namespace internal {
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+using process::Timer;
+
+using mesos::internal::evolve;
+using mesos::internal::TaskHealthStatus;
using mesos::v1::CommandInfo;
using mesos::v1::ExecutorID;
@@ -91,7 +97,9 @@ using mesos::v1::executor::Call;
using mesos::v1::executor::Event;
using mesos::v1::executor::Mesos;
-using namespace process;
+namespace mesos {
+namespace v1 {
+namespace internal {
class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor>
{
@@ -145,6 +153,8 @@ public:
Event event = events.front();
events.pop();
+ cout << "Received " << event.type() << " event" << endl;
+
switch (event.type()) {
case Event::SUBSCRIBED: {
cout << "Subscribed executor on "
@@ -179,9 +189,16 @@ public:
break;
}
- case Event::MESSAGE:
- case Event::ERROR:
+ case Event::MESSAGE: {
break;
+ }
+
+ case Event::ERROR: {
+ // Log the error and stop here; without the `break` control would
+ // fall through into `default` and abort via UNREACHABLE().
+ cerr << "Error: " << event.error().message() << endl;
+ break;
+ }
default: {
UNREACHABLE();
@@ -219,7 +233,7 @@ protected:
cout << "Received task health update, healthy: "
<< stringify(healthy) << endl;
- sendStatusUpdate(evolve(taskID), TaskState::TASK_RUNNING, healthy);
+ update(evolve(taskID), TaskState::TASK_RUNNING, healthy);
if (initiateTaskKill) {
killedByHealthCheck = true;
@@ -261,7 +275,7 @@ protected:
CHECK_EQ(SUBSCRIBED, state);
if (launched) {
- sendStatusUpdate(
+ update(
_task.task_id(),
TaskState::TASK_FAILED,
None(),
@@ -373,7 +387,7 @@ protected:
}
rootfs = path::join(
- os::getcwd(), slave::COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+ os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
if (!os::exists(sandbox)) {
@@ -408,7 +422,7 @@ protected:
// the recursive mount.
Try<Nothing> unmountAll = fs::unmountAll(path::join(
sandbox,
- slave::COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
+ COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
if (unmountAll.isError()) {
cerr << "Unable to unmount rootfs under mounted sandbox: "
<< unmountAll.error() << endl;
@@ -579,7 +593,7 @@ protected:
process::reap(pid)
.onAny(defer(self(), &Self::reaped, pid, lambda::_1));
- sendStatusUpdate(task->task_id(), TaskState::TASK_RUNNING);
+ update(task->task_id(), TaskState::TASK_RUNNING);
launched = true;
}
@@ -638,7 +652,7 @@ private:
foreach (const FrameworkInfo::Capability& c,
frameworkInfo->capabilities()) {
if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
- sendStatusUpdate(taskId.get(), TaskState::TASK_KILLING);
+ update(taskId.get(), TaskState::TASK_KILLING);
break;
}
}
@@ -717,9 +731,9 @@ private:
CHECK_SOME(taskId);
if (killed && killedByHealthCheck) {
- sendStatusUpdate(taskId.get(), taskState, false, message);
+ update(taskId.get(), taskState, false, message);
} else {
- sendStatusUpdate(taskId.get(), taskState, None(), message);
+ update(taskId.get(), taskState, None(), message);
}
// This is a hack to ensure the message is sent to the
@@ -795,7 +809,7 @@ private:
<< stringify(healthPid) << endl;
}
- void sendStatusUpdate(
+ void update(
const TaskID& taskID,
const TaskState& state,
const Option<bool>& healthy = None(),
@@ -864,6 +878,7 @@ private:
};
} // namespace internal {
+} // namespace v1 {
} // namespace mesos {
@@ -917,8 +932,8 @@ public:
int main(int argc, char** argv)
{
Flags flags;
- mesos::v1::FrameworkID frameworkId;
- mesos::v1::ExecutorID executorId;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
// Load flags from command line.
Try<Nothing> load = flags.load(None(), &argc, &argv);
@@ -984,8 +999,8 @@ int main(int argc, char** argv)
shutdownGracePeriod = parse.get();
}
- process::Owned<mesos::internal::HttpCommandExecutor> executor(
- new mesos::internal::HttpCommandExecutor(
+ Owned<mesos::v1::internal::HttpCommandExecutor> executor(
+ new mesos::v1::internal::HttpCommandExecutor(
override,
path,
flags.sandbox_directory,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment