Skip to content

Instantly share code, notes, and snippets.

@xrl
Created July 30, 2021 20:58
Show Gist options
  • Save xrl/3c5727e30e78ae300539fd93defc031b to your computer and use it in GitHub Desktop.
Save xrl/3c5727e30e78ae300539fd93defc031b to your computer and use it in GitHub Desktop.
Using rust kube-rs to check, poll, and renew advisory locks in Kubernetes
use k8s_openapi::api::coordination::v1::{Lease as KubeLease, LeaseSpec as KubeLeaseSpec};
use chrono::{Local, Utc};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::MicroTime;
use std::time::Duration;
use kube::api::{PatchParams, PostParams, ObjectMeta};
use kube::Api;
use tokio::task::JoinHandle;
use tokio::sync::oneshot::Sender;
const LEASE_DURATION_SECONDS: u64 = 5;
pub struct Lease {
join_handle: JoinHandle<()>,
sender: Sender<()>
}
impl Lease {
pub fn lease_name() -> &'static str {
"cousteau"
}
pub fn lease_duration() -> u64 {
LEASE_DURATION_SECONDS
}
pub async fn available(kube_api_client: crate::Client, ns: &str, lease_name: &str) -> Result<bool, String> {
let lease_client: Api<KubeLease> = kube::Api::namespaced(kube_api_client.kube_api_client.clone(), ns);
let get_lease = lease_client.get(lease_name).await;
match get_lease {
Err(kube::Error::Api(kube::error::ErrorResponse{ code: 404, ..})) => Ok(true),
Err(err) => Err(format!("{:#?}", err)),
Ok(lease) => Ok(Self::lease_expired(&lease))
}
}
pub async fn acquire_or_create(kube_api_client: crate::Client, ns: &str, lease_name: &str, identity: &str) -> Result<Lease, ()>{
// let lease_client = self.kube_api_client
let lease_client: Api<KubeLease> = kube::Api::namespaced(kube_api_client.kube_api_client.clone(), ns);
// check for lease
let lease = loop {
let get_lease = lease_client.get(lease_name).await;
if let Err(kube::Error::Api(kube::error::ErrorResponse { code: 404, .. })) = get_lease {
trace!("lease does not existing, instantiating with defaults");
let lease = lease_client.create(&PostParams::default(), &KubeLease {
metadata: ObjectMeta { namespace: Some(ns.to_string()), name: Some(lease_name.to_string()), ..Default::default() },
spec: Some(KubeLeaseSpec {
acquire_time: Some(Self::now()),
lease_duration_seconds: Some(LEASE_DURATION_SECONDS as i32),
holder_identity: Some(identity.to_string()),
lease_transitions: Some(1),
..Default::default()
})
}).await.unwrap();
break lease;
} else if let Ok(mut lease) = get_lease {
if Self::lease_expired(&lease) {
trace!("the lease expired, taking ownership");
lease.metadata.managed_fields = None;
let spec = lease.spec.as_mut().unwrap();
if spec.lease_transitions.is_none() {
spec.lease_transitions = Some(0);
}
spec.lease_transitions.as_mut().map(|lt| *lt = *lt + 1);
spec.acquire_time = Some(Self::now());
spec.renew_time = None;
spec.lease_duration_seconds = Some(LEASE_DURATION_SECONDS as i32);
spec.holder_identity = Some(identity.to_string());
lease = lease_client.patch(lease_name,
&PatchParams::apply("cousteau").force(),
serde_json::to_vec(&lease).unwrap()).await.unwrap();
break lease;
} else {
let wait_time = match lease.spec {
Some(KubeLeaseSpec{lease_duration_seconds: Some(lds), ..}) => lds as u64,
_ => LEASE_DURATION_SECONDS
};
trace!("lease is not ready, let's wait {} seconds and try again", wait_time);
tokio::time::delay_for(Duration::from_secs(wait_time)).await;
continue;
}
} else {
panic!("what in the {:#?}", get_lease);
};
};
let (sender,mut recv) = tokio::sync::oneshot::channel();
let renew_client = lease_client.clone();
let mut renew_resource_version = lease.metadata.resource_version.clone();
let renew_object_name = lease_name.to_string();
let renew_lease_duration_seconds = lease.spec.as_ref().unwrap().lease_duration_seconds.unwrap();
let join_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(renew_lease_duration_seconds as u64));
loop {
tokio::select! {
_ = interval.tick() => {
trace!("interval tick fired, good time to renew stuff");
let patch_params = PatchParams::apply("cousteau");
let patch = serde_yaml::to_vec(&serde_json::json!({
"apiVersion": "coordination.k8s.io/v1",
"kind": "Lease",
"metadata": {
"resourceVersion": renew_resource_version,
"name": renew_object_name
},
"spec": {
"renewTime": Self::now(),
}
})).unwrap();
let patch_res = renew_client.patch(&renew_object_name, &patch_params, patch).await.unwrap();
renew_resource_version = patch_res.metadata.resource_version;
}
_ = &mut recv => {
trace!("receiver woke up");
break
}
}
}
trace!("all done looping, zeroing out the lease");
let patch_params = PatchParams::apply("cousteau");
let patch = serde_yaml::to_vec(&serde_json::json!({
"apiVersion": "coordination.k8s.io/v1",
"kind": "Lease",
"metadata": {
"resourceVersion": renew_resource_version,
"name": renew_object_name
},
"spec": {
"renewTime": Option::<()>::None,
"acquireTime": Option::<()>::None,
"holderIdentity": Option::<()>::None
}
})).unwrap();
renew_client.patch(&renew_object_name, &patch_params, patch).await.unwrap();
trace!("all done with the lease");
});
return Ok(Lease {
join_handle,
sender
})
}
fn now() -> MicroTime {
let local_now = Local::now();
MicroTime(local_now.with_timezone(&Utc))
}
fn lease_expired(lease: &KubeLease) -> bool {
let KubeLeaseSpec{acquire_time, renew_time, lease_duration_seconds, ..} = lease.spec.as_ref().unwrap();
let local_now = Local::now();
let utc_now = local_now.with_timezone(&Utc);
let lease_duration = chrono::Duration::seconds(*lease_duration_seconds.as_ref().unwrap() as i64);
if let Some(MicroTime(time)) = renew_time {
let renew_expire = time.checked_add_signed(lease_duration).unwrap();
return utc_now.gt(&renew_expire)
} else if let Some(MicroTime(time)) = acquire_time {
let acquire_expire = time.checked_add_signed(lease_duration).unwrap();
return utc_now.gt(&acquire_expire)
}
return true;
}
pub async fn join(self) -> Result<(),tokio::task::JoinError> {
self.sender.send(()).unwrap();
self.join_handle.await
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::Client;
#[tokio::test]
async fn test_lease() {
let client = Client::new().await;
let lease = Lease::acquire_or_create(client, "staging", "cousteau","test-case").await.unwrap();
tokio::time::delay_for(Duration::from_secs(5*LEASE_DURATION_SECONDS as u64)).await;
lease.join().await.unwrap();
}
#[tokio::test]
async fn test_availability() {
let client = Client::new().await;
let lease = Lease::acquire_or_create(client.clone(), "production", "cousteau","test-case").await.unwrap();
let available = Lease::available(client, "production", "cousteau").await.unwrap();
panic!("available: {:#?}", available);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment