Skip to content

Instantly share code, notes, and snippets.

@depop-blog
Created June 27, 2018 15:39
Show Gist options
  • Save depop-blog/4c2161d731dd7db6fdeec12b88c424ff to your computer and use it in GitHub Desktop.
Save depop-blog/4c2161d731dd7db6fdeec12b88c424ff to your computer and use it in GitHub Desktop.
Ahead of time scheduling on ECS/EC2: full_metric_lambda.py
import boto3
import datetime
import logging
import json
import re
# Configure the root logger at INFO and announce that the module was loaded.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.info('Loading function...')

# AWS API clients are created once, at module import time, rather than inside
# the handler on every call.
ecs, autoscaling, cw = (
    boto3.client(service_name)
    for service_name in ('ecs', 'autoscaling', 'cloudwatch')
)
# ECS Describe* calls accept at most 100 ARNs per request.
_ECS_DESCRIBE_BATCH_SIZE = 100
# Conservative PutMetricData batch size that is valid under both the old
# (20) and current per-request metric limits.
_PUT_METRIC_BATCH_SIZE = 20


def handler(event, context):
    """Publish a ``SchedulableContainers`` CloudWatch metric for each AOT cluster.

    For every autoscaling group tagged ``aot_schedulable=yes``, the ECS cluster
    named by the group's ``cluster_name`` tag is inspected. The largest CPU
    and memory reservation among the cluster's tasks is used as the size of
    one container. The number of such containers that still fit into the
    remaining resources of each ACTIVE container instance is summed into one
    metric value per cluster.

    :param event: Lambda event payload (not used).
    :param context: Lambda context object (not used).
    :returns: an empty dict.
    """
    asgs = get_aot_autoscaling_groups()
    logger.debug(json.dumps(asgs, default=str))
    logger.info('found %s AOT schedulable autoscaling groups...', len(asgs))
    metric_data = []
    for asg in asgs:
        asg_name = asg['AutoScalingGroupName']
        logger.info('Processing ASG %s', asg_name)
        cluster_name = _cluster_name_tag(asg)
        if cluster_name is None:
            # Skip instead of letting a bare next() raise StopIteration, which
            # would abort the run and drop the metrics of every other cluster.
            logger.warning('ASG %s has no cluster_name tag, skipping...', asg_name)
            continue
        logger.info('Calculating schedulable containers for cluster %s', cluster_name)
        instances = _describe_active_container_instances(cluster_name)
        if not instances:
            logger.info('Cluster has no container instances, skipping...')
            continue
        tasks = get_tasks(cluster_name)
        if not tasks:
            logger.info('Received no tasks... skipping cluster')
            continue
        logger.debug(json.dumps(tasks, default=str))
        max_cpu = get_max_cpu(tasks)
        # Convert each task's memory to int *before* comparing: describe_tasks
        # reports memory as a string, and string comparison ranks '512' above
        # '1024'.
        max_memory = max(int(task['memory']) for task in tasks)
        logger.info('Max cpu: %s', max_cpu)
        logger.info('Max memory: %s', max_memory)
        schedulable_containers = sum(
            _schedulable_on_instance(instance, max_cpu, max_memory)
            for instance in instances
        )
        logger.info('Schedulable containers: %s', schedulable_containers)
        metric_data.append({
            'MetricName': 'SchedulableContainers',
            'Dimensions': [{
                'Name': 'ClusterName',
                # Drop everything up to the last '/', so a cluster ARN is
                # reduced to the bare cluster name.
                'Value': re.sub(r'^.*/', '', cluster_name)
            }],
            # Timezone-aware, so the timestamp is correct regardless of the
            # host's local time zone.
            'Timestamp': datetime.datetime.now(datetime.timezone.utc),
            'Value': schedulable_containers
        })
    if not metric_data:
        logger.info('No metrics to send to CloudWatch')
        return {}
    logger.debug('Sending the following metrics to CloudWatch: %s', metric_data)
    # NOTE(review): CloudWatch advises against custom namespaces that begin
    # with "AWS/". Kept unchanged because existing dashboards/alarms may
    # depend on it; confirm before renaming.
    for start in range(0, len(metric_data), _PUT_METRIC_BATCH_SIZE):
        cw.put_metric_data(Namespace='AWS/ECS',
                           MetricData=metric_data[start:start + _PUT_METRIC_BATCH_SIZE])
    logger.info('Metric was sent to CloudWatch')
    return {}


def _cluster_name_tag(asg):
    """Return the value of the ASG's ``cluster_name`` tag, or None if absent."""
    return next(
        (tag['Value'] for tag in asg.get('Tags', []) if tag['Key'] == 'cluster_name'),
        None,
    )


def _describe_active_container_instances(cluster_name):
    """Return describe_container_instances records for all ACTIVE instances.

    Paginates list_container_instances and describes the ARNs in batches of
    at most 100. Returns an empty list when the cluster has no ACTIVE
    instances.
    """
    paginator = ecs.get_paginator('list_container_instances')
    instance_arns = []
    for page in paginator.paginate(cluster=cluster_name, status='ACTIVE'):
        instance_arns.extend(page['containerInstanceArns'])
    instances = []
    for start in range(0, len(instance_arns), _ECS_DESCRIBE_BATCH_SIZE):
        response = ecs.describe_container_instances(
            cluster=cluster_name,
            containerInstances=instance_arns[start:start + _ECS_DESCRIBE_BATCH_SIZE])
        instances.extend(response['containerInstances'])
    return instances


def _schedulable_on_instance(instance, max_cpu, max_memory):
    """Return how many (max_cpu, max_memory)-sized containers fit on *instance*.

    The count is limited by whichever of remaining CPU or remaining memory
    runs out first.
    """
    remaining_resources = {resource['name']: resource
                           for resource in instance['remainingResources']}
    remaining_cpu = remaining_resources['CPU']['integerValue']
    remaining_memory = remaining_resources['MEMORY']['integerValue']
    containers_by_cpu = int(remaining_cpu / max_cpu)
    containers_by_mem = int(remaining_memory / max_memory)
    logger.info('Remaining resources on %s. CPU: %s, Memory: %s',
                instance['ec2InstanceId'], remaining_cpu, remaining_memory)
    logger.info('%s containers could be scheduled on %s based on CPU only',
                containers_by_cpu, instance['ec2InstanceId'])
    logger.info('%s containers could be scheduled on %s based on memory only',
                containers_by_mem, instance['ec2InstanceId'])
    return min(containers_by_cpu, containers_by_mem)
def get_max_cpu(tasks):
    """Return the largest CPU reservation (CPU units) among *tasks*.

    Each task's ``cpu`` value is converted with ``int()`` before it is
    checked or compared, so string values such as ``'0'`` or ``'1024'`` are
    handled numerically. The result is never below 1, so it is safe to use
    as a divisor.

    :param tasks: task dicts with ``cpu`` and ``taskArn`` keys.
    :returns: the maximum CPU reservation as an int (at least 1).
    :raises Exception: if any task has a zero CPU reservation.
    """
    max_cpu = 1
    for task in tasks:
        # Convert first: comparing the raw string '0' with the int 0 is always
        # False, which silently disabled this guard.
        cpu = int(task['cpu'])
        if cpu == 0:
            raise Exception('Task {} does not have CPU reservation specified in AOT cluster'.format(task['taskArn']))
        max_cpu = max(max_cpu, cpu)
    return max_cpu
def get_aot_autoscaling_groups(client=None):
    """Return every autoscaling group tagged ``aot_schedulable=yes``.

    All pages of describe_auto_scaling_groups (100 groups per page) are
    scanned. Filtering is done in Python rather than with a JMESPath
    expression: the previous expression relied on the deprecated backtick
    string-literal syntax.

    :param client: optional autoscaling client; defaults to the module-level
        ``autoscaling`` client.
    :returns: list of autoscaling-group dicts as returned by the API.
    """
    client = autoscaling if client is None else client
    paginator = client.get_paginator('describe_auto_scaling_groups')
    page_iterator = paginator.paginate(
        PaginationConfig={'PageSize': 100}
    )
    # NOTE: only ASGs with the tag aot_schedulable=yes will be retrieved
    matching = []
    for page in page_iterator:
        for asg in page.get('AutoScalingGroups', []):
            if any(tag.get('Key') == 'aot_schedulable' and tag.get('Value') == 'yes'
                   for tag in asg.get('Tags', [])):
                matching.append(asg)
    return matching
def get_tasks(cluster, client=None):
    """Return describe_tasks records for every task listed in *cluster*.

    Follows ``nextToken`` across all list_tasks pages and describes the task
    ARNs in batches of at most 100, the per-call limit of describe_tasks.

    :param cluster: cluster name or ARN.
    :param client: optional ECS client; defaults to the module-level ``ecs``
        client.
    :returns: list of task dicts (empty when the cluster lists no tasks).
    """
    client = ecs if client is None else client
    task_arns = []
    request = {'cluster': cluster, 'maxResults': 100}
    while True:
        response = client.list_tasks(**request)
        task_arns.extend(response['taskArns'])
        next_token = response.get('nextToken')
        if not next_token:
            break
        # Pass the token back; without it the same first page is fetched
        # forever.
        request['nextToken'] = next_token
    tasks = []
    for start in range(0, len(task_arns), 100):
        result = client.describe_tasks(
            cluster=cluster,
            tasks=task_arns[start:start + 100]
        )
        tasks.extend(result['tasks'])
    return tasks
@miked0004
Copy link

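Line 47 (the `max_memory` calculation): I had to cast the task memory to int inside the lambda key as well. Otherwise the memory values were compared as strings (so '512' ranked above '1024') and the maximum integer value was not picked.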

max_memory = int(max(tasks, key=lambda x: int(x['memory']))['memory'])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment