Skip to content

Instantly share code, notes, and snippets.

@danield137
Last active September 28, 2019 15:35
Show Gist options
  • Save danield137/3532871cb0fa336e1e62a7c87214de60 to your computer and use it in GitHub Desktop.
Save danield137/3532871cb0fa336e1e62a7c87214de60 to your computer and use it in GitHub Desktop.
manage_ecs_services
import boto3
import json
def dump_task_conatiner_defs(
output_path='services.',
cluster_filter=lambda c_arn: True,
service_filter=lambda s_arn: True,
task_filter=lambda t_arn: True):
"""
dump all task definitions to file
"""
ecs = boto3.client("ecs")
clusters_list = ecs.list_clusters()
clusters = {c: {}
for c in clusters_list['clusterArns'] if cluster_filter(c)}
for cluster in clusters.keys():
cluster_servics = ecs.list_services(
cluster=cluster
)['serviceArns']
for service in ecs.describe_services(cluster=cluster, services=cluster_servics)['services']:
task_arn = service['taskDefinition']
service_arn = service['serviceArn']
if service_filter(service_arn) and task_filter(task_arn):
if service_arn not in clusters[cluster]:
clusters[cluster][service_arn] = {}
clusters[cluster][service_arn][task_arn] = ecs.describe_task_definition(
taskDefinition=task_arn).get('taskDefinition').get('containerDefinitions')
with open(output_path, '+w', encoding='utf8') as output_file:
json.dump(clusters, output_file)
def update_task_container_defs(file_path):
"""
update all task defs
"""
ecs = boto3.client("ecs")
updated_task_defs = []
with open(file_path, 'r', encoding='utf8') as input_file:
clusters = json.load(input_file)
for cluster, services in clusters.items():
for service, tasks in services.items():
for task_arn, conatiner_def in tasks.items():
task_family = task_arn[task_arn.rfind(
'/') + 1:task_arn.rfind(':')]
new_task_def = ecs.register_task_definition(
family=task_family, containerDefinitions=conatiner_def)
updated_task_defs.append(new_task_def.get(
'taskDefinition').get('taskDefinitionArn'))
return updated_task_defs
def update_services(updated_task_defs, cluster_filter):
# list all clusters
clusters = []
ecs = boto3.client("ecs")
response = ecs.list_clusters()
clusters = [cluster for cluster in response.get(
'clusterArns') if cluster_filter(cluster)]
# fixme: add this back
# while response.get('nextToken'):
# response = client.list_clusters(
# nextToken=response.get('nextToken')
# )
# clusters += response.get('clusterArns')
updated_families = [task_def[task_def.rfind('/') + 1:task_def.rfind(
':')] for task_def in updated_task_defs]
# For each cluster, list the services and check if the service contains updated task definition
for cluster in clusters:
response = ecs.list_services(
cluster=cluster
)
services = response.get('serviceArns')
while response.get('nextToken'):
response = ecs.list_services(
cluster=cluster,
nextToken=response.get('nextToken')
)
services += response.get('serviceArns')
# split services to chunks because describe_services accepts up to 10 services in each call
cluster_services_description = []
chunk_size = 10
service_chunks = [services[i:i + chunk_size]
for i in range(0, len(services), chunk_size)]
# describe all services in cluster
for service_chunk in service_chunks:
cluster_services_description += ecs.describe_services(
services=service_chunk, cluster=cluster).get('services')
for service_description in cluster_services_description:
service_task_definition = service_description.get('taskDefinition')
# describe task definition to get family
task_definition_desc = ecs.describe_task_definition(
taskDefinition=service_task_definition).get('taskDefinition')
# check if task definition updated
if task_definition_desc.get('family') in updated_families:
# get the family of the task definition
# update the service with the latest active task definition in the task definition family
ecs.update_service(cluster=cluster, service=service_description.get(
'serviceArn'), taskDefinition=task_definition_desc.get('family'))
@danield137
Copy link
Author

danield137 commented Jan 11, 2018

usage example:

# dump task container defs to local file 
dump_task_conatiner_defs(
    output_path='services.dev.json',
    cluster_filter=lambda c_arn: c_arn.split(
        '/')[1] in ['my-cluster', 'his-cluster]
)

# after changing some values, we can upload them back
updated_tasks = update_task_container_defs(file_path='services.dev.json')
update_services(updated_task_defs=updated_tasks, cluster_filter=lambda c_arn: c_arn.split(
    '/')[1] in ['my-cluster', 'his-cluster'])

@alon-hypr
Copy link

alon-hypr commented Jan 23, 2018

only works using python 3.x

@pangyuteng
Copy link

pangyuteng commented Sep 28, 2019

Works really well! Thanks for sharing!!

We have >10 services in one cluster, so I had to update to use the nextToken param, also updated ecs.update_service to use the flag forceNewDeployment.

        response = ecs.list_services(
            cluster=cluster
        )
        cluster_servics = response.get('serviceArns')

        while response.get('nextToken'):
            response = ecs.list_services(
                cluster=cluster,
                nextToken=response.get('nextToken')
            )
            cluster_servics += response.get('serviceArns')
                ecs.update_service(cluster=cluster, service=service_description.get(
                    'serviceArn'), taskDefinition=task_definition_desc.get('family'),forceNewDeployment=forceNewDeployment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment