Skip to content

Instantly share code, notes, and snippets.

@boxdot
Last active February 2, 2018 16:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boxdot/97f89585a10b3851f48c0d4b6171a5c3 to your computer and use it in GitHub Desktop.
Save boxdot/97f89585a10b3851f48c0d4b6171a5c3 to your computer and use it in GitHub Desktop.
impl Client {
    /// Subscribes a framework with the Mesos scheduler endpoint at `uri`.
    ///
    /// Sends a protobuf-encoded `SUBSCRIBE` call built from `framework_info`
    /// as an HTTP `POST`, reads the stream id header from the response and
    /// waits for the first event on the response body. The returned future
    /// resolves to a `Client` holding the framework id taken from that first
    /// event, the stream id, and the remaining event stream.
    ///
    /// # Errors
    ///
    /// The returned future fails when:
    /// - the subscribe call cannot be serialized,
    /// - the HTTP request fails,
    /// - the response carries no `MesosStreamIdHeader`,
    /// - the event stream yields an error or ends before the first event,
    /// - the first event has no subscribed payload.
    ///
    /// # Panics
    ///
    /// Only if the hard-coded protobuf media type fails to parse, which would
    /// be a bug in this function.
    pub fn connect(
        handle: &Handle,
        uri: hyper::Uri,
        framework_info: mesos::FrameworkInfo,
    ) -> Box<Future<Item = Self, Error = failure::Error>> {
        // Mesos subscribe message
        let mut subscribe = scheduler::Call_Subscribe::new();
        subscribe.set_framework_info(framework_info);
        let mut call = scheduler::Call::new();
        call.set_subscribe(subscribe);
        call.set_field_type(scheduler::Call_Type::SUBSCRIBE);

        // Serialization depends on the caller-supplied framework info, so a
        // failure here is reported through the future instead of panicking.
        let body = match call.write_to_bytes() {
            Ok(body) => body,
            Err(err) => {
                return Box::new(future::err::<Self, _>(format_err!(
                    "Failed to serialize subscribe call: {:?}",
                    err
                )));
            }
        };

        // Build request
        // TODO: move out of body
        let protobuf_media_type = "application/x-protobuf"
            .parse::<mime::Mime>()
            .expect("static protobuf media type is a valid MIME type");
        let mut request = hyper::Request::new(hyper::Method::Post, uri);
        request.headers_mut().set(hyper::header::Accept(vec![
            hyper::header::qitem(protobuf_media_type.clone()),
        ]));
        request
            .headers_mut()
            .set(hyper::header::ContentType(protobuf_media_type));
        request.set_body(body);

        // Call Mesos
        let http_client = hyper::Client::new(handle);
        let client = http_client
            .request(request)
            .map_err(failure::Error::from)
            .and_then(
                move |res: hyper::Response| -> Box<Future<Item = _, Error = _>> {
                    println!("Response status: {}", res.status());
                    // The stream id is cloned out of the headers before the
                    // response is consumed for its body.
                    let stream_id = if let Some(header) = res.headers().get::<MesosStreamIdHeader>()
                    {
                        header.clone()
                    } else {
                        return Box::new(future::err(format_err!("Missing header")));
                    };
                    // Take only the first event; the rest of the stream is
                    // handed on to the resulting client.
                    let events = Events::new(res.body())
                        .into_future()
                        .map_err(|(err, _)| failure::Error::from(err))
                        .map(|(event, stream)| (stream_id, event, stream));
                    Box::new(events)
                },
            )
            .and_then(|(stream_id, event, stream)| {
                // `None` means the stream ended without delivering any event.
                let event = match event {
                    Some(event) => event,
                    None => {
                        return Err(format_err!(
                            "Event stream ended before the first event was received"
                        ))
                    }
                };
                // Without this check a non-subscribed first event would yield
                // a client with a default (empty) framework id.
                if !event.has_subscribed() {
                    return Err(format_err!(
                        "First event on the stream carries no subscribed payload"
                    ));
                }
                let framework_id = event.get_subscribed().get_framework_id();
                Ok(Self {
                    framework_id: framework_id.get_value().into(),
                    stream_id: stream_id.as_str().into(),
                    events: stream,
                })
            });
        Box::new(client)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment