@inderpartap · Created March 4, 2025 20:33
This Python script automates zero-downtime migration of workloads between Amazon EKS node groups. It taints and cordons the old nodes so no new pods are scheduled on them, scales up the new node group, and drains the old nodes while respecting eviction policies so the evicted pods reschedule onto the new capacity without disrupting running services.
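As a rough usage sketch (the script filename and node group names below are illustrative placeholders, not part of the gist), an invocation against the flags the script defines would look like:

python migrate_eks_node_groups.py -c my-cluster -n 2 -g '{"old-nodegroup": "new-nodegroup"}'

Note that the AWS region is hardcoded to us-west-2 in the AWS CLI calls, and both the AWS CLI and kubectl are assumed to already be authenticated against the target cluster.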
# Standard Library
import argparse
import json
import logging
import subprocess
import time
from typing import List

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

def validate_aws_credentials():
    try:
        run_command("aws sts get-caller-identity")
        logging.info("AWS credentials are valid.")
    except subprocess.CalledProcessError:
        logging.error("Invalid AWS credentials. Exiting...")
        exit(1)

def check_cluster_exists(cluster_name: str) -> bool:
    try:
        run_command(
            f"aws eks describe-cluster --name {cluster_name} --region us-west-2"
        )
        logging.info(f"Cluster {cluster_name} exists.")
        return True
    except subprocess.CalledProcessError:
        logging.error(f"Cluster {cluster_name} does not exist.")
        return False

def run_command(command: str) -> str:
    """Run a shell command and return its stripped stdout; log stderr and re-raise on failure."""
    try:
        result = subprocess.run(
            command, shell=True, capture_output=True, text=True, check=True
        )
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        logging.error(f"Command '{command}' failed with error: {e.stderr}")
        raise

def get_current_node_group_size(node_group: str) -> int:
    current_instances = run_command(
        f"aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names {node_group} --query 'AutoScalingGroups[0].DesiredCapacity' --region us-west-2"
    )
    return int(current_instances)

def get_max_node_group_size(node_group: str) -> int:
    max_instances = run_command(
        f"aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names {node_group} --query 'AutoScalingGroups[0].MaxSize' --region us-west-2"
    )
    return int(max_instances)

def check_node_group_exists(node_group: str) -> bool:
    # cluster_name is a module-level global set in the __main__ block below.
    try:
        run_command(
            f"aws eks describe-nodegroup --cluster-name {cluster_name} --nodegroup-name {node_group} --region us-west-2"
        )
        return True
    except subprocess.CalledProcessError:
        return False

def update_node_group_size(node_group: str, desired_size: int):
    """Set the ASG desired capacity and block until that many instances are InService."""
    logging.info(
        f"Updating node group {node_group} to desired size {desired_size}"
    )
    run_command(
        f"aws autoscaling update-auto-scaling-group --auto-scaling-group-name {node_group} --desired-capacity {desired_size} --region us-west-2"
    )
    while True:
        current_instances = run_command(
            f"aws autoscaling describe-auto-scaling-groups --auto-scaling-group-names {node_group} --region us-west-2"
        )
        current_instances = json.loads(current_instances)["AutoScalingGroups"][
            0
        ]["Instances"]
        running_instances = sum(
            1
            for instance in current_instances
            if instance["LifecycleState"] == "InService"
        )
        if running_instances == desired_size:
            break
        time.sleep(60)  # Wait for instances to be connected to the cluster

def get_nodes_in_group(node_group: str) -> List[str]:
    command = f"kubectl get nodes -l eks.amazonaws.com/nodegroup={node_group} -o name | cut -d'/' -f2"
    # Filter out empty strings in case the node group has no nodes
    return [node for node in run_command(command).split("\n") if node]

def cordon_node(node: str):
    logging.info(f"Cordoning node: {node}")
    run_command(f"kubectl cordon {node}")

def cordon_node_group(node_group: str):
    logging.info(f"Cordoning node group: {node_group}")
    run_command(
        f"kubectl get nodes -l eks.amazonaws.com/nodegroup={node_group} -o name | xargs kubectl cordon"
    )

def drain_node(node: str):
    logging.info(f"Draining node: {node}")
    run_command(
        f"kubectl drain {node} --ignore-daemonsets --delete-emptydir-data"
    )

def get_asg_from_node_group(node_group: str) -> str:
    return run_command(
        f"aws eks describe-nodegroup --cluster-name {cluster_name} --nodegroup-name {node_group} --query 'nodegroup.resources.autoScalingGroups[0].name' --region us-west-2"
    ).strip('"')

def add_custom_taint(node_group: str):
    logging.info(f"Tainting node group: {node_group}")
    run_command(
        "aws eks update-nodegroup-config --cluster-name "
        + cluster_name
        + " --nodegroup-name "
        + node_group
        + ' --taints \'{"addOrUpdateTaints": [{"key": "no-scheduling", "value": "no-scheduling", "effect": "NO_SCHEDULE"}]}\''
    )

def downsize_node_group(node_group: str):
    logging.info(f"Downsizing node group: {node_group}")
    run_command(
        "aws eks update-nodegroup-config --cluster-name "
        + cluster_name
        + " --nodegroup-name "
        + node_group
        + ' --scaling-config \'{"minSize": 0, "maxSize": 1, "desiredSize": 0}\''
    )

def migrate_node_groups(old_node_group: str, new_node_group: str):
    """Taint and cordon the old node group, scale up the new one, and drain the old nodes in batches."""
    # Make sure the node groups exist
    if not check_node_group_exists(old_node_group):
        logging.error(f"Old node group {old_node_group} does not exist.")
        return
    if not check_node_group_exists(new_node_group):
        logging.error(f"New node group {new_node_group} does not exist.")
        return
    new_asg_name = get_asg_from_node_group(new_node_group)
    old_nodes = get_nodes_in_group(old_node_group)
    logging.info(
        f"Migrating workloads from {old_node_group} to {new_node_group}"
    )
    input("Confirm the node groups are correct. Press Enter to continue...")
    logging.info("Continuing...")
    add_custom_taint(old_node_group)
    cordon_node_group(old_node_group)
    temp_node_count_to_migrate = node_count_to_migrate
    # Initialize new node group
    new_asg_desired_size = (
        get_current_node_group_size(new_asg_name) + node_count_to_migrate
    )
    max_size_new_node_group = get_max_node_group_size(new_asg_name)
    update_node_group_size(
        new_asg_name,
        (
            new_asg_desired_size
            if new_asg_desired_size <= max_size_new_node_group
            else max_size_new_node_group
        ),
    )
    # Migrate nodes
    while old_nodes:
        node = old_nodes.pop(0)
        drain_node(node)
        logging.info(f"Node {node} successfully drained.")
        temp_node_count_to_migrate -= 1
        if temp_node_count_to_migrate == 0:
            temp_node_count_to_migrate = node_count_to_migrate
            current_size_new_asg = get_current_node_group_size(new_asg_name)
            new_asg_desired_size = current_size_new_asg + node_count_to_migrate
            if current_size_new_asg < max_size_new_node_group:
                input("Press Enter to continue...")
                logging.info("Continuing...")
                update_node_group_size(
                    new_asg_name,
                    (
                        new_asg_desired_size
                        if new_asg_desired_size <= max_size_new_node_group
                        else max_size_new_node_group
                    ),
                )
    logging.info(f"All nodes in {old_node_group} drained.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Migrate EKS node groups.")
    parser.add_argument(
        "-c",
        "--cluster-name",
        type=str,
        default="",
        help="EKS Cluster Name",
        required=True,
    )
    parser.add_argument(
        "-n",
        "--node-count",
        type=int,
        default=2,
        help="Number of nodes to migrate at a time",
        required=False,
    )
    parser.add_argument(
        "-g",
        "--node-groups-map",
        type=json.loads,
        default="{}",
        help="Key-Value pairs of old node group to new node group in JSON format",
        required=True,
    )
    args = parser.parse_args()

    cluster_name = args.cluster_name
    node_count_to_migrate = args.node_count
    node_groups = args.node_groups_map

    # Validate AWS credentials and check if the cluster exists
    validate_aws_credentials()
    if not check_cluster_exists(cluster_name):
        logging.error(f"Cluster {cluster_name} does not exist. Exiting...")
        exit(1)
    if not node_groups:
        logging.error("No node groups to migrate. Exiting...")
        exit(1)

    for old_node_group, new_node_group in node_groups.items():
        logging.info(
            f"Processing node group: {old_node_group} --> {new_node_group}"
        )
        migrate_node_groups(old_node_group, new_node_group)
        logging.info(
            f"Completed node groups: {old_node_group} --> {new_node_group}"
        )
    logging.info("Completed all node groups")