Instantly share code, notes, and snippets.

Embed
What would you like to do?
DynamoDB Autoscaling Manager
from __future__ import print_function, unicode_literals
import calendar
import datetime
from collections import defaultdict
import boto3
MANAGER_ENABLED_TAG = 'autoscaling_manager_enabled'
autoscaling_client = boto3.client('application-autoscaling', region_name='us-west-2')
dynamo_client = boto3.client('dynamodb', region_name='us-west-2')
resource_groups_tagging_client = boto3.client('resourcegroupstaggingapi', region_name='us-west-2')
cloudwatch_resource = boto3.resource('cloudwatch', region_name='us-west-2')
class DynamoDBAutoscalingManager:
CONSUMED_WRITE_CAPACITY_METRIC = 'ConsumedWriteCapacityUnits'
CONSUMED_READ_CAPACITY_METRIC = 'ConsumedReadCapacityUnits'
WRITE_THROTTLE_EVENTS_METRIC = 'WriteThrottleEvents'
READ_THROTTLE_EVENTS_METRIC = 'ReadThrottleEvents'
ACTIVE_TABLE_STATUS = 'ACTIVE'
LOOK_BACK_WINDOW_MINUTES = 5
CAPACITY_METRICS = [
CONSUMED_WRITE_CAPACITY_METRIC,
CONSUMED_READ_CAPACITY_METRIC,
]
THROTTLE_METRICS = [
WRITE_THROTTLE_EVENTS_METRIC,
READ_THROTTLE_EVENTS_METRIC,
]
ALL_METRICS = CAPACITY_METRICS + THROTTLE_METRICS
READ_CAPACITY = 'ReadCapacityUnits'
WRITE_CAPACITY = 'WriteCapacityUnits'
MAPPED_METRICS = {
READ_CAPACITY: {
'consumed': CONSUMED_READ_CAPACITY_METRIC,
'throttling': READ_THROTTLE_EVENTS_METRIC
},
WRITE_CAPACITY: {
'consumed': CONSUMED_WRITE_CAPACITY_METRIC,
'throttling': WRITE_THROTTLE_EVENTS_METRIC
}
}
MIN_SETTINGS = {
READ_CAPACITY: 5,
WRITE_CAPACITY: 5,
}
MAX_SETTINGS = {
READ_CAPACITY: 100000,
WRITE_CAPACITY: 80000,
}
# these settings are used to decide when to modify capacity
THROTTLING_THRESHOLD = 50 # how much throttling triggers a bigger response
HIGH_THROTTLING_CAPACITY_INCREASE = 4000
LOW_THROTTLING_CAPACITY_INCREASE = 1000
TARGET_UTILIZATION = 0.8 # ideal target utilization
BUFFERED_TARGET_UTILIZATION = 0.6 # how much we can deviate to keep decreases in check
def __init__(self, table_name):
self.table_name = table_name
self.table_data = self.get_dynamo_table_data()
self.gsi_data = {index_data['IndexName']: index_data
for index_data in self.table_data.get('GlobalSecondaryIndexes', [])}
self.current_throughput_settings = {self.table_name: self.table_data['ProvisionedThroughput']}
for index_name in self.gsi_data:
self.current_throughput_settings[index_name] = self.gsi_data[index_name]['ProvisionedThroughput']
def get_dynamo_table_data(self):
"""
Get the current table status + provisioned capacity
"""
return dynamo_client.describe_table(TableName=self.table_name)['Table']
def get_throughput_settings_for_request(self, element_name, new_throughput_settings_by_element):
"""
Get the new throughput settings we'll be sending in the request
"""
return {
self.READ_CAPACITY: new_throughput_settings_by_element[element_name].get(
self.READ_CAPACITY, self.current_throughput_settings[element_name][self.READ_CAPACITY]),
self.WRITE_CAPACITY: new_throughput_settings_by_element[element_name].get(
self.WRITE_CAPACITY, self.current_throughput_settings[element_name][self.WRITE_CAPACITY])
}
def modify_dynamo_capacity(self, new_throughput_settings_by_element):
"""
Modify dynamo capacity based on the passed settings
"""
update_parameters = {
'TableName': self.table_name
}
if self.table_name in new_throughput_settings_by_element:
update_parameters['ProvisionedThroughput'] = self.get_throughput_settings_for_request(
self.table_name, new_throughput_settings_by_element)
gsi_updates = []
for index_name in self.gsi_data:
if index_name in new_throughput_settings_by_element:
gsi_updates.append(
{
'Update': {'IndexName': index_name,
'ProvisionedThroughput': self.get_throughput_settings_for_request(
index_name, new_throughput_settings_by_element)}
}
)
if gsi_updates:
update_parameters['GlobalSecondaryIndexUpdates'] = gsi_updates
try:
dynamo_client.update_table(**update_parameters)
print('Updated {} with the following settings: {}'.format(self.table_name, update_parameters))
except Exception as e:
print('There was an error while trying to update the table capacity of {}: {}'.format(self.table_name, e))
def get_cloudwatch_metric(self, metric_name, index_name=None):
"""
Retrieve the data points for a metric, table pair for the requested range
This method will assume that the passed in metrics have a resolution
of at least 1 minute to fill the cloudwatch gaps with zeroes
The capacity metrics are averaged over one minute
"""
end_date = datetime.datetime.utcnow().replace(second=0, microsecond=0)
start_date = end_date - datetime.timedelta(minutes=self.LOOK_BACK_WINDOW_MINUTES)
metric = cloudwatch_resource.Metric('AWS/DynamoDB', metric_name)
dimensions = [{'Name': 'TableName', 'Value': self.table_name}, ]
if index_name:
dimensions.append({'Name': 'GlobalSecondaryIndexName', 'Value': index_name})
response = metric.get_statistics(
Dimensions=dimensions,
StartTime=start_date,
EndTime=end_date,
Period=60,
Statistics=['Sum'],
)
data_points = {calendar.timegm(data_point['Timestamp'].timetuple()): data_point
for data_point in response['Datapoints']}
metric_report = []
current_date = start_date
while current_date < end_date:
current_timestamp = calendar.timegm(current_date.timetuple())
sum_value = data_points.get(current_timestamp, {}).get('Sum', 0)
metric_report.append({
'timestamp': current_timestamp,
'value': sum_value / 60 if metric_name in self.CAPACITY_METRICS else sum_value
})
current_date += datetime.timedelta(minutes=1)
return metric_report
def get_usage_metrics(self):
"""
Retrieve all the needed metrics for the current table
"""
table_metrics = defaultdict(dict)
for metric in self.ALL_METRICS:
table_metrics[self.table_name][metric] = self.get_cloudwatch_metric(metric)
if self.gsi_data:
for index_name in self.gsi_data:
for metric in self.ALL_METRICS:
table_metrics[index_name][metric] = self.get_cloudwatch_metric(metric, index_name)
return table_metrics
def get_new_throughput_settings(self, element_name, element_usage_metrics):
"""
Get the new throughput settings for the provided element metrics
Here's where our autoscaling algorithm gets applied
This is used both for tables and gsi
"""
new_throughput_settings = {}
current_throughput_settings = self.current_throughput_settings[element_name]
for capacity_setting in self.MAPPED_METRICS:
currently_provisioned = current_throughput_settings[capacity_setting]
new_capacity_settings = None
throttling_metrics = element_usage_metrics[self.MAPPED_METRICS[capacity_setting]['throttling']]
average_throttling = (sum(record['value'] for record in throttling_metrics) / len(throttling_metrics))
consumed_metrics = element_usage_metrics[self.MAPPED_METRICS[capacity_setting]['consumed']]
average_consumed = (sum(record['value'] for record in consumed_metrics) / len(consumed_metrics))
utilization = average_consumed / currently_provisioned
can_increase_capacity = currently_provisioned < self.MAX_SETTINGS[capacity_setting]
target_provisioning = int(average_consumed * (1 / self.TARGET_UTILIZATION))
min_provisioning = self.MIN_SETTINGS[capacity_setting]
if average_throttling and can_increase_capacity: # While throttling, we do quantized increases
new_capacity_settings = currently_provisioned + (self.HIGH_THROTTLING_CAPACITY_INCREASE
if average_throttling >= self.THROTTLING_THRESHOLD
else self.LOW_THROTTLING_CAPACITY_INCREASE)
print('{}-{} has {} average throttling'.format(
element_name, capacity_setting, average_throttling))
elif currently_provisioned < min_provisioning or not average_consumed:
new_capacity_settings = min_provisioning # If we are below min or nothing is using this table
print('{}-{} has no consumed capacity'.format(
element_name, capacity_setting))
elif utilization > self.TARGET_UTILIZATION and can_increase_capacity:
new_capacity_settings = target_provisioning # Increase to match provisioning
print('{}-{} has high utilization ({})'.format(
element_name, capacity_setting, utilization))
elif not average_throttling and utilization < self.BUFFERED_TARGET_UTILIZATION:
new_capacity_settings = target_provisioning # Decrease to match provisioning, giving some buffer
print('{}-{} has low utilization ({})'.format(
element_name, capacity_setting, utilization))
if new_capacity_settings and new_capacity_settings != currently_provisioned:
print('Setting {}-{} capacity to {}'.format(element_name, capacity_setting, new_capacity_settings))
new_throughput_settings[capacity_setting] = new_capacity_settings
else:
print('{}-{} is already at {}. Skipping'.format(element_name, capacity_setting, currently_provisioned))
return new_throughput_settings
def manage_table_autoscaling(self):
"""
Retrieve the current throughput settings of the table and the consumed capacity in cloudwatch
If we detect that the capacity settings need adjusting, we'll try to change them
"""
if self.table_data['TableStatus'] != self.ACTIVE_TABLE_STATUS:
print('{} is not in {} status. Skipping as we will not be able to update it'.format(self.table_name,
self.ACTIVE_TABLE_STATUS))
return
usage_metrics = self.get_usage_metrics()
new_throughput_settings_by_element = {}
for element_name in [self.table_name] + list(self.gsi_data.keys()):
new_throughput_settings = self.get_new_throughput_settings(element_name, usage_metrics[element_name])
if new_throughput_settings:
new_throughput_settings_by_element[element_name] = new_throughput_settings
if new_throughput_settings_by_element:
self.modify_dynamo_capacity(new_throughput_settings_by_element)
def is_table_auto_scaling_enabled(table_name):
"""
Detect whether we even need to assist an auto scaling policy
"""
response = autoscaling_client.describe_scaling_policies(ServiceNamespace='dynamodb',
ResourceId='table/{}'.format(table_name))
return bool(response['ScalingPolicies'])
def get_enabled_tables():
"""
Retrieve the tables that have the MANAGER_ENABLED_TAG set to true
"""
response = resource_groups_tagging_client.get_resources(
TagFilters=[{
'Key': MANAGER_ENABLED_TAG, 'Values': ['true']
}],
ResourceTypeFilters=['dynamodb']
)
return ([item['ResourceARN'].split('/')[1] for item in response['ResourceTagMappingList']]
if response['ResourceTagMappingList'] else [])
def handler(event, context):
"""
This lambda function manages dynamo db scaling for the tables that have the MANAGER_ENABLED_TAG tag set to true
If autoscaling is already enabled for this table, this won't perform any operations
"""
processed_tables = []
enabled_tables = get_enabled_tables()
print('Retrieved the following tables with the {} tag set to true: {}'.format(MANAGER_ENABLED_TAG, enabled_tables))
for table_name in enabled_tables:
if not is_table_auto_scaling_enabled(table_name):
print('Will manage autoscaling for {}'.format(table_name))
DynamoDBAutoscalingManager(table_name).manage_table_autoscaling()
processed_tables.append(table_name)
else:
print('{} does is managed by dynamo autoscaling. Skipping'.format(table_name))
return {
'processed_tables': processed_tables,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment