Skip to content

Instantly share code, notes, and snippets.

@Object905
Created March 14, 2024 10:51
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Object905/6cafd5e8e56dd60670149296411a407f to your computer and use it in GitHub Desktop.
Save Object905/6cafd5e8e56dd60670149296411a407f to your computer and use it in GitHub Desktop.
Pingora kubernetes discovery example
use std::collections::{BTreeSet, HashMap};
use std::net::{IpAddr, Ipv4Addr, SocketAddrV4};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use async_trait::async_trait;
use atomic_option::AtomicOption;
use futures::future::{select, Either};
use futures::{FutureExt, Stream, StreamExt};
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::runtime::reflector::Store;
use pingora::prelude::{Error, Result as PingoraResult};
use pingora_core::protocols::l4::socket::SocketAddr;
use pingora_core::server::ShutdownWatch;
use pingora_core::services::background::BackgroundService;
use pingora_load_balancing::discovery::ServiceDiscovery;
use pingora_load_balancing::selection::RoundRobin;
use pingora_load_balancing::{Backend, LoadBalancer};
pub struct EndpointSliceDiscovery {
pub ctx: Arc<ProxyContext>,
pub store: Arc<Store<EndpointSlice>>,
pub port: u16,
}
impl EndpointSliceDiscovery {
fn kube_to_backends(&self, kube: Arc<EndpointSlice>) -> PingoraResult<Vec<Backend>> {
if kube.address_type != "IPv4" {
return Err(Error::create(
pingora_core::ErrorType::Custom("Can handle only IPv4 from endpoin slices"),
pingora_core::ErrorSource::Internal,
None,
None,
));
}
let ips: Vec<Ipv4Addr> = kube
.endpoints
.iter()
.filter(|&e| {
if let Some(conditions) = e.conditions.as_ref() {
if let Some(ready) = conditions.ready {
if ready {
return true;
}
}
}
false
})
.map(|endpoint| {
endpoint.addresses.iter().map(|ip| {
Ipv4Addr::from_str(ip).expect("Failed to parse endpoint IPv4 address")
})
})
.flatten()
.collect();
let backends: Vec<Backend> = ips
.into_iter()
.map(|ip| SocketAddr::Inet(std::net::SocketAddr::V4(SocketAddrV4::new(ip, self.port))))
.map(|addr| Backend {
addr: addr,
weight: 1,
})
.collect();
return Ok(backends);
}
}
#[async_trait]
impl ServiceDiscovery for EndpointSliceDiscovery {
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> {
let state = self.store.state();
let backends: PingoraResult<Vec<Vec<Backend>>> = state
.into_iter()
.map(|kube| self.kube_to_backends(kube))
.collect();
let backends = backends?;
let result: BTreeSet<Backend> = backends.into_iter().flatten().collect();
Ok((result, HashMap::new()))
}
}
pub struct EndpointSliceDiscoveryService {
pub stream: AtomicOption<Pin<Box<dyn Stream<Item = EndpointSlice> + Send>>>,
pub lb: Arc<LoadBalancer<RoundRobin>>,
}
impl EndpointSliceDiscoveryService {
async fn update_backends(&self, _event: EndpointSlice) {
self.lb.update().await.expect("Failed to update backends");
}
}
#[async_trait]
impl BackgroundService for EndpointSliceDiscoveryService {
async fn start(&self, mut _shutdown: ShutdownWatch) {
let owned_stream = *self.stream.take(Ordering::SeqCst).unwrap();
let stream = owned_stream
.for_each(|_event| async {
self.update_backends(_event).await;
()
})
.boxed();
let completed = select(stream, _shutdown.changed().boxed()).await;
match completed {
Either::Left(_) => {
panic!("Kubernetes update stream ended abruptly");
},
Either::Right(_) => {
// terminating
},
}
}
}
let kube_client = self.ctx.kube();
if kube_client.is_none() || self.lb.is_some() {
return;
}
let endpoint_slices =
Api::<EndpointSlice>::namespaced(kube_client.unwrap().clone(), "default");
let filter = Config::default()
.labels(format!("kubernetes.io/service-name={}", self.service_name.as_str()).as_str());
let (store, writer) = reflector::store();
let rf = reflector(writer, watcher(endpoint_slices, filter));
let store = Arc::new(store);
let kube_discovery = EndpointSliceDiscovery {
ctx: self.ctx.clone(),
store: store.clone(),
port: self.port,
};
let kube_backends = Backends::new(Box::new(kube_discovery));
let mut kube_upstream: LoadBalancer<RoundRobin> =
LoadBalancer::from_backends(kube_backends);
let hc = health_check::TcpHealthCheck::new();
kube_upstream.set_health_check(hc);
kube_upstream.health_check_frequency = Some(Duration::from_secs(5));
let background = background_service(&format!("TcpHealthCheck {}", self), kube_upstream);
let upstream = background.task();
server.add_service(background);
let kube_upd_service = EndpointSliceDiscoveryService {
stream: AtomicOption::new(Box::new(
rf.applied_objects().filter_map(|e| ready(e.ok())).boxed(),
)),
lb: upstream.clone(),
};
server.add_service(background_service(
&format!("Reflector updater {}", self),
kube_upd_service,
));
self.lb = Some(upstream);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment