Lambda concurrent execution custom metric on CloudWatch
#!/usr/bin/env python | |
import boto3 | |
import datetime | |
import time | |
# AWS regions polled on every pass of main(); a CloudWatch client is created
# for each entry and the Lambda metrics of that region are read through it.
ENABLED_REGIONS = [
    "us-east-1",
    "us-west-2",
    "eu-west-1",
    "eu-central-1",
    "ap-northeast-1",
    "ap-northeast-2",
    "ap-southeast-1",
    "ap-southeast-2"
]
def set_metric_stats(current_time, metric):
    """Build the GetMetricStatistics request for one AWS/Lambda metric.

    The request spans the minute that ends at ``current_time`` and asks for a
    single 60-second period.

    Args:
        current_time: ``datetime.datetime`` marking the end of the window.
        metric: ``"duration"`` (Average, in Milliseconds) or
            ``"invocations"`` (Sum, as Count).

    Returns:
        A dict of keyword arguments for a CloudWatch client's
        ``get_metric_statistics`` call, or ``False`` (after printing an
        error) when ``metric`` is not one of the two supported names.
    """
    window_start = current_time - datetime.timedelta(minutes=1)
    request = {
        "Namespace": "AWS/Lambda",
        "StartTime": window_start.isoformat(),
        "EndTime": current_time.isoformat(),
        "Period": 60,
    }

    # (selector, CloudWatch metric name, statistic, unit)
    supported = (
        ("duration", "Duration", "Average", "Milliseconds"),
        ("invocations", "Invocations", "Sum", "Count"),
    )
    for selector, metric_name, statistic, unit in supported:
        if metric == selector:
            request["MetricName"] = metric_name
            request["Statistics"] = [statistic]
            request["Unit"] = unit
            return request

    print("Error: unknown metric ", metric)
    return False
def get_metric_statistics(client, current_time, metric):
    """Fetch the last-minute value of one AWS/Lambda metric.

    Args:
        client: CloudWatch client exposing ``get_metric_statistics``.
        current_time: ``datetime.datetime`` marking the end of the window.
        metric: Metric selector understood by ``set_metric_stats``
            (``"duration"`` or ``"invocations"``).

    Returns:
        The requested statistic of the first returned datapoint, or ``False``
        when the metric selector is unknown or CloudWatch returned no
        datapoints. Every "no value" path returns ``False`` (never an
        implicit ``None``), so callers get one consistent sentinel. A
        datapoint whose value is ``0`` is returned as-is and is therefore
        falsy as well.
    """
    stats = set_metric_stats(current_time, metric)
    if not stats:
        # Unknown metric; set_metric_stats has already reported it.
        return False

    response = client.get_metric_statistics(**stats)
    datapoints = response["Datapoints"]
    if not datapoints:
        return False

    # NOTE(review): CloudWatch documents that datapoints are not returned in
    # chronological order; if the window ever yields more than one datapoint
    # the first one is an arbitrary pick -- confirm this is acceptable.
    statistic = stats["Statistics"][0]
    return datapoints[0][statistic]
def put_custom_metric(client, current_time, metric_name, metric_value, namespace, unit):
    """Publish a single datapoint of a custom metric to CloudWatch.

    Args:
        client: CloudWatch client exposing ``put_metric_data``.
        current_time: ``datetime.datetime`` used as the datapoint timestamp
            (sent in ISO 8601 form).
        metric_name: Name of the custom metric.
        metric_value: Value of the datapoint.
        namespace: Namespace the metric is published under.
        unit: Unit of the datapoint, e.g. ``"Count"``.

    Returns:
        The response returned by ``client.put_metric_data``; it is also
        printed.
    """
    datum = {
        "MetricName": metric_name,
        "Timestamp": current_time.isoformat(),
        "Value": metric_value,
        "Unit": unit,
    }
    result = client.put_metric_data(Namespace=namespace, MetricData=[datum])
    print("published custom metric:", result)
    return result
def main():
    """Publish an estimated Lambda concurrency metric for every region, forever.

    Once per pass, for each region in ``ENABLED_REGIONS``, the average
    duration and the invocation count of the last minute are read from the
    ``AWS/Lambda`` namespace. When both are available their product
    (seconds per invocation * invocations per second) is published as
    ``EstimatedConcurrentExecutions`` in the ``LambdaCustomMetrics``
    namespace of the same region. The function then sleeps 60 seconds and
    repeats; it never returns.
    """
    # Clients are bound to a region and reusable, so build them once instead
    # of on every pass of the polling loop.
    regional_clients = [
        (region, boto3.client("cloudwatch", region_name=region))
        for region in ENABLED_REGIONS
    ]

    while True:
        for region, cloudwatch in regional_clients:
            print("Now checking", region)
            # NOTE(review): utcnow() is deprecated since Python 3.12; kept so
            # the timestamps sent to CloudWatch keep their current format.
            now = datetime.datetime.utcnow()
            avg_duration = get_metric_statistics(client=cloudwatch, current_time=now, metric="duration")
            invocations = get_metric_statistics(client=cloudwatch, current_time=now, metric="invocations")

            if avg_duration and invocations:
                # Lambda concurrent executions => request per second * duration.
                # Since we are querying the last minute we can divide it by 60
                # to get an average per second. Duration is in milliseconds.
                concurrent_executions = (avg_duration / 1000) * (invocations / 60)
                print("Concurrent executions last minute:", concurrent_executions)
                put_custom_metric(
                    client=cloudwatch,
                    current_time=now,
                    namespace="LambdaCustomMetrics",
                    metric_name="EstimatedConcurrentExecutions",
                    metric_value=concurrent_executions,
                    unit="Count"
                )
            else:
                print("did not get any datapoints for the past minute, skipping submission to cloudwatch")

        print("waiting for the next execution in 60 sec")
        time.sleep(60)
# Start the polling loop only when the file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment