Skip to content

Instantly share code, notes, and snippets.

@reneluria
Last active January 6, 2022 15:34
Show Gist options
  • Save reneluria/4210ce37f812c73915edca7916df7a1c to your computer and use it in GitHub Desktop.
Save reneluria/4210ce37f812c73915edca7916df7a1c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
safe-drain - drains a node safely by issuing rollout restart on single pod deployments
Copyright (C) 2022 rene.luria@infomaniak.com
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import datetime
from kubernetes import client, config, watch
import typer
from typing import Dict
# Typer application object; CLI commands are registered on it via @app.command().
app = typer.Typer()
# NOTE: executed at import time -- cluster connection settings are loaded from
# the kubeconfig as a module-level side effect.
config.load_kube_config()
# Shared API clients used by every function in this module:
# core/v1 (nodes, pods, evictions) and apps/v1 (deployments).
v1 = client.CoreV1Api()
apps = client.AppsV1Api()
def match_labels_to_selector(match_labels: Dict) -> str:
    """
    Render a matchLabels mapping as a label selector string.

    Every entry becomes ``key=value`` and the entries are joined with commas
    in the mapping's iteration order, e.g. ``{"app": "web", "tier": "db"}``
    yields ``"app=web,tier=db"``. An empty mapping yields an empty string.
    """
    return ",".join(f"{key}={match_labels[key]}" for key in match_labels)
def node_callback(value: str):
    """
    Typer argument callback checking that a node with this name exists.

    Lists all nodes through the core API and returns ``value`` unchanged when
    one of them has that name; raises ``typer.BadParameter`` otherwise.
    """
    known_nodes = [node.metadata.name for node in v1.list_node().items]
    if value in known_nodes:
        return value
    raise typer.BadParameter("Node {} does not exist".format(value))
def when() -> str:
    """
    Return the current UTC time as an ISO 8601 string with second precision,
    for example '2021-11-02T13:31:00+00:00'.
    """
    now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
    return now_utc.isoformat(timespec="seconds")
def wait_for_deploy(deployment: str, namespace: str, generation) -> None:
    """
    Block until a single-replica deployment has finished rolling out.

    Watches the deployments of ``namespace`` for at most 300 seconds. For each
    MODIFIED event concerning ``deployment`` the message of its "Progressing"
    condition is echoed. The function returns as soon as the observed
    generation differs from ``generation`` while the deployment reports
    exactly one replica and one updated replica. If the watch times out
    before that happens, the function returns without raising.

    deployment: name of the deployment to follow.
    namespace: namespace the deployment lives in.
    generation: the deployment's generation as read before the rollout was
        triggered; used to tell the new rollout apart from the old state.
    """
    watcher = watch.Watch()
    for event in watcher.stream(
        apps.list_namespaced_deployment, timeout_seconds=300, namespace=namespace
    ):
        obj = event["object"]
        # The watch covers the whole namespace; ignore everything but
        # modifications of the deployment we are waiting for.
        if obj.metadata.name != deployment or event["type"] != "MODIFIED":
            continue

        status = obj.status
        # The client leaves ``conditions`` as None (not an empty list) when the
        # object carries no conditions; guard so the loop cannot raise TypeError.
        for condition in status.conditions or []:
            if condition.type == "Progressing":
                typer.echo(condition.message)

        if (
            status.observed_generation != generation
            and status.updated_replicas == 1
            and status.replicas == 1
        ):
            watcher.stop()
            break
def rollout_restart(name: str, namespace: str) -> None:
    """
    Trigger a rollout of a deployment and wait for it to complete.

    The deployment is read first so its pre-patch generation is known, then
    the pod template's ``kubectl.kubernetes.io/restartedAt`` annotation is
    patched to the current timestamp, and finally ``wait_for_deploy`` is
    called with that pre-patch generation.
    """
    before_patch = apps.read_namespaced_deployment(name, namespace)

    restart_annotation = {"kubectl.kubernetes.io/restartedAt": when()}
    template_patch = {"metadata": {"annotations": restart_annotation}}
    apps.patch_namespaced_deployment(
        name, namespace, {"spec": {"template": template_patch}}
    )

    wait_for_deploy(name, namespace, before_patch.metadata.generation)
