Pingora kubernetes/DNS ServiceDiscovery
use async_trait::async_trait;
use hickory_resolver::TokioAsyncResolver;
use http::Extensions;
use pingora::lb::discovery::ServiceDiscovery;
use pingora::lb::selection::{BackendIter, BackendSelection};
use pingora::lb::{Backend, Backends, LoadBalancer};
use pingora::protocols::l4::socket::SocketAddr;
use pingora::{Error, ErrorSource, ErrorType, Result as PingoraResult};
use std::fmt::Debug;
use std::net::{IpAddr, SocketAddrV4};
use std::{
collections::{BTreeSet, HashMap},
sync::Arc,
};
/// Service discovery that resolves domains to Backends with DNS lookup using `hickory_resolver` crate.
///
/// Only IPv4 addresses are used; IPv6 addresses are silently ignored.
#[derive(Debug, Clone)]
pub struct DnsDiscovery {
/// Domain that will be resolved
pub domain: String,
    /// Port used for the resulting Backends
pub port: u16,
/// Resolver from `hickory_resolver`
pub resolver: Arc<TokioAsyncResolver>,
/// Extensions that will be set to backends
pub extensions: Option<Extensions>,
}
impl DnsDiscovery {
pub fn new<D: Into<String>>(domain: D, port: u16, resolver: Arc<TokioAsyncResolver>) -> Self {
DnsDiscovery {
domain: domain.into(),
port,
resolver,
extensions: None,
}
}
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = Some(extensions);
self
}
}
#[async_trait]
impl ServiceDiscovery for DnsDiscovery {
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> {
let records = self.resolver.lookup_ip(&self.domain).await.map_err(|err| {
Error::create(
ErrorType::Custom("DNS lookup error"),
ErrorSource::Internal,
Some(format!("{:?}", self).into()),
Some(err.into()),
)
})?;
let result: BTreeSet<_> = records
.iter()
.filter_map(|ip| match ip {
IpAddr::V4(ip) => Some(SocketAddr::Inet(std::net::SocketAddr::V4(
SocketAddrV4::new(ip, self.port),
))),
IpAddr::V6(_) => None,
})
            .map(|addr| Backend {
                addr,
                weight: 1,
                // Attach the configured extensions (if any) to each backend.
                ext: self.extensions.clone().unwrap_or_else(Extensions::new),
            })
.collect();
Ok((result, HashMap::new()))
}
}
impl From<DnsDiscovery> for Backends {
fn from(value: DnsDiscovery) -> Self {
Backends::new(Box::new(value))
}
}
impl<S> From<DnsDiscovery> for LoadBalancer<S>
where
S: BackendSelection + 'static,
S::Iter: BackendIter,
{
fn from(value: DnsDiscovery) -> Self {
LoadBalancer::from_backends(value.into())
}
}
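// Minimal usage sketch for DnsDiscovery, assuming hickory_resolver 0.24 and the
// `Weighted<RoundRobin>` selection from `pingora::lb::selection`; the domain, port,
// and resolver configuration below are placeholders.
use hickory_resolver::config::{ResolverConfig, ResolverOpts};
use pingora::lb::selection::{algorithms::RoundRobin, weighted::Weighted};

async fn dns_load_balancer() -> Arc<LoadBalancer<Weighted<RoundRobin>>> {
    // Default resolver config; substitute your own ResolverConfig/ResolverOpts as needed.
    let resolver = Arc::new(TokioAsyncResolver::tokio(
        ResolverConfig::default(),
        ResolverOpts::default(),
    ));
    let discovery = DnsDiscovery::new("my-service.example.internal", 8080, resolver);
    // Uses the `From<DnsDiscovery> for LoadBalancer<S>` impl above.
    let lb: LoadBalancer<Weighted<RoundRobin>> = discovery.into();
    // Resolve once up front; set `lb.update_frequency` and run the LoadBalancer as a
    // background service if periodic re-resolution is desired.
    lb.update().await.expect("initial DNS lookup failed");
    Arc::new(lb)
}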
use async_trait::async_trait;
use futures::future::{select, Either};
use futures::{Future, FutureExt, Stream, StreamExt};
use http::Extensions;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::api::Api;
use kube::runtime::reflector::Store;
use kube::runtime::watcher::Config;
use kube::runtime::{reflector, watcher, WatchStreamExt};
use kube::Client;
use pingora::lb::discovery::ServiceDiscovery;
use pingora::lb::selection::{BackendIter, BackendSelection};
use pingora::lb::Backends;
use pingora::lb::{Backend, LoadBalancer};
use pingora::protocols::l4::socket::SocketAddr;
use pingora::server::ShutdownWatch;
use pingora::services::background::BackgroundService;
use pingora::Result as PingoraResult;
use std::collections::{BTreeSet, HashMap};
use std::fmt::Debug;
use std::net::SocketAddrV4;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
/// Service discovery that gets addresses from EndpointSlices from kubernetes cluster.
///
/// Only IPv4 addresses are used; IPv6 addresses are silently ignored.
#[derive(Debug, Clone)]
pub struct EndpointSliceDiscovery {
    /// Store from the kube reflector. The endpoint `conditions` and `addresses` fields are required.
pub store: Store<EndpointSlice>,
    /// Port used for the Backend. Not checked against the EndpointSlice's actual ports!
pub port: u16,
/// Extensions that will be set to backends
pub extensions: Option<Extensions>,
}
impl EndpointSliceDiscovery {
    /// Make discovery for a Kubernetes Service.
    /// Also returns a BackgroundService that should be registered so EndpointSlice changes are applied promptly.
pub fn for_service(
client: Client,
namespace: impl AsRef<str>,
service: impl AsRef<str>,
port: u16,
) -> (Self, EndpointSliceDiscoveryUpdater) {
let filter = Config::default()
.labels(format!("kubernetes.io/service-name={}", service.as_ref()).as_str());
Self::from_kube_config(client, namespace, filter, port)
}
    /// Make discovery from a `kube::runtime::watcher::Config` that selects the EndpointSlices to watch.
pub fn from_kube_config(
client: Client,
namespace: impl AsRef<str>,
config: Config,
port: u16,
) -> (Self, EndpointSliceDiscoveryUpdater) {
let endpoint_slices = Api::<EndpointSlice>::namespaced(client, namespace.as_ref());
let (store, writer) = reflector::store();
let watcher = watcher(endpoint_slices, config);
let watcher_stream = watcher.default_backoff().reflect(writer);
        let kube_discovery = EndpointSliceDiscovery {
            store,
            port,
            extensions: None,
        };
let kube_upd_service = EndpointSliceDiscoveryUpdater {
            stream: Mutex::new(Some(
                watcher_stream
                    // Treat any error reported by the watcher stream as fatal.
                    .map(|e| e.expect("EndpointSlice watcher stream returned an error"))
                    .boxed(),
            )),
lb: None,
};
(kube_discovery, kube_upd_service)
}
fn endpoint_slices_to_backends<'a>(
&'a self,
endpoint_slice: &'a EndpointSlice,
) -> impl Iterator<Item = Backend> + 'a {
endpoint_slice
.endpoints
.iter()
            // Keep only endpoints whose `ready` condition is true.
            .filter(|e| {
                e.conditions
                    .as_ref()
                    .and_then(|conditions| conditions.ready)
                    .unwrap_or(false)
            })
            .flat_map(|endpoint| {
                endpoint
                    .addresses
                    .iter()
                    .map(|ip| ip.parse().expect("Failed to parse endpoint IPv4 address"))
            })
            .map(|ip| SocketAddr::Inet(std::net::SocketAddr::V4(SocketAddrV4::new(ip, self.port))))
            .map(|addr| Backend {
                addr,
                weight: 1,
                ext: self.extensions.clone().unwrap_or_else(Extensions::new),
            })
}
}
impl From<EndpointSliceDiscovery> for Backends {
fn from(value: EndpointSliceDiscovery) -> Self {
Backends::new(Box::new(value))
}
}
impl<S> From<EndpointSliceDiscovery> for LoadBalancer<S>
where
S: BackendSelection + 'static,
S::Iter: BackendIter,
{
fn from(value: EndpointSliceDiscovery) -> Self {
LoadBalancer::from_backends(value.into())
}
}
#[async_trait]
impl ServiceDiscovery for EndpointSliceDiscovery {
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        // Block until the reflector has received its initial set of EndpointSlices.
        self.store.wait_until_ready().await.unwrap();
        let state = self.store.state();
        let result: BTreeSet<Backend> = state
            .into_iter()
            // EndpointSlices are per address family; keep only the IPv4 slices.
            .filter(|e| e.address_type == "IPv4")
            .flat_map(|e| self.endpoint_slices_to_backends(&e).collect::<Vec<_>>())
            .collect();
Ok((result, HashMap::new()))
}
}
/// Helper trait implemented for LoadBalancer to erase its selection generic.
pub trait LoadBalancerUpdater: Send + Sync + 'static {
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>>;
}
impl<S> LoadBalancerUpdater for LoadBalancer<S>
where
S: BackendSelection + Send + Sync + 'static,
S::Iter: BackendIter,
{
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> {
self.update().boxed()
}
}
impl<S> LoadBalancerUpdater for Arc<LoadBalancer<S>>
where
S: BackendSelection + Send + Sync + 'static,
S::Iter: BackendIter,
{
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> {
(**self).update().boxed()
}
}
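// Illustration of the type erasure: load balancers with different selection strategies
// can be stored behind one trait-object type. The `Weighted<RoundRobin>` and
// `Weighted<Random>` selections here are arbitrary examples, not requirements.
use pingora::lb::selection::{algorithms::{Random, RoundRobin}, weighted::Weighted};

fn erased_updaters(
    round_robin: Arc<LoadBalancer<Weighted<RoundRobin>>>,
    random: Arc<LoadBalancer<Weighted<Random>>>,
) -> Vec<Box<dyn LoadBalancerUpdater>> {
    vec![Box::new(round_robin), Box::new(random)]
}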
/// Background service that updates the load balancer when the EndpointSlice stream changes.
///
/// The user must call `with_lb`/`set_lb` before registering this as a background service; otherwise it will panic.
pub struct EndpointSliceDiscoveryUpdater {
/// Event stream from kubernetes reflector
stream: Mutex<Option<Pin<Box<dyn Stream<Item = watcher::Event<EndpointSlice>> + Send>>>>,
    /// Load balancer that will be updated on EndpointSlice changes. The BackgroundService will panic if this is left as `None`.
lb: Option<Box<dyn LoadBalancerUpdater>>,
}
impl EndpointSliceDiscoveryUpdater {
pub fn with_lb<T: LoadBalancerUpdater>(mut self, lb: T) -> Self {
self.set_lb(lb);
self
}
pub fn set_lb<T: LoadBalancerUpdater>(&mut self, lb: T) {
self.lb = Some(Box::new(lb));
}
}
#[async_trait]
impl BackgroundService for EndpointSliceDiscoveryUpdater {
    async fn start(&self, mut shutdown: ShutdownWatch) {
if self.lb.is_none() {
panic!("Need to set `lb` in EndpointSliceDiscoveryUpdater")
}
let owned_stream = self
.stream
.lock()
.unwrap()
.take()
.expect("No stream in EndpointSliceDiscoveryUpdater");
        // Re-run discovery on every relevant EndpointSlice event.
        let update_loop = owned_stream
            .for_each(|event| async move {
                match event {
                    watcher::Event::InitDone
                    | watcher::Event::Apply(_)
                    | watcher::Event::Delete(_) => {
                        self.lb
                            .as_ref()
                            .unwrap()
                            .update()
                            .await
                            .expect("Failed to update backends");
                    }
                    _ => {}
                }
            })
            .boxed();
        // Run until either the event stream ends or shutdown is signalled.
        let completed = select(update_loop, shutdown.changed().boxed()).await;
match completed {
Either::Left(_) => {
panic!("Kubernetes update stream ended abruptly");
}
Either::Right(_) => {
// terminating
}
}
}
}
let (kube_discovery, mut kube_updater) = EndpointSliceDiscovery::for_service(kube_client, "default", kube_svc, kube_port);
// make your Backends and load balancer here (one possible shape is sketched after this snippet)
// ...
// let lb: Arc<LoadBalancer<Weighted<RoundRobin>>> = ...;
// provide the background updater with the load balancer to update when the kube service changes
kube_updater.set_lb(lb.clone());
// make background service for pingora
let updater_background_service = background_service(&format!("Kube service updater {} {}", kube_svc, kube_port), kube_updater);
// register it with your server
server.add_service(updater_background_service);
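// One possible shape for the elided load balancer construction above, assuming the
// `Weighted<RoundRobin>` selection (with `Weighted`, `RoundRobin`, `Backends`, and
// `LoadBalancer` imported from `pingora::lb`); swap in whatever BackendSelection you use.
let backends: Backends = kube_discovery.into();
let lb: Arc<LoadBalancer<Weighted<RoundRobin>>> = Arc::new(LoadBalancer::from_backends(backends));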