This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::{anyhow, Result}; | |
use futures::TryStreamExt; | |
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Pod}; | |
use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams}; | |
use kube::Client; | |
use kube::CustomResource; | |
use kube_runtime::utils::try_flatten_applied; | |
use kube_runtime::watcher; | |
use schemars::JsonSchema; | |
use serde::{Deserialize, Serialize}; | |
#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] | |
#[kube( | |
group = "cassandradatacenters.cassandra.datastax.com", | |
version = "v1beta1", | |
kind = "CassandraDatacenter", | |
namespaced, | |
status = "CassandraDatacenterStatus" | |
)] | |
pub struct CassandraDatacenterSpec { | |
replace_nodes: Vec<String>, | |
} | |
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)] | |
pub struct CassandraDatacenterStatus { | |
node_replacements: Vec<String>, | |
conditions: Vec<CassandraDatacenterCondition>, | |
} | |
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)] | |
pub struct CassandraDatacenterCondition { | |
pub last_transition_time: Option<String>, | |
pub message: Option<String>, | |
pub reason: Option<String>, | |
pub status: String, | |
pub type_: String, | |
} | |
impl CassandraDatacenterCondition { | |
fn is_ready(self) -> bool { | |
self.type_ == "Ready" && self.status == "True" | |
} | |
} | |
#[tokio::main] | |
async fn main() -> Result<()> { | |
let client = Client::try_default().await?; | |
let namespace = "default"; | |
let cluster_name = "cluster1"; | |
let datacenter_name = "dc1"; | |
let pod_name = format!( | |
"{cluster_name}-{datacenter_name}-{namespace}-sts-0", | |
cluster_name = cluster_name, | |
datacenter_name = datacenter_name, | |
namespace = namespace | |
); | |
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace); | |
let pod_to_replace = pods.get(&pod_name).await?; | |
let volume = pod_to_replace.spec.and_then(|spec| { | |
spec.volumes | |
.into_iter() | |
.find(|volume| volume.name == "server-data") | |
}); | |
// TODO no unwraps plz | |
let persistent_volume_claim = volume.unwrap().persistent_volume_claim.unwrap().claim_name; | |
let datacenters: Api<CassandraDatacenter> = Api::namespaced(client.clone(), namespace); | |
// Notify cassandra that we are replacing | |
let replace_nodes = CassandraDatacenterSpec { | |
replace_nodes: vec![pod_name.to_string()], | |
}; | |
datacenters | |
.patch( | |
datacenter_name, | |
&PatchParams::default(), | |
&Patch::Apply(CassandraDatacenter::new(datacenter_name, replace_nodes)), | |
) | |
.await?; | |
let persistent_volume_claims: Api<PersistentVolumeClaim> = | |
Api::namespaced(client.clone(), namespace); | |
// Delete the PersistentVolumeClaim of this cassandra node, so that it will try to allocate a | |
// fresh disk | |
persistent_volume_claims | |
.delete(&persistent_volume_claim, &DeleteParams::default()) | |
.await?; | |
// Delete the defect cassandra node, so that a new one will be scheduled in the cluster | |
pods.delete(&pod_name, &DeleteParams::default()).await?; | |
let list_params = ListParams::default() | |
.fields(&format!("metadata.name={}", datacenter_name)) | |
.timeout(20); | |
// Wait for the datacenter to be ready again. This means node replacement succeeded | |
let ready = try_flatten_applied(watcher(datacenters, list_params)) | |
.try_fold(false, |ready, event| async move { | |
Ok(event.status.map_or(ready, |status| { | |
status | |
.conditions | |
.into_iter() | |
.any(CassandraDatacenterCondition::is_ready) | |
})) | |
}) | |
.await?; | |
if !ready { | |
return Err(anyhow!("Timed out waiting for condition")); | |
} | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment