use anyhow::{anyhow, Result};
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Pod};
use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams};
use kube::Client;
use kube::CustomResource;
use kube_runtime::utils::try_flatten_applied;
use kube_runtime::watcher;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
// Spec half of the `CassandraDatacenter` custom resource (kind "CassandraDatacenter",
// version "v1beta1"), paired with `CassandraDatacenterStatus` as its status type.
// `main` wraps a value of this struct via `CassandraDatacenter::new(name, spec)`.
//
// NOTE(review): the `group = ...` through `status = ...` lines look like the arguments of a
// `#[kube(...)]` attribute whose opening and closing lines are not visible here - confirm.
// NOTE(review): `group` is an empty string as written; verify this is the intended API group
// for the custom resource.
// NOTE(review): no serde rename attribute is visible, so fields would serialize in snake_case
// (`replace_nodes`); confirm this matches the resource's schema on the cluster.
#[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)]
group = "",
version = "v1beta1",
kind = "CassandraDatacenter",
status = "CassandraDatacenterStatus"
pub struct CassandraDatacenterSpec {
// Names of the pods to replace; `main` fills this with the single pod it is replacing.
replace_nodes: Vec<String>,
// Status half of the `CassandraDatacenter` custom resource, referenced by the spec's
// `status = "CassandraDatacenterStatus"` setting. `main` reads it from watch events while
// waiting for the datacenter to become ready again.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)]
pub struct CassandraDatacenterStatus {
// Presumably the nodes whose replacement is in progress; not read by the code visible
// here - TODO confirm against the operator's status schema.
node_replacements: Vec<String>,
// Conditions reported for the datacenter; see `CassandraDatacenterCondition::is_ready`.
conditions: Vec<CassandraDatacenterCondition>,
// One entry of `CassandraDatacenterStatus::conditions`.
//
// NOTE(review): no serde rename attributes are visible, so these fields would be
// (de)serialized as `last_transition_time` and `type_`; Kubernetes condition objects
// conventionally use `lastTransitionTime` and `type` - confirm the wire names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema)]
pub struct CassandraDatacenterCondition {
// Optional timestamp of the last transition, kept as an unparsed string.
pub last_transition_time: Option<String>,
// Optional free-form message attached to the condition.
pub message: Option<String>,
// Optional reason attached to the condition.
pub reason: Option<String>,
// Condition status; `is_ready` compares it against the literal "True".
pub status: String,
// Condition type; `is_ready` compares it against the literal "Ready".
pub type_: String,
impl CassandraDatacenterCondition {
fn is_ready(self) -> bool {
self.type_ == "Ready" && self.status == "True"
// Entry point: replaces one Cassandra node in a Kubernetes cluster.
//
// Visible steps: look up the pod, find the persistent volume claim behind its "server-data"
// volume, announce the replacement through the `CassandraDatacenter` resource, delete the
// claim and the pod, then watch the datacenter until it reports ready again.
// The namespace, cluster name and datacenter name are hard-coded below.
//
// Errors from the Kubernetes API calls are propagated with `?`; the function also returns an
// error with the message "Timed out waiting for condition" when readiness was not observed.
//
// NOTE(review): several statements appear truncated in this view (the `format!` string, call
// receivers, closure bodies and closing delimiters are missing). The comments below describe
// only what is visible.
async fn main() -> Result<()> {
// Build a client from the default Kubernetes configuration.
let client = Client::try_default().await?;
let namespace = "default";
let cluster_name = "cluster1";
let datacenter_name = "dc1";
// Name of the pod to replace, formatted from the three names above.
// NOTE(review): the format string itself is not visible here.
let pod_name = format!(
cluster_name = cluster_name,
datacenter_name = datacenter_name,
namespace = namespace
// Fetch the pod; a failed lookup aborts the run.
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let pod_to_replace = pods.get(&pod_name).await?;
// Search the pod spec for the volume matched against "server-data" (presumably by volume
// name - the left-hand side of the comparison is not visible).
let volume = pod_to_replace.spec.and_then(|spec| {
.find(|volume| == "server-data")
// TODO: replace these unwraps with proper error handling. As written, the program panics
// when the volume is not found or when it has no persistent volume claim source.
let persistent_volume_claim = volume.unwrap().persistent_volume_claim.unwrap().claim_name;
let datacenters: Api<CassandraDatacenter> = Api::namespaced(client.clone(), namespace);
// Notify Cassandra that we are replacing this node: build a spec whose `replace_nodes`
// holds the pod name and send it as a `Patch::Apply` for the datacenter.
// NOTE(review): the patch call that receives this argument is not visible here.
let replace_nodes = CassandraDatacenterSpec {
replace_nodes: vec![pod_name.to_string()],
&Patch::Apply(CassandraDatacenter::new(datacenter_name, replace_nodes)),
let persistent_volume_claims: Api<PersistentVolumeClaim> =
Api::namespaced(client.clone(), namespace);
// Delete the PersistentVolumeClaim of this Cassandra node, so that it will try to allocate a
// fresh disk.
.delete(&persistent_volume_claim, &DeleteParams::default())
// Delete the defective Cassandra node, so that a new one will be scheduled in the cluster.
pods.delete(&pod_name, &DeleteParams::default()).await?;
// Restrict the watch below with a field selector built from the datacenter name.
// NOTE(review): the selector is the bare datacenter name with no field key (for example
// `metadata.name=`); confirm this is what the API server expects.
let list_params = ListParams::default()
.fields(&format!("{}", datacenter_name))
// Wait for the datacenter to be ready again. This means node replacement succeeded.
// The fold starts from `false`; events without a status keep the previous `ready` value.
let ready = try_flatten_applied(watcher(datacenters, list_params))
.try_fold(false, |ready, event| async move {
Ok(event.status.map_or(ready, |status| {
// Readiness was never observed: report the failure to the caller.
if !ready {
return Err(anyhow!("Timed out waiting for condition"));