def evict_pod(namespace: str, pod_name: str) -> None:
    """
    Evict a pod and wait until its deletion is observed.

    The resource version of the namespace's pod list is captured before the
    eviction is requested, and the watch started afterwards begins from that
    version. The watch runs for at most 300 seconds; if no DELETED event for
    ``pod_name`` arrives in that time the function returns without raising.

    namespace: namespace of the pod to evict.
    pod_name: name of the pod to evict.
    """
    # Take the resource version from the pod's own namespace -- the same list
    # the watch below follows -- rather than from a hard-coded "default".
    podlist_version = v1.list_namespaced_pod(namespace).metadata.resource_version

    eviction = client.V1beta1Eviction(
        metadata=client.V1ObjectMeta(name=pod_name, namespace=namespace)
    )
    v1.create_namespaced_pod_eviction(name=pod_name, namespace=namespace, body=eviction)

    w = watch.Watch()
    for event in w.stream(
        v1.list_namespaced_pod,
        timeout_seconds=300,
        namespace=namespace,
        resource_version=podlist_version,
    ):
        if event["object"].metadata.name == pod_name and event["type"] == "DELETED":
            w.stop()
            break
def _deployment_pods_on_node(deploy, field_selector: str):
    """
    List the pods in the deployment's namespace that match its matchLabels
    selector and the given field selector.
    """
    label_selector = match_labels_to_selector(deploy.spec.selector.match_labels)
    return v1.list_namespaced_pod(
        deploy.metadata.namespace,
        label_selector=label_selector,
        field_selector=field_selector,
    ).items


@app.command()
def drain(
    node: str = typer.Argument(..., callback=node_callback),
    dry_run: bool = typer.Option(False, help="Dry run, don't actualy rollout or else"),
):
    """
    Drain a node by cordoning and rollout restart any single pod deployment
    existing on this node
    """
    # Restricts every pod listing below to pods scheduled on the target node.
    on_this_node = "spec.nodeName={}".format(node)

    # Cordon: mark the node unschedulable (skipped entirely in dry-run mode).
    if not dry_run:
        v1.patch_node(node, {"spec": client.V1NodeSpec(unschedulable=True)})

    # Pass 1: deployments with exactly one desired and one available replica
    # that have a pod on this node get a rollout restart.
    for deploy in apps.list_deployment_for_all_namespaces().items:
        if deploy.spec.replicas != 1 or deploy.status.available_replicas != 1:
            continue
        if not _deployment_pods_on_node(deploy, on_this_node):
            continue
        if dry_run:
            typer.echo(
                "Would restart {}/{}".format(
                    deploy.metadata.namespace, deploy.metadata.name
                )
            )
            continue
        typer.echo("{}/{}".format(deploy.metadata.namespace, deploy.metadata.name))
        rollout_restart(deploy.metadata.name, deploy.metadata.namespace)

    # Pass 2: the deployments are listed again and every pod of any deployment
    # still found on this node is evicted.
    for deploy in apps.list_deployment_for_all_namespaces().items:
        for pod in _deployment_pods_on_node(deploy, on_this_node):
            pod_ref = (
                deploy.metadata.namespace,
                deploy.metadata.name,
                pod.metadata.name,
            )
            if dry_run:
                typer.echo("Would evict {}/{}/{}".format(*pod_ref))
            else:
                typer.echo("Evicting {}/{}/{}".format(*pod_ref))
                evict_pod(pod.metadata.namespace, pod.metadata.name)
# Run the Typer CLI only when this file is executed as a script.
if __name__ == "__main__":
    app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment