Skip to content

Instantly share code, notes, and snippets.

Created January 20, 2020 07:04
Show Gist options
  • Save casualjim/74ac15266f601b6d08ccbea21921af62 to your computer and use it in GitHub Desktop.
Save casualjim/74ac15266f601b6d08ccbea21921af62 to your computer and use it in GitHub Desktop.
use crate::health::pb::health_check_response::ServingStatus;
use crate::health::pb::health_server::{Health, HealthServer};
use crate::health::pb::{HealthCheckRequest, HealthCheckResponse};
use anyhow::Result;
use dashmap::DashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Error;
use std::fmt::Formatter;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
use tonic::{Request, Response};
mod pb;
//pub mod pb {
// tonic::include_proto!("");
/// Health state of a single component or service as tracked by the broker.
///
/// Mirrors the gRPC `ServingStatus`: `Unknown` <-> `Unknown`,
/// `Unhealthy` <-> `NotServing`, `Healthy` <-> `Serving`.
///
/// NOTE(review): `Ord`/`PartialOrd` are derived, so the declaration order of the
/// variants defines how statuses compare; keep that in mind when reordering them.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum Status {
impl From<ServingStatus> for Status {
fn from(st: ServingStatus) -> Self {
match st {
ServingStatus::Unknown => Status::Unknown,
ServingStatus::NotServing => Status::Unhealthy,
ServingStatus::Serving => Status::Healthy,
impl Into<ServingStatus> for Status {
fn into(self) -> ServingStatus {
match self {
Status::Unknown => ServingStatus::Unknown,
Status::Unhealthy => ServingStatus::NotServing,
Status::Healthy => ServingStatus::Serving,
impl Into<i32> for Status {
fn into(self) -> i32 {
match self {
Status::Unknown => ServingStatus::Unknown as i32,
Status::Unhealthy => ServingStatus::NotServing as i32,
Status::Healthy => ServingStatus::Serving as i32,
/// Messages delivered to the broker task through its mpsc mailbox.
enum ComponentEvent {
/// Announces a component: the component name followed by the names of the
/// services it is part of.
New(String, Vec<String>),
/// Reports a new health status for the named component.
StatusUpdate(String, Status),
/// `Checks` is a healthcheck broker from internal components to grpc services.
/// It assumes that a grpc service is composed out of one or more components each with a potential
/// of being unhealthy. We want to make sure that if one is unhealthy they all are unhealthy.
/// To do this we make use of tokio watches. When a component registers we return a watch sender
/// and set up a subscription for a service to that component, which tracks the health of the service overall.
/// Then when a client subscribes we set up a subscription to the service for that client.
/// So when a component changes health status from healthy to unhealthy it will call broadcast.
/// The service will receive that update, compare it with its status and if it was healthy it will also
/// turn unhealthy. Then it will broadcast its status to the interested clients.
pub struct Checks {
/// Watch receivers holding the latest status of each service, keyed by service name.
/// Entries are inserted by the broker task when a component registers for a service.
services: Arc<DashMap<String, watch::Receiver<ServingStatus>>>,
/// Sending half of the broker task's mailbox; registrations and component
/// status updates are sent through it.
service: mpsc::Sender<ComponentEvent>,
/// Watch receiver for the aggregate status computed over all components;
/// used to answer checks that do not name a service.
listener: watch::Receiver<ServingStatus>,
impl Checks {
/// Creates the broker and spawns its background task with `tokio::spawn`.
///
/// The spawned task owns all bookkeeping (latest status per component, the
/// component/service mappings and the per-service watch senders) and handles
/// `ComponentEvent`s read from the mpsc mailbox for as long as `recv()` yields
/// `Some(..)`.
///
/// The returned `Checks` shares the per-service receiver map with that task.
pub async fn new() -> Checks {
// Aggregate status over all components; starts out as `Unknown`.
let (svcup, listener) = watch::channel::<ServingStatus>(ServingStatus::Unknown);
// Bounded mailbox (capacity 50) through which events reach the broker task.
let (service, mut svcmbox) = mpsc::channel::<ComponentEvent>(50);
// Per-service receivers; one handle moves into the task, the clone is kept for `Checks`.
let mut service_receivers: Arc<DashMap<String, watch::Receiver<ServingStatus>>> = Arc::new(DashMap::new());
let receivers = service_receivers.clone();
tokio::spawn(async move {
// Aggregate status as of the last detected change.
let mut current = Status::Unknown;
// Latest reported status per component name.
let mut components: HashMap<String, Status> = HashMap::new();
// Component name -> names of the services that component belongs to.
let mut component_mapping: HashMap<String, HashSet<String>> = HashMap::new();
// NOTE(review): presumably service name -> names of its components — confirm.
let mut service_mapping: HashMap<String, HashSet<String>> = HashMap::new();
// Sending half of each service's watch channel, keyed by service name.
let mut service_senders: HashMap<String, watch::Sender<ServingStatus>> = HashMap::new();
debug!("starting healthchecker broker loop");
while let Some(event) = svcmbox.recv().await {
match event {
// Registration: record the component's services and make sure every
// service has a watch channel.
ComponentEvent::New(name, services) => {
let svcs = services.clone();
.and_modify(|m| {
debug!("register > updating component={} service={:?}", &name, services.clone());
"register > inserting component={} services={:?}",
for svc in svcs {
.and_modify(|store| {
let mut hs = HashSet::new();
// First time this service is seen: create its watch channel (initially
// `Unknown`), keep the sender locally and publish the receiver in the shared map.
if !service_senders.contains_key(&svc) {
let (svcsender, svcrecv) = watch::channel::<ServingStatus>(ServingStatus::Unknown);
service_senders.insert(svc.clone(), svcsender);
service_receivers.insert(svc, svcrecv);
// Status update: store the new component status, then re-evaluate the
// services of that component and the aggregate status.
ComponentEvent::StatusUpdate(name, status) => {
debug!("got a message in the broker loop");
// `prev` is `None` when this is the first status reported for the component.
let prev = components.insert(name.clone(), status);
"comparing current={:?} prev={:?} update={:?} known={:?}",
current, prev, status, components
if let Some(svc_names) = component_mapping.get(&name) {
for svc_name in svc_names {
// Status the service currently publishes, read through its watch receiver.
let prev_svc = service_receivers
.map(|v| *v.value().borrow())
// A service that is currently `Serving` is affected as soon as any of its
// components is not `Healthy`; in any other state it is affected only once
// all of its components are `Healthy`.
// NOTE(review): `c` is presumably an iterator over the service's component names — confirm.
.filter(|c| match prev_svc {
ServingStatus::Serving => {
|| c
.any(|s| components.get(s).filter(|&st| *st != Status::Healthy).is_some())
_ => c
.all(|s| components.get(s).filter(|&st| *st == Status::Healthy).is_some()),
// Publish the incoming component status on the service's channel;
// a failed broadcast is ignored via `.ok()`.
.map(|sender| sender.broadcast(status.into()).ok()),
// Same rule for the aggregate over *all* components: when currently healthy,
// a first-time report or any non-healthy component counts as a change;
// otherwise a change requires every known component to be healthy.
let has_changed = match current {
Status::Healthy => prev.is_none() || components.iter().any(|(_, st)| *st != Status::Healthy),
_ => components.iter().all(|(_, st)| *st == Status::Healthy),
"this changes the services: is_change={} status={:?}",
has_changed, status
if has_changed {
current = status;
debug!("detected a change, forwarding");
// NOTE(review): a failed broadcast presumably ends the broker loop — confirm.
if svcup.broadcast(status.into()).is_err() {
debug!("leaving healthchecker broker loop");
Checks {
services: receivers,
/// Registers `component` as a member of the given `services`.
///
/// A fresh `watch` channel (initial value `Status::Unknown`) is created for the
/// component, and a task is spawned that forwards every status received on that
/// channel to the broker as `ComponentEvent::StatusUpdate`.
///
/// NOTE(review): the returned `watch::Sender<Status>` is presumably `cmpaddr`, the
/// sending half of that channel, which the component uses to publish its health — confirm.
///
/// # Errors
///
/// Fails with "unable to register component" when the registration step that
/// precedes `map_err` returns an error.
async fn register<V>(&self, component: impl Into<String>, services: V) -> Result<watch::Sender<Status>>
V: IntoIterator,
V::Item: Into<String>,
let (cmpaddr, mut cmpmbx) = watch::channel::<Status>(Status::Unknown);
// Own handle to the broker mailbox so it can move into the forwarding task.
let mut sender = self.service.clone();
let name = component.into();
// Service names are converted to owned `String`s and collected.
services.into_iter().map(|v| v.into()).collect(),
// NOTE(review): the underlying error `e` is discarded here.
.map_err(|e| anyhow!("unable to register component"))?;
tokio::spawn(async move {
// Relay status changes until the watch channel ends or the broker mailbox rejects a send.
while let Some(status) = cmpmbx.recv().await {
match sender.send(ComponentEvent::StatusUpdate(name.clone(), status)).await {
Err(_) => break,
_ => {}
impl Health for Checks {
/// Unary health check.
///
/// An empty service name is answered with the aggregate status currently held by
/// `listener`. Otherwise the status is read from the receiver found for the
/// requested service.
///
/// # Errors
///
/// Returns `tonic::Status::not_found` when no receiver is found for the service.
async fn check(&self, request: Request<HealthCheckRequest>) -> Result<Response<HealthCheckResponse>, tonic::Status> {
let svc = &request.get_ref().service;
// No service named: report the overall status.
if svc.is_empty() {
return Ok(Response::new(HealthCheckResponse {
status: *self.listener.borrow() as i32,
// Named service: report the latest value seen by that service's receiver.
if let Some((_, receiver)) = {
return Ok(Response::new(HealthCheckResponse {
status: *receiver.borrow() as i32,
Err(tonic::Status::not_found(format!("service {} not found", svc)))
/// Stream handed to `watch` subscribers: the receiving half of a tokio `watch` channel.
type WatchStream = watch::Receiver<Result<HealthCheckResponse, tonic::Status>>;
/// Streaming health check.
///
/// The requested service name is currently ignored (see the TODO below): every
/// subscriber follows the aggregate status from `listener`. A task is spawned that
/// forwards each status received from `listener` into a per-subscription `watch`
/// channel.
///
/// NOTE(review): the response presumably wraps `sub_rx` — confirm.
async fn watch(&self, request: Request<HealthCheckRequest>) -> Result<Response<Self::WatchStream>, tonic::Status> {
// let service: &str = &request.get_ref().service;
// TODO: do lookup of service names
// Per-subscription channel, seeded with `status: 0` (presumably `Unknown` — confirm against the proto).
let (sub_tx, sub_rx) =
watch::channel::<Result<HealthCheckResponse, tonic::Status>>(Ok(HealthCheckResponse { status: 0 }));
let mut svc = self.listener.clone();
tokio::spawn(async move {
info!("got a subscription to the service");
while let Some(status) = svc.recv().await {
debug!("sending status update to subscription");
let res = Ok(HealthCheckResponse { status: status.into() });
// A failed broadcast means this subscription is closed.
if sub_tx.broadcast(res).is_err() {
info!("closed subscription to the service");
mod tests {
use super::*;
use tokio::sync::mpsc;
// Checks the aggregate (empty service name) path: the status starts as `Unknown`,
// becomes `Serving` after a healthy update for "global", is expected to be
// `NotServing` after registering the "kafkalog" component, and `check("")` must
// then report `NotServing`.
async fn registers_a_component() {
let (address, mut mbox) = mpsc::unbounded_channel::<ServingStatus>();
let mut checks = Checks::new().await;
let mut svc = checks.listener.clone();
// NOTE(review): this task presumably forwards every aggregate status into
// `address` so the assertions can read them from `mbox` — confirm.
let _jh1 = tokio::spawn(async move {
while let Some(msg) = svc.recv().await {
let actual = mbox.recv().await;
assert_eq!(Some(ServingStatus::Unknown), actual);
// let reporter = checks.register("global", "kafkalog");
// Inject a status update straight into the broker mailbox.
let mut bus = checks.service.clone();
.send(ComponentEvent::StatusUpdate("global".to_string(), Status::Healthy))
let actual = mbox.recv().await;
assert_eq!(Some(ServingStatus::Serving), actual);
let component = checks.register("kafkalog", vec!["global"]).await.unwrap();
assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
let r = checks
.check(Request::new(HealthCheckRequest {
service: "".to_string(),
assert_eq!(ServingStatus::NotServing as i32, r.unwrap().get_ref().status)
// Same scenario as the single-component test, extended with a second component
// ("flows") registered for the "global" service; finally `check("global")` must
// report `NotServing`.
async fn monitors_health_of_many_services_with_many_components() {
let (address, mut mbox) = mpsc::unbounded_channel::<ServingStatus>();
let mut checks = Checks::new().await;
let mut svc = checks.listener.clone();
// NOTE(review): this task presumably forwards every aggregate status into
// `address` so the assertions can read them from `mbox` — confirm.
let _jh1 = tokio::spawn(async move {
while let Some(msg) = svc.recv().await {
let actual = mbox.recv().await;
assert_eq!(Some(ServingStatus::Unknown), actual);
// Inject a status update straight into the broker mailbox.
let mut bus = checks.service.clone();
.send(ComponentEvent::StatusUpdate("global".to_string(), Status::Healthy))
let actual = mbox.recv().await;
assert_eq!(Some(ServingStatus::Serving), actual);
let component = checks.register("kafkalog", vec!["global"]).await.unwrap();
assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
let c2 = checks.register("flows", vec!["global"]).await.unwrap();
// The aggregate is expected to still be `NotServing` after the second registration.
assert_eq!(ServingStatus::NotServing, *checks.listener.borrow());
assert_eq!(Some(ServingStatus::Serving), mbox.recv().await);
assert_eq!(Some(ServingStatus::NotServing), mbox.recv().await);
let r = checks
.check(Request::new(HealthCheckRequest {
service: "global".to_string(),
assert_eq!(ServingStatus::NotServing as i32, r.unwrap().get_ref().status)
// Placeholder test that asserts nothing; it exists only for IDE tooling.
fn this_helps_clion() {
// this can be removed when clion detects the async test as a test
// until then this helps for using shortcuts to run the tests for this module
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment