-
-
Save tommyblue/6283d957b0c10813774eee8233a88a76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Replaces all instances of a cluster in a region. | |
This only works if all instances are managed by scaling group | |
""" | |
import argparse | |
import functools | |
import logging | |
import os | |
import time | |
import boto3 | |
### START CONFIGURATIONS | |
CLUSTER = "" # Name of the cluster | |
REGION = "" # The region where the cluster is running | |
ASG_NAME = "" # Name of the autoscaling group | |
### END CONFIGURATIONS | |
def main(): | |
ecs_client = boto3.client("ecs", REGION) | |
asg_client = boto3.client("autoscaling", REGION) | |
# Can't replace if desired instances is different from running | |
if _get_desired_capacity(asg_client) != _get_running_instances(): | |
logging.critical("Can't replace instances if autoscaling activity is ongoing") | |
os._exit(1) | |
logging.info("Finding the number of desired instances in the autoscaling group") | |
desired_instances = _get_desired_capacity(asg_client) | |
logging.info("Set the running instances status as 'DRAINING'") | |
_set_running_instances_as_draining(ecs_client) | |
logging.info("Modifying the autoscaling group doubling the desired instances") | |
_set_desired_capacity(asg_client, desired_instances*2) | |
logging.info("Waiting for the new instances to be launched") | |
if not _wait_instances(desired_instances*2): | |
os._exit(1) | |
logging.info("Waiting all tasks in the draining instances to be stopped") | |
_wait_draining_instances_are_empty(ecs_client) | |
logging.info("Bringing back the desired count in the asg to its initial value") | |
_set_desired_capacity(asg_client, desired_instances) | |
logging.info("Waiting for the drained instances to be shutdown") | |
if not _wait_instances(desired_instances): | |
os._exit(1) | |
def _get_desired_capacity(asg_client) -> int: | |
resp = asg_client.describe_auto_scaling_groups(AutoScalingGroupNames=[ASG_NAME]) | |
if len(resp['AutoScalingGroups']) != 1: | |
logging.critical("Too many ASG! {}".format(resp)) | |
os._exit(1) | |
desired = resp['AutoScalingGroups'][0]["DesiredCapacity"] | |
return desired | |
def _set_running_instances_as_draining(ecs_client): | |
all_instances = ecs_client.list_container_instances(cluster=CLUSTER)['containerInstanceArns'] | |
ecs_client.update_container_instances_state( | |
cluster=CLUSTER, | |
containerInstances=all_instances, | |
status='DRAINING' | |
) | |
def _set_desired_capacity(asg_client, desired_instances): | |
asg_client.set_desired_capacity( | |
AutoScalingGroupName=ASG_NAME, | |
DesiredCapacity=desired_instances, | |
HonorCooldown=False | |
) | |
@with_sleep(sleep_time=30, max_attempts=20) | |
def _wait_instances(desired_instances): | |
running = _get_running_instances() | |
if running == desired_instances: | |
logging.info("Done!") | |
return True | |
def _get_running_instances() -> int: | |
instances = _describe_container_instances() | |
return len(instances['containerInstances']) | |
@with_sleep(sleep_time=30, max_attempts=20) | |
def _wait_draining_instances_are_empty(ecs_client): | |
tasks_per_instance = _get_tasks_per_instance(ecs_client, status=["DRAINING"]) | |
if sum(tasks_per_instance.values()) == 0: | |
return True | |
def _get_tasks_per_instance(ecs_client, status=None): | |
tasks_list = ecs_client.list_tasks(cluster=CLUSTER) | |
return _tasks_per_instance( | |
ecs_client, tasks_list['taskArns'], status=status) | |
def _describe_container_instances(): | |
ecs_client = boto3.client("ecs", REGION) | |
containers_response = ecs_client.list_container_instances(cluster=CLUSTER) | |
cluster_instances = _describe_container_instances( | |
cluster=CLUSTER, containerInstances=containers_response['containerInstanceArns']) | |
return cluster_instances | |
def _tasks_per_instance(ecs_client, tasks_list: list, status=None) -> dict: | |
""" | |
Receives an `instances` dictionary with the instances arn as key and 0 as values, returns | |
a dictionary where the arn is replaced with the id and the value is the number of tasks | |
running on that instance | |
""" | |
if status is None: | |
status = ["ACTIVE"] | |
instances = get_instances_dict(ecs_client, CLUSTER) | |
tasks_desc = ecs_client.describe_tasks(cluster=CLUSTER, tasks=tasks_list) | |
for t in tasks_desc['tasks']: | |
instances[t['containerInstanceArn']] += 1 | |
response = ecs_client.describe_container_instances( | |
cluster=CLUSTER, containerInstances=list(instances.keys())) | |
instances_as_ids = { | |
i['ec2InstanceId']: instances[i['containerInstanceArn']] | |
for i in response['containerInstances'] if i['status'] in status | |
} | |
return instances_as_ids | |
def with_sleep(sleep_time=5, max_attempts=3): | |
def sleep_decorator(func): | |
@functools.wraps(func) | |
def wrapper_sleep(*args, **kwargs): | |
attempts = 0 | |
while True: | |
attempts += 1 | |
time.sleep(sleep_time) | |
ret = func(*args, **kwargs) | |
if ret is not None: | |
return ret | |
if attempts > max_attempts: | |
print( | |
"Still not ready after %s seconds, please investigate." % (sleep_time*max_attempts)) | |
return False | |
return wrapper_sleep | |
return sleep_decorator | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment