Last active
October 28, 2024 16:23
-
-
Save Object905/6cafd5e8e56dd60670149296411a407f to your computer and use it in GitHub Desktop.
Pingora kubernetes/DNS ServiceDiscovery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use async_trait::async_trait; | |
use hickory_resolver::TokioAsyncResolver; | |
use http::Extensions; | |
use pingora::lb::discovery::ServiceDiscovery; | |
use pingora::lb::selection::{BackendIter, BackendSelection}; | |
use pingora::lb::{Backend, Backends, LoadBalancer}; | |
use pingora::protocols::l4::socket::SocketAddr; | |
use pingora::{Error, ErrorSource, ErrorType, Result as PingoraResult}; | |
use std::fmt::Debug; | |
use std::net::{IpAddr, SocketAddrV4}; | |
use std::{ | |
collections::{BTreeSet, HashMap}, | |
sync::Arc, | |
}; | |
/// Service discovery that resolves domains to Backends with DNS lookup using `hickory_resolver` crate. | |
/// | |
/// Only IPv4 addresses are used, IPv6 ignored silently. | |
#[derive(Debug, Clone)] | |
pub struct DnsDiscovery { | |
/// Domain that will be resolved | |
pub domain: String, | |
// Port used for Backend | |
pub port: u16, | |
/// Resolver from `hickory_resolver` | |
pub resolver: Arc<TokioAsyncResolver>, | |
/// Extensions that will be set to backends | |
pub extensions: Option<Extensions>, | |
} | |
impl DnsDiscovery { | |
pub fn new<D: Into<String>>(domain: D, port: u16, resolver: Arc<TokioAsyncResolver>) -> Self { | |
DnsDiscovery { | |
domain: domain.into(), | |
port, | |
resolver, | |
extensions: None, | |
} | |
} | |
pub fn with_extensions(mut self, extensions: Extensions) -> Self { | |
self.extensions = Some(extensions); | |
self | |
} | |
} | |
#[async_trait] | |
impl ServiceDiscovery for DnsDiscovery { | |
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> { | |
let records = self.resolver.lookup_ip(&self.domain).await.map_err(|err| { | |
Error::create( | |
ErrorType::Custom("DNS lookup error"), | |
ErrorSource::Internal, | |
Some(format!("{:?}", self).into()), | |
Some(err.into()), | |
) | |
})?; | |
let result: BTreeSet<_> = records | |
.iter() | |
.filter_map(|ip| match ip { | |
IpAddr::V4(ip) => Some(SocketAddr::Inet(std::net::SocketAddr::V4( | |
SocketAddrV4::new(ip, self.port), | |
))), | |
IpAddr::V6(_) => None, | |
}) | |
.map(|addr| Backend { | |
addr: addr, | |
weight: 1, | |
ext: Extensions::new(), | |
}) | |
.collect(); | |
Ok((result, HashMap::new())) | |
} | |
} | |
impl From<DnsDiscovery> for Backends { | |
fn from(value: DnsDiscovery) -> Self { | |
Backends::new(Box::new(value)) | |
} | |
} | |
impl<S> From<DnsDiscovery> for LoadBalancer<S> | |
where | |
S: BackendSelection + 'static, | |
S::Iter: BackendIter, | |
{ | |
fn from(value: DnsDiscovery) -> Self { | |
LoadBalancer::from_backends(value.into()) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use async_trait::async_trait; | |
use futures::future::ready; | |
use futures::future::{select, Either}; | |
use futures::{Future, FutureExt, Stream, StreamExt}; | |
use http::Extensions; | |
use k8s_openapi::api::discovery::v1::EndpointSlice; | |
use kube::api::Api; | |
use kube::runtime::reflector::Store; | |
use kube::runtime::watcher::Config; | |
use kube::runtime::{reflector, watcher, WatchStreamExt}; | |
use kube::Client; | |
use pingora::lb::discovery::ServiceDiscovery; | |
use pingora::lb::selection::{BackendIter, BackendSelection}; | |
use pingora::lb::Backends; | |
use pingora::lb::{Backend, LoadBalancer}; | |
use pingora::protocols::l4::socket::SocketAddr; | |
use pingora::server::ShutdownWatch; | |
use pingora::services::background::BackgroundService; | |
use pingora::Result as PingoraResult; | |
use std::collections::{BTreeSet, HashMap}; | |
use std::fmt::Debug; | |
use std::net::SocketAddrV4; | |
use std::pin::Pin; | |
use std::sync::Arc; | |
use std::sync::Mutex; | |
/// Service discovery that gets addresses from EndpointSlices from kubernetes cluster. | |
/// | |
/// Only IPv4 addresses are used, IPv6 ignored silently. | |
#[derive(Debug, Clone)] | |
pub struct EndpointSliceDiscovery { | |
/// Store from kube reflector. Required fields are conditions and addresses. | |
pub store: Store<EndpointSlice>, | |
// Port used for Backend. Not checked against EndpointSlice actual ports! | |
pub port: u16, | |
/// Extensions that will be set to backends | |
pub extensions: Option<Extensions>, | |
} | |
impl EndpointSliceDiscovery { | |
/// Make discovery for kubernetes Service. | |
/// Also returns BackgroundService that should be registered for timely updates of EndpointSlices. | |
pub fn for_service( | |
client: Client, | |
namespace: impl AsRef<str>, | |
service: impl AsRef<str>, | |
port: u16, | |
) -> (Self, EndpointSliceDiscoveryUpdater) { | |
let filter = Config::default() | |
.labels(format!("kubernetes.io/service-name={}", service.as_ref()).as_str()); | |
Self::from_kube_config(client, namespace, filter, port) | |
} | |
/// Make discovery from `kube::runtime::watcher::Config` for EndpointSlice selection. | |
pub fn from_kube_config( | |
client: Client, | |
namespace: impl AsRef<str>, | |
config: Config, | |
port: u16, | |
) -> (Self, EndpointSliceDiscoveryUpdater) { | |
let endpoint_slices = Api::<EndpointSlice>::namespaced(client, namespace.as_ref()); | |
let (store, writer) = reflector::store(); | |
let watcher = watcher(endpoint_slices, config); | |
let watcher_stream = watcher.default_backoff().reflect(writer); | |
let kube_discovery = EndpointSliceDiscovery { | |
store: store, | |
port: port, | |
extensions: None, | |
}; | |
let kube_upd_service = EndpointSliceDiscoveryUpdater { | |
stream: Mutex::new(Some(Box::pin( | |
watcher_stream | |
.filter_map(|e| ready(Some(e.unwrap()))) | |
.boxed(), | |
))), | |
lb: None, | |
}; | |
(kube_discovery, kube_upd_service) | |
} | |
fn endpoint_slices_to_backends<'a>( | |
&'a self, | |
endpoint_slice: &'a EndpointSlice, | |
) -> impl Iterator<Item = Backend> + 'a { | |
endpoint_slice | |
.endpoints | |
.iter() | |
.filter(|&e| { | |
if let Some(conditions) = e.conditions.as_ref() { | |
if let Some(ready) = conditions.ready { | |
return ready; | |
} | |
} | |
false | |
}) | |
.map(|endpoint| { | |
endpoint | |
.addresses | |
.iter() | |
.map(|ip| ip.parse().expect("Failed to parse endpoint IPv4 address")) | |
}) | |
.flatten() | |
.map(|ip| SocketAddr::Inet(std::net::SocketAddr::V4(SocketAddrV4::new(ip, self.port)))) | |
.map(|addr| Backend { | |
addr: addr, | |
weight: 1, | |
ext: self.extensions.clone().unwrap_or_else(Extensions::new), | |
}) | |
} | |
} | |
impl From<EndpointSliceDiscovery> for Backends { | |
fn from(value: EndpointSliceDiscovery) -> Self { | |
Backends::new(Box::new(value)) | |
} | |
} | |
impl<S> From<EndpointSliceDiscovery> for LoadBalancer<S> | |
where | |
S: BackendSelection + 'static, | |
S::Iter: BackendIter, | |
{ | |
fn from(value: EndpointSliceDiscovery) -> Self { | |
LoadBalancer::from_backends(value.into()) | |
} | |
} | |
#[async_trait] | |
impl ServiceDiscovery for EndpointSliceDiscovery { | |
async fn discover(&self) -> PingoraResult<(BTreeSet<Backend>, HashMap<u64, bool>)> { | |
self.store.wait_until_ready().await.unwrap(); | |
let state = self.store.state(); | |
let result: BTreeSet<Backend> = state | |
.into_iter() | |
.filter(|e| e.address_type == "IPv4") | |
.map(|e| { | |
let backends: Vec<Backend> = self.endpoint_slices_to_backends(&*e).collect(); | |
backends | |
}) | |
.flatten() | |
.collect(); | |
Ok((result, HashMap::new())) | |
} | |
} | |
/// Helper trait implemented for LoadBalancer to erase generic. | |
pub trait LoadBalancerUpdater: Send + Sync + 'static { | |
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>>; | |
} | |
impl<S> LoadBalancerUpdater for LoadBalancer<S> | |
where | |
S: BackendSelection + Send + Sync + 'static, | |
S::Iter: BackendIter, | |
{ | |
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> { | |
self.update().boxed() | |
} | |
} | |
impl<S> LoadBalancerUpdater for Arc<LoadBalancer<S>> | |
where | |
S: BackendSelection + Send + Sync + 'static, | |
S::Iter: BackendIter, | |
{ | |
fn update<'a>(&'a self) -> Pin<Box<dyn Future<Output = PingoraResult<()>> + Send + 'a>> { | |
(**self).update().boxed() | |
} | |
} | |
/// Background updater of load balancer when EndpointSlice stream is changed | |
/// | |
/// User is required to call with_lb/set_lb before registering it as background services, otherwise will panic | |
pub struct EndpointSliceDiscoveryUpdater { | |
/// Event stream from kubernetes reflector | |
stream: Mutex<Option<Pin<Box<dyn Stream<Item = watcher::Event<EndpointSlice>> + Send>>>>, | |
/// Load balancer that will be updated on EndpointSlice changes. BackgroundService Will panic if left as None. | |
lb: Option<Box<dyn LoadBalancerUpdater>>, | |
} | |
impl EndpointSliceDiscoveryUpdater { | |
pub fn with_lb<T: LoadBalancerUpdater>(mut self, lb: T) -> Self { | |
self.set_lb(lb); | |
self | |
} | |
pub fn set_lb<T: LoadBalancerUpdater>(&mut self, lb: T) { | |
self.lb = Some(Box::new(lb)); | |
} | |
} | |
#[async_trait] | |
impl BackgroundService for EndpointSliceDiscoveryUpdater { | |
async fn start(&self, mut _shutdown: ShutdownWatch) { | |
if self.lb.is_none() { | |
panic!("Need to set `lb` in EndpointSliceDiscoveryUpdater") | |
} | |
let owned_stream = self | |
.stream | |
.lock() | |
.unwrap() | |
.take() | |
.expect("No stream in EndpointSliceDiscoveryUpdater"); | |
let stream = owned_stream | |
.for_each(|event| async move { | |
match event { | |
watcher::Event::InitDone | |
| watcher::Event::Apply(_) | |
| watcher::Event::Delete(_) => { | |
self.lb | |
.as_ref() | |
.unwrap() | |
.update() | |
.await | |
.expect("Failed to update backends"); | |
} | |
_ => {} | |
} | |
() | |
}) | |
.boxed(); | |
let completed = select(stream, _shutdown.changed().boxed()).await; | |
match completed { | |
Either::Left(_) => { | |
panic!("Kubernetes update stream ended abruptly"); | |
} | |
Either::Right(_) => { | |
// terminating | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// `kube_updater` must be mutable because `set_lb` takes `&mut self`.
let (kube_discovery, mut kube_updater) =
    EndpointSliceDiscovery::for_service(kube_client, "default", kube_svc, kube_port);
// make your Backends and load balancer here
// ...
// let lb: Arc<LoadBalancer<Weighted<RoundRobin>>> = ...;
// provide the background updater with the load balancer to update when the kube service changes
kube_updater.set_lb(lb.clone());
// make background service for pingora
let updater_background_service = background_service(
    &format!("Kube service updater {} {}", kube_svc, kube_port),
    kube_updater,
);
// register it to your server
server.add_service(updater_background_service);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment