Skip to content

Instantly share code, notes, and snippets.

@arianvp
Last active August 16, 2021 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arianvp/6e03d1424d50feea124a9d10ad148365 to your computer and use it in GitHub Desktop.
Save arianvp/6e03d1424d50feea124a9d10ad148365 to your computer and use it in GitHub Desktop.
use anyhow::{anyhow, Result};
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Pod};
use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams};
use kube::Client;
use kube::CustomResource;
use kube_runtime::utils::try_flatten_applied;
use kube_runtime::watcher;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
#[kube(
group = "cassandradatacenters.cassandra.datastax.com",
version = "v1beta1",
kind = "CassandraDatacenter",
namespaced,
status = "CassandraDatacenterStatus"
)]
pub struct CassandraDatacenterSpec {
replace_nodes: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)]
pub struct CassandraDatacenterStatus {
node_replacements: Vec<String>,
conditions: Vec<CassandraDatacenterCondition>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)]
pub struct CassandraDatacenterCondition {
pub last_transition_time: Option<String>,
pub message: Option<String>,
pub reason: Option<String>,
pub status: String,
pub type_: String,
}
impl CassandraDatacenterCondition {
fn is_ready(self) -> bool {
self.type_ == "Ready" && self.status == "True"
}
}
#[tokio::main]
async fn main() -> Result<()> {
let client = Client::try_default().await?;
let namespace = "default";
let cluster_name = "cluster1";
let datacenter_name = "dc1";
let pod_name = format!(
"{cluster_name}-{datacenter_name}-{namespace}-sts-0",
cluster_name = cluster_name,
datacenter_name = datacenter_name,
namespace = namespace
);
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let pod_to_replace = pods.get(&pod_name).await?;
let volume = pod_to_replace.spec.and_then(|spec| {
spec.volumes
.into_iter()
.find(|volume| volume.name == "server-data")
});
// TODO no unwraps plz
let persistent_volume_claim = volume.unwrap().persistent_volume_claim.unwrap().claim_name;
let datacenters: Api<CassandraDatacenter> = Api::namespaced(client.clone(), namespace);
// Notify cassandra that we are replacing
let replace_nodes = CassandraDatacenterSpec {
replace_nodes: vec![pod_name.to_string()],
};
datacenters
.patch(
datacenter_name,
&PatchParams::default(),
&Patch::Apply(CassandraDatacenter::new(datacenter_name, replace_nodes)),
)
.await?;
let persistent_volume_claims: Api<PersistentVolumeClaim> =
Api::namespaced(client.clone(), namespace);
// Delete the PersistentVolumeClaim of this cassandra node, so that it will try to allocate a
// fresh disk
persistent_volume_claims
.delete(&persistent_volume_claim, &DeleteParams::default())
.await?;
// Delete the defect cassandra node, so that a new one will be scheduled in the cluster
pods.delete(&pod_name, &DeleteParams::default()).await?;
let list_params = ListParams::default()
.fields(&format!("metadata.name={}", datacenter_name))
.timeout(20);
// Wait for the datacenter to be ready again. This means node replacement succeeded
let ready = try_flatten_applied(watcher(datacenters, list_params))
.try_fold(false, |ready, event| async move {
Ok(event.status.map_or(ready, |status| {
status
.conditions
.into_iter()
.any(CassandraDatacenterCondition::is_ready)
}))
})
.await?;
if !ready {
return Err(anyhow!("Timed out waiting for condition"));
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment