Skip to content

Instantly share code, notes, and snippets.

@casualjim
Created January 20, 2020 07:04
Show Gist options
  • Save casualjim/74ac15266f601b6d08ccbea21921af62 to your computer and use it in GitHub Desktop.
Save casualjim/74ac15266f601b6d08ccbea21921af62 to your computer and use it in GitHub Desktop.
use crate::health::pb::health_check_response::ServingStatus;
use crate::health::pb::health_server::{Health, HealthServer};
use crate::health::pb::{HealthCheckRequest, HealthCheckResponse};
use anyhow::Result;
use dashmap::DashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Error;
use std::fmt::Formatter;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tonic::{Request, Response};
mod pb;
//pub mod pb {
// tonic::include_proto!("grpc.health.v1");
//}
/// Internal representation of the health of a component or a service.
///
/// NOTE(review): `Ord`/`PartialOrd` are derived, so the declaration order
/// encodes an ordering (`Unknown < Unhealthy < Healthy`) — do not reorder
/// the variants.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum Status {
    /// No health status has been reported yet.
    Unknown,
    /// Reported as not serving.
    Unhealthy,
    /// Reported as serving.
    Healthy,
}
impl From<ServingStatus> for Status {
    /// Maps the wire-level gRPC `ServingStatus` onto the internal `Status`.
    fn from(st: ServingStatus) -> Self {
        match st {
            ServingStatus::Serving => Status::Healthy,
            ServingStatus::NotServing => Status::Unhealthy,
            ServingStatus::Unknown => Status::Unknown,
        }
    }
}
/// Conversion back to the wire-level status.
///
/// Implemented as `From` rather than a hand-written `Into`: the standard
/// library's blanket impl still provides `Status: Into<ServingStatus>`, so
/// every existing `.into()` call site keeps working, and `From` is the
/// idiomatic direction to implement.
impl From<Status> for ServingStatus {
    fn from(s: Status) -> ServingStatus {
        match s {
            Status::Unknown => ServingStatus::Unknown,
            Status::Unhealthy => ServingStatus::NotServing,
            Status::Healthy => ServingStatus::Serving,
        }
    }
}
/// Conversion to the raw protobuf enum value.
///
/// Implemented as `From` rather than a hand-written `Into` — the blanket
/// impl preserves `Status: Into<i32>` for existing callers.
impl From<Status> for i32 {
    fn from(s: Status) -> i32 {
        match s {
            Status::Unknown => ServingStatus::Unknown as i32,
            Status::Unhealthy => ServingStatus::NotServing as i32,
            Status::Healthy => ServingStatus::Serving as i32,
        }
    }
}
/// Messages consumed by the broker loop spawned in `Checks::new`.
#[derive(Debug)]
enum ComponentEvent {
    /// A component registered under the given name, participating in the listed services.
    New(String, Vec<String>),
    /// A component reported a new health status.
    StatusUpdate(String, Status),
}
/// `Checks` is a healthcheck broker from internal components to grpc services.
/// It assumes that a grpc service is composed out of one or more components, each with the potential
/// of being unhealthy. We want to make sure that if one is unhealthy they all are unhealthy.
/// To do this we make use of tokio watches. When a component registers we return a watch sender
/// and set up a subscription for a service to that component, which tracks the health of the service overall.
/// Then when a client subscribes we set up a subscription to the service for that client.
///
/// So when a component changes health status from healthy to unhealthy it will call broadcast.
/// The service will receive that update, compare it with its status and if it was healthy it will also
/// turn unhealthy. Then it will broadcast its status to the interested clients.
pub struct Checks {
    /// Per-service health receivers keyed by service name; shared with the broker task.
    services: Arc<DashMap<String, watch::Receiver<ServingStatus>>>,
    /// Sender side of the broker's event mailbox (registrations + status updates).
    service: mpsc::Sender<ComponentEvent>,
    /// Receiver for the overall (server-wide) serving status.
    listener: watch::Receiver<ServingStatus>,
}
impl Checks {
/// Creates a `Checks` broker and spawns its background event loop.
///
/// The loop owns all bookkeeping state and consumes `ComponentEvent`s:
/// registrations build the component<->service indexes and lazily create a
/// per-service watch channel; status updates are fanned out to the affected
/// services and folded into the overall server status broadcast on
/// `listener`. The loop ends when the event channel closes or the last
/// overall-status listener is dropped.
pub async fn new() -> Checks {
    // Overall server status: written by the broker task, read by clients.
    let (svcup, listener) = watch::channel::<ServingStatus>(ServingStatus::Unknown);
    // Mailbox feeding the broker loop.
    let (service, mut svcmbox) = mpsc::channel::<ComponentEvent>(50);
    // DashMap mutates through &self, so no `mut` needed (the original had an unused `mut`).
    let service_receivers: Arc<DashMap<String, watch::Receiver<ServingStatus>>> = Arc::new(DashMap::new());
    let receivers = service_receivers.clone();
    tokio::spawn(async move {
        // Last status broadcast on the overall channel.
        let mut current = Status::Unknown;
        // component name -> last reported status
        let mut components: HashMap<String, Status> = HashMap::new();
        // service name -> names of the components that make it up
        let mut component_mapping: HashMap<String, HashSet<String>> = HashMap::new();
        // component name -> names of the services it participates in
        let mut service_mapping: HashMap<String, HashSet<String>> = HashMap::new();
        // service name -> sender side of that service's watch channel
        let mut service_senders: HashMap<String, watch::Sender<ServingStatus>> = HashMap::new();
        debug!("starting healthchecker broker loop");
        while let Some(event) = svcmbox.recv().await {
            match event {
                ComponentEvent::New(name, services) => {
                    let svcs = services.clone();
                    // Forward index: component name -> services it participates in.
                    service_mapping
                        .entry(name.clone())
                        .and_modify(|m| {
                            debug!("register > updating component={} service={:?}", &name, services.clone());
                            m.extend(services.clone());
                        })
                        // FIX: `or_insert_with` (lazy) instead of `or_insert` (eager) —
                        // the eager form evaluated the block, and logged "inserting",
                        // even when the entry already existed.
                        .or_insert_with(|| {
                            debug!(
                                "register > inserting component={} services={:?}",
                                &name,
                                services.clone()
                            );
                            services.into_iter().collect()
                        });
                    for svc in svcs {
                        // Reverse index: service name -> components that make it up.
                        component_mapping
                            .entry(svc.clone())
                            .or_default()
                            .insert(name.clone());
                        // Lazily create the per-service watch channel on first sight.
                        if !service_senders.contains_key(&svc) {
                            let (svcsender, svcrecv) = watch::channel::<ServingStatus>(ServingStatus::Unknown);
                            service_senders.insert(svc.clone(), svcsender);
                            service_receivers.insert(svc, svcrecv);
                        }
                    }
                }
                ComponentEvent::StatusUpdate(name, status) => {
                    debug!("got a message in the broker loop");
                    let prev = components.insert(name.clone(), status);
                    debug!(
                        "comparing current={:?} prev={:?} update={:?} known={:?}",
                        current, prev, status, components
                    );
                    // Fan the update out to every service this component belongs to.
                    // BUG FIX: the original looked these sets up in the wrong maps
                    // (component name in the service-keyed map and vice versa), so
                    // per-service broadcasts never fired; it also chained the
                    // broadcast through `Option::and`, whose argument is evaluated
                    // eagerly, bypassing the change guard entirely.
                    if let Some(svc_names) = service_mapping.get(&name) {
                        for svc_name in svc_names {
                            let prev_svc = service_receivers
                                .get(svc_name)
                                .map(|v| *v.value().borrow())
                                .unwrap_or(ServingStatus::Unknown);
                            let changed = component_mapping.get(svc_name).map_or(false, |members| match prev_svc {
                                // Serving -> changes on the first report or when any
                                // member component reports not-healthy.
                                ServingStatus::Serving => {
                                    prev.is_none()
                                        || members
                                            .iter()
                                            .any(|s| matches!(components.get(s), Some(st) if *st != Status::Healthy))
                                }
                                // NotServing/Unknown -> changes only once every member is healthy.
                                _ => members.iter().all(|s| components.get(s) == Some(&Status::Healthy)),
                            });
                            if changed {
                                if let Some(sender) = service_senders.get(svc_name) {
                                    // A failed broadcast only means nobody watches this service.
                                    let _ = sender.broadcast(status.into());
                                }
                            }
                        }
                    }
                    // Same change detection for the overall server status.
                    let has_changed = match current {
                        Status::Healthy => prev.is_none() || components.values().any(|st| *st != Status::Healthy),
                        _ => components.values().all(|st| *st == Status::Healthy),
                    };
                    debug!(
                        "this changes the services: is_change={} status={:?}",
                        has_changed, status
                    );
                    if has_changed {
                        current = status;
                        debug!("detected a change, forwarding");
                        if svcup.broadcast(status.into()).is_err() {
                            // No listeners remain: shut the broker down.
                            break;
                        }
                    }
                }
            }
        }
        debug!("leaving healthchecker broker loop");
    });
    Checks {
        services: receivers,
        service,
        listener,
    }
}
/// Registers a component under one or more service names and returns the
/// watch sender the component uses to report its health.
///
/// Sends a `ComponentEvent::New` to the broker loop, then spawns a task
/// that forwards every status the component broadcasts as a
/// `ComponentEvent::StatusUpdate` until either end of the pipeline closes.
///
/// # Errors
/// Fails if the broker loop has already shut down (its mailbox is closed).
async fn register<V>(&self, component: impl Into<String>, services: V) -> Result<watch::Sender<Status>>
where
    V: IntoIterator,
    V::Item: Into<String>,
{
    let (cmpaddr, mut cmpmbx) = watch::channel::<Status>(Status::Unknown);
    let mut sender = self.service.clone();
    let name = component.into();
    sender
        .send(ComponentEvent::New(
            name.clone(),
            services.into_iter().map(|v| v.into()).collect(),
        ))
        .await
        // `|_|` instead of the original unused `|e|` binding (warning).
        .map_err(|_| anyhow!("unable to register component"))?;
    tokio::spawn(async move {
        // Forward component statuses into the broker; stop when the broker
        // is gone (send fails) or the component drops its sender (recv None).
        while let Some(status) = cmpmbx.recv().await {
            if sender.send(ComponentEvent::StatusUpdate(name.clone(), status)).await.is_err() {
                break;
            }
        }
    });
    Ok(cmpaddr)
}
}
#[tonic::async_trait]
impl Health for Checks {
/// Answers a one-shot health probe.
///
/// An empty `service` field asks for the overall server status (read from
/// `listener`); a named service is looked up in the per-service receivers
/// and yields `NOT_FOUND` when unknown.
async fn check(&self, request: Request<HealthCheckRequest>) -> Result<Response<HealthCheckResponse>, tonic::Status> {
    let svc = &request.get_ref().service;
    if svc.is_empty() {
        let status = *self.listener.borrow() as i32;
        return Ok(Response::new(HealthCheckResponse { status }));
    }
    match self.services.get(svc) {
        Some((_, receiver)) => {
            let status = *receiver.borrow() as i32;
            Ok(Response::new(HealthCheckResponse { status }))
        }
        None => Err(tonic::Status::not_found(format!("service {} not found", svc))),
    }
}
type WatchStream = watch::Receiver<Result<HealthCheckResponse, tonic::Status>>;

/// Streams health updates to a subscribing client.
///
/// TODO: honor `request.service` — currently every subscription streams the
/// overall server status only.
async fn watch(&self, request: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>, tonic::Status> {
    let (sub_tx, sub_rx) =
        watch::channel::<Result<HealthCheckResponse, tonic::Status>>(Ok(HealthCheckResponse { status: 0 }));
    let mut svc = self.listener.clone();
    tokio::spawn(async move {
        info!("got a subscription to the service");
        loop {
            let status = match svc.recv().await {
                Some(s) => s,
                None => break,
            };
            debug!("sending status update to subscription");
            let update = Ok(HealthCheckResponse { status: status.into() });
            if sub_tx.broadcast(update).is_err() {
                break;
            }
        }
        info!("closed subscription to the service");
    });
    Ok(Response::new(sub_rx))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    #[tokio::test(threaded_scheduler)]
    async fn registers_a_component() {
        crate::tests::init();
        // Mirror the overall-status watch into an unbounded mpsc so the test
        // can assert on each observed transition in order.
        let (address, mut mbox) = mpsc::unbounded_channel::<ServingStatus>();
        let mut checks = Checks::new().await;
        let mut svc = checks.listener.clone();
        let _jh1 = tokio::spawn(async move {
            while let Some(msg) = svc.recv().await {
                address.send(msg).unwrap();
            }
        });
        // A watch receiver yields the channel's initial value first.
        let actual = mbox.recv().await;
        assert_eq!(Some(ServingStatus::Unknown), actual);
        // let reporter = checks.register("global", "kafkalog");
        // Inject a status update directly on the broker's event bus.
        let mut bus = checks.service.clone();
        bus
            .send(ComponentEvent::StatusUpdate("global".to_string(), Status::Healthy))
            .await
            .unwrap();
        let actual = mbox.recv().await;
        assert_eq!(Some(ServingStatus::Serving), actual);
        // One unhealthy component must flip the overall status to NotServing.
        let component = checks.register("kafkalog", vec!["global"]).await.unwrap();
        assert!(component.broadcast(Status::Unhealthy).is_ok());
        assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
        // An empty service name queries the overall server status.
        let r = checks
            .check(Request::new(HealthCheckRequest {
                service: "".to_string(),
            }))
            .await;
        assert!(r.is_ok());
        assert_eq!(ServingStatus::NotServing as i32, r.unwrap().get_ref().status)
    }

    #[tokio::test]
    async fn monitors_health_of_many_services_with_many_components() {
        crate::tests::init();
        let (address, mut mbox) = mpsc::unbounded_channel::<ServingStatus>();
        let mut checks = Checks::new().await;
        let mut svc = checks.listener.clone();
        let _jh1 = tokio::spawn(async move {
            while let Some(msg) = svc.recv().await {
                address.send(msg).unwrap();
            }
        });
        let actual = mbox.recv().await;
        assert_eq!(Some(ServingStatus::Unknown), actual);
        let mut bus = checks.service.clone();
        bus
            .send(ComponentEvent::StatusUpdate("global".to_string(), Status::Healthy))
            .await
            .unwrap();
        let actual = mbox.recv().await;
        assert_eq!(Some(ServingStatus::Serving), actual);
        let component = checks.register("kafkalog", vec!["global"]).await.unwrap();
        assert!(component.broadcast(Status::Unhealthy).is_ok());
        assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
        // Registering a second component must not reset the unhealthy status.
        let c2 = checks.register("flows", vec!["global"]).await.unwrap();
        assert_eq!(ServingStatus::NotServing, *checks.listener.borrow());
        // All components healthy -> overall status back to Serving.
        assert!(component.broadcast(Status::Healthy).is_ok());
        assert!(c2.broadcast(Status::Healthy).is_ok());
        assert_eq!(Some(ServingStatus::Serving), mbox.recv().await);
        // A single component going unhealthy flips it again.
        assert!(c2.broadcast(Status::Unhealthy).is_ok());
        assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
        // Named-service lookup goes through the per-service watch channel.
        let r = checks
            .check(Request::new(HealthCheckRequest {
                service: "global".to_string(),
            }))
            .await;
        assert!(r.is_ok());
        assert_eq!(ServingStatus::NotServing as i32, r.unwrap().get_ref().status)
    }

    #[test]
    fn this_helps_clion() {
        // this can be removed when clion detects the async test as a test
        // until then this helps for using shortcuts to run the tests for this module
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment