Created
May 22, 2018 20:55
-
-
Save rukletsov/c079d95479fb134d137ea3ae8b7ae874 to your computer and use it in GitHub Desktop.
A test that ensures task health updates are delivered via the Mesos streaming API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Verifies that a subscriber to the master's v1 event stream receives task
// health updates.
//
// Flow of the test:
//   1. Start a master and an agent, subscribe a mock v1 HTTP scheduler and
//      wait for the first offer.
//   2. Open a `SUBSCRIBE` stream against the master's `api/v1` endpoint and
//      check the initial `SUBSCRIBED` snapshot and the first `HEARTBEAT`.
//   3. Launch a task that carries an HTTP health check.
//   4. Expect, in order: `TASK_ADDED`, `TASK_UPDATED` (`TASK_STARTING`),
//      `TASK_UPDATED` (`TASK_RUNNING`) and finally a `TASK_UPDATED` whose
//      status reason is `REASON_TASK_HEALTH_CHECK_STATUS_UPDATED` and whose
//      `healthy` field is explicitly set to false.
TEST_P(MasterAPITest, SubscribeHealthUpdates)
{
  ContentType contentType = GetParam();

  Try<Owned<cluster::Master>> master = this->StartMaster();
  ASSERT_SOME(master);

  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();

  Owned<MasterDetector> detector = master.get()->createDetector();
  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
  ASSERT_SOME(slave);

  Future<Nothing> connected;
  EXPECT_CALL(*scheduler, connected(_))
    .WillOnce(FutureSatisfy(&connected));

  v1::scheduler::TestMesos mesos(
      master.get()->pid,
      contentType,
      scheduler);

  AWAIT_READY(connected);

  Future<v1::scheduler::Event::Subscribed> subscribed;
  EXPECT_CALL(*scheduler, subscribed(_, _))
    .WillOnce(FutureArg<1>(&subscribed));

  EXPECT_CALL(*scheduler, heartbeat(_))
    .WillRepeatedly(Return()); // Ignore heartbeats.

  Future<v1::scheduler::Event::Offers> offers;
  EXPECT_CALL(*scheduler, offers(_, _))
    .WillOnce(FutureArg<1>(&offers));

  // Subscribe the scheduler with the default framework info.
  {
    v1::scheduler::Call call;
    call.set_type(v1::scheduler::Call::SUBSCRIBE);

    v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
    subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO);

    mesos.send(call);
  }

  AWAIT_READY(subscribed);

  v1::FrameworkID frameworkId(subscribed->framework_id());

  AWAIT_READY(offers);
  ASSERT_FALSE(offers->offers().empty());

  // Create event stream after seeing first offer but before first task is
  // launched. We should see one framework, one agent and zero task/executor.
  v1::master::Call v1Call;
  v1Call.set_type(v1::master::Call::SUBSCRIBE);

  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
  headers["Accept"] = stringify(contentType);

  Future<http::Response> response = http::streaming::post(
      master.get()->pid,
      "api/v1",
      headers,
      serialize(contentType, v1Call),
      stringify(contentType));

  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
  ASSERT_EQ(http::Response::PIPE, response->type);
  ASSERT_SOME(response->reader);

  http::Pipe::Reader reader = response->reader.get();

  auto deserializer =
    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);

  Reader<v1::master::Event> decoder(
      Decoder<v1::master::Event>(deserializer), reader);

  // A single future is reused for every event read off the stream. Each read
  // is followed by `ASSERT_SOME` so that a closed stream fails the test
  // instead of dereferencing an empty `Result`.
  Future<Result<v1::master::Event>> event = decoder.read();

  // The first event is the `SUBSCRIBED` snapshot of the cluster state.
  {
    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());

    const v1::master::Response::GetState& getState =
      event->get().subscribed().get_state();

    EXPECT_EQ(1, getState.get_frameworks().frameworks_size());
    EXPECT_EQ(1, getState.get_agents().agents_size());
    EXPECT_TRUE(getState.get_tasks().tasks().empty());
    EXPECT_TRUE(getState.get_executors().executors().empty());
  }

  // The snapshot is followed by a heartbeat.
  {
    event = decoder.read();

    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
  }

  // Nothing has been launched yet, so the next read must stay pending.
  event = decoder.read();
  EXPECT_TRUE(event.isPending());

  const v1::Offer& offer = offers->offers(0);
  const v1::AgentID agentId(offer.agent_id());

  TaskInfo task = createTask(devolve(offer), SLEEP_COMMAND(10000));

  // Attach an HTTP health check to the task. The test asserts below that the
  // reported health is `false`.
  // NOTE(review): presumably nothing serves HTTP on port 80 for this task, so
  // the check fails; the large interval presumably limits the test to a
  // single health result - confirm.
  HealthCheck healthCheck;
  healthCheck.set_type(HealthCheck::HTTP);
  healthCheck.mutable_http()->set_port(80);
  healthCheck.mutable_http()->set_path("/help");
  healthCheck.set_delay_seconds(0);
  healthCheck.set_interval_seconds(1000);
  healthCheck.set_grace_period_seconds(0);

  task.mutable_health_check()->CopyFrom(healthCheck);

  // Acknowledge every status update the scheduler receives.
  EXPECT_CALL(*scheduler, update(_, _))
    .WillRepeatedly(v1::scheduler::SendAcknowledge(frameworkId, agentId));

  // Launch the task using the scheduler. This should result in a `TASK_ADDED`
  // event, followed by `TASK_UPDATED` events for `TASK_STARTING`,
  // `TASK_RUNNING` and the health status change.
  {
    v1::scheduler::Call call;
    call.set_type(v1::scheduler::Call::ACCEPT);
    call.mutable_framework_id()->CopyFrom(frameworkId);

    v1::scheduler::Call::Accept* accept = call.mutable_accept();
    accept->add_offer_ids()->CopyFrom(offer.id());

    v1::Offer::Operation* operation = accept->add_operations();
    operation->set_type(v1::Offer::Operation::LAUNCH);
    operation->mutable_launch()->add_task_infos()->CopyFrom(evolve(task));

    mesos.send(call);
  }

  // The read issued before the launch now completes with `TASK_ADDED`.
  AWAIT_READY(event);
  ASSERT_SOME(event.get());

  ASSERT_EQ(v1::master::Event::TASK_ADDED, event->get().type());
  ASSERT_EQ(evolve(task.task_id()),
            event->get().task_added().task().task_id());

  // `TASK_STARTING` update.
  {
    event = decoder.read();

    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
    ASSERT_EQ(v1::TASK_STARTING,
              event->get().task_updated().status().state());
    ASSERT_EQ(evolve(task.task_id()),
              event->get().task_updated().status().task_id());
  }

  // `TASK_RUNNING` update.
  {
    event = decoder.read();

    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
    ASSERT_EQ(v1::TASK_RUNNING,
              event->get().task_updated().state());
    ASSERT_EQ(v1::TASK_RUNNING,
              event->get().task_updated().status().state());
  }

  // Health update: the task is still running, but the status carries the
  // health check reason and an explicit `healthy == false`.
  {
    event = decoder.read();

    AWAIT_READY(event);
    ASSERT_SOME(event.get());

    ASSERT_EQ(v1::master::Event::TASK_UPDATED, event->get().type());
    ASSERT_EQ(v1::TASK_RUNNING,
              event->get().task_updated().state());

    const v1::TaskStatus& status = event->get().task_updated().status();

    ASSERT_EQ(v1::TASK_RUNNING, status.state());
    ASSERT_EQ(v1::TaskStatus::REASON_TASK_HEALTH_CHECK_STATUS_UPDATED,
              status.reason());

    // An unset `healthy` field also reads as `false`, so require that the
    // field is present before checking its value.
    ASSERT_TRUE(status.has_healthy());
    ASSERT_FALSE(status.healthy());
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment