Skip to content

Instantly share code, notes, and snippets.

@rukletsov
Created May 22, 2018 20:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rukletsov/c079d95479fb134d137ea3ae8b7ae874 to your computer and use it in GitHub Desktop.
Save rukletsov/c079d95479fb134d137ea3ae8b7ae874 to your computer and use it in GitHub Desktop.
A test that ensures health updates are delivered via Mesos streaming API.
TEST_P(MasterAPITest, SubscribeHealthUpdates)
{
ContentType contentType = GetParam();
Try<Owned<cluster::Master>> master = this->StartMaster();
ASSERT_SOME(master);
auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
ASSERT_SOME(slave);
Future<Nothing> connected;
EXPECT_CALL(*scheduler, connected(_))
.WillOnce(FutureSatisfy(&connected));
v1::scheduler::TestMesos mesos(
master.get()->pid,
contentType,
scheduler);
AWAIT_READY(connected);
Future<v1::scheduler::Event::Subscribed> subscribed;
EXPECT_CALL(*scheduler, subscribed(_, _))
.WillOnce(FutureArg<1>(&subscribed));
EXPECT_CALL(*scheduler, heartbeat(_))
.WillRepeatedly(Return()); // Ignore heartbeats.
Future<v1::scheduler::Event::Offers> offers;
EXPECT_CALL(*scheduler, offers(_, _))
.WillOnce(FutureArg<1>(&offers));
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::SUBSCRIBE);
v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);
mesos.send(call);
}
AWAIT_READY(subscribed);
// Launch a task using the scheduler. This should result in a `TASK_ADDED`
// event when the task is launched followed by a `TASK_UPDATED` event after
// the task transitions to running state.
v1::FrameworkID frameworkId(subscribed->framework_id());
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
// Create event stream after seeing first offer but before first task is
// launched. We should see one framework, one agent and zero task/executor.
v1::master::Call v1Call;
v1Call.set_type(v1::master::Call::SUBSCRIBE);
http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
headers["Accept"] = stringify(contentType);
Future<http::Response> response = http::streaming::post(
master.get()->pid,
"api/v1",
headers,
serialize(contentType, v1Call),
stringify(contentType));
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
ASSERT_EQ(http::Response::PIPE, response->type);
ASSERT_SOME(response->reader);
http::Pipe::Reader reader = response->reader.get();
auto deserializer =
lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
Reader<v1::master::Event> decoder(
Decoder<v1::master::Event>(deserializer), reader);
Future<Result<v1::master::Event>> event;
{
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
const v1::master::Response::GetState& getState =
event->get().subscribed().get_state();
EXPECT_EQ(1u, getState.get_frameworks().frameworks_size());
EXPECT_EQ(1u, getState.get_agents().agents_size());
EXPECT_TRUE(getState.get_tasks().tasks().empty());
EXPECT_TRUE(getState.get_executors().executors().empty());
}
{
Future<Result<v1::master::Event>> event = decoder.read();
AWAIT_READY(event);
EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
}
event = decoder.read();
EXPECT_TRUE(event.isPending());
const v1::Offer& offer = offers->offers(0);
const v1::AgentID agentId(offer.agent_id());
TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(10000));
HealthCheck healthCheck;
healthCheck.set_type(HealthCheck::HTTP);
healthCheck.mutable_http()->set_port(80);
healthCheck.mutable_http()->set_path("/help");
healthCheck.set_delay_seconds(0);
healthCheck.set_interval_seconds(1000);
healthCheck.set_grace_period_seconds(0);
task.mutable_health_check()->CopyFrom(healthCheck);
EXPECT_CALL(*scheduler, update(_, _))
.WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));
{
v1::scheduler::Call call;
call.set_type(v1::scheduler::Call::ACCEPT);
call.mutable_framework_id()->CopyFrom(frameworkId);
v1::scheduler::Call::Accept* accept = call.mutable_accept();
accept->add_offer_ids()->CopyFrom(offer.id());
v1::Offer::Operation* operation = accept->add_operations();
operation->set_type(v1::Offer::Operation::LAUNCH);
operation->mutable_launch()->add_task_infos()->CopyFrom(evolve(task));
mesos.send(call);
}
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
ASSERT_EQ(evolve(task.task_id()),
event->get().task_added().task().task_id());
{
event = decoder.read();
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
ASSERT_EQ(v1::TASK_STARTING,
event->get().task_updated().status().state());
ASSERT_EQ(evolve(task.task_id()),
event->get().task_updated().status().task_id());
}
{
event = decoder.read();
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().state());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().status().state());
}
{
event = decoder.read();
AWAIT_READY(event);
ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().state());
ASSERT_EQ(v1::TASK_RUNNING,
event->get().task_updated().status().state());
ASSERT_EQ(v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
event->get().task_updated().status().reason());
ASSERT_EQ(false,
event->get().task_updated().status().healthy());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment