Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Custom CloudWatch metric that estimates AWS Lambda concurrent executions
#!/usr/bin/env python
import boto3
import datetime
import time
# Regions in which CloudWatch is queried for Lambda metrics on each polling
# cycle, listed in the order they are visited.
ENABLED_REGIONS = [
    # Americas
    "us-east-1",
    "us-west-2",
    # Europe
    "eu-west-1",
    "eu-central-1",
    # Asia Pacific
    "ap-northeast-1",
    "ap-northeast-2",
    "ap-southeast-1",
    "ap-southeast-2",
]
def set_metric_stats(current_time, metric, period=60):
    """Build the keyword arguments for a CloudWatch ``get_metric_statistics`` call.

    The request targets the ``AWS/Lambda`` namespace and covers one window of
    ``period`` seconds ending at ``current_time``.

    Args:
        current_time: ``datetime.datetime`` marking the end of the window.
        metric: Which metric to query: ``"duration"`` (``Average`` statistic,
            unit ``Milliseconds``) or ``"invocations"`` (``Sum`` statistic,
            unit ``Count``).
        period: Length of both the query window and the aggregation period,
            in seconds. Defaults to 60, the value that used to be hard-coded.
            NOTE(review): CloudWatch restricts which period values are valid
            for a given metric resolution; confirm before passing other values.

    Returns:
        A dict of request parameters, or ``False`` (after printing an error
        message) when ``metric`` is not one of the supported names.
    """
    # Built on every call so the returned dict, including its "Statistics"
    # list, is never shared between callers.
    metric_fields = {
        "duration": {
            "MetricName": "Duration",
            "Statistics": ["Average"],
            "Unit": "Milliseconds",
        },
        "invocations": {
            "MetricName": "Invocations",
            "Statistics": ["Sum"],
            "Unit": "Count",
        },
    }

    if metric not in metric_fields:
        print("Error: unknown metric ", metric)
        return False

    # The window length and the aggregation period come from the same value
    # so the request always describes exactly one period.
    window = datetime.timedelta(seconds=period)
    stats = {
        "Namespace": "AWS/Lambda",
        "StartTime": (current_time - window).isoformat(),
        "EndTime": current_time.isoformat(),
        "Period": period,
    }
    stats.update(metric_fields[metric])
    return stats
def get_metric_statistics(client, current_time, metric):
    """Fetch the newest value of a Lambda metric from CloudWatch.

    Args:
        client: Object exposing ``get_metric_statistics(**kwargs)``; the
            caller in this file passes a boto3 CloudWatch client.
        current_time: ``datetime.datetime`` marking the end of the query
            window.
        metric: Metric name understood by ``set_metric_stats``.

    Returns:
        The requested statistic taken from the datapoint with the latest
        ``Timestamp``, or ``False`` when the metric name is unknown or the
        response contains no datapoints.
    """
    stats = set_metric_stats(current_time, metric)
    if not stats:
        return False

    datapoints = client.get_metric_statistics(**stats)["Datapoints"]
    if not datapoints:
        return False

    # CloudWatch does not return datapoints in chronological order, and a
    # window that is not aligned to a period boundary may yield more than one
    # of them. Select the newest explicitly instead of trusting index 0, so
    # separate queries for the same window refer to the same period.
    latest = max(datapoints, key=lambda point: point["Timestamp"])
    return latest[stats["Statistics"][0]]
def put_custom_metric(client, current_time, metric_name, metric_value, namespace, unit):
    """Publish a single datapoint for a custom metric.

    Args:
        client: Object exposing ``put_metric_data(**kwargs)``; the caller in
            this file passes a boto3 CloudWatch client.
        current_time: ``datetime.datetime`` used as the datapoint timestamp
            (sent in ISO 8601 form).
        metric_name: Name of the metric to publish.
        metric_value: Value of the datapoint.
        namespace: Namespace the metric is published under.
        unit: Unit string attached to the datapoint.

    Returns:
        Whatever ``client.put_metric_data`` returns; it is also printed.
    """
    datum = {
        "MetricName": metric_name,
        "Timestamp": current_time.isoformat(),
        "Value": metric_value,
        "Unit": unit,
    }
    result = client.put_metric_data(Namespace=namespace, MetricData=[datum])
    print("published custom metric:", result)
    return result
def main():
    """Estimate Lambda concurrency per region once a minute, forever.

    On each cycle, for every region in ``ENABLED_REGIONS``, the average
    duration and the invocation count of the last minute are read from
    CloudWatch. When both are available, their product (converted to
    seconds and to a per-second rate) is published as the
    ``EstimatedConcurrentExecutions`` metric in the ``LambdaCustomMetrics``
    namespace of that same region. The loop never returns.
    """
    # Client construction does not depend on the polling cycle, so build one
    # client per region up front instead of re-creating all of them every
    # minute.
    regional_clients = [
        (region, boto3.client("cloudwatch", region_name=region))
        for region in ENABLED_REGIONS
    ]

    while True:
        for region, cloudwatch in regional_clients:
            print("Now checking", region)
            # Use one timestamp for both queries and for the published
            # datapoint so they all describe the same window.
            now = datetime.datetime.utcnow()
            avg_duration = get_metric_statistics(
                client=cloudwatch, current_time=now, metric="duration"
            )
            invocations = get_metric_statistics(
                client=cloudwatch, current_time=now, metric="invocations"
            )

            if avg_duration and invocations:
                # Lambda concurrent executions => request per second * duration.
                # Duration arrives in milliseconds; the invocation sum covers
                # the last minute, so dividing by 60 gives an average per second.
                concurrent_executions = (avg_duration / 1000) * (invocations / 60)
                print("Concurrent executions last minute:", concurrent_executions)
                put_custom_metric(
                    client=cloudwatch,
                    current_time=now,
                    namespace="LambdaCustomMetrics",
                    metric_name="EstimatedConcurrentExecutions",
                    metric_value=concurrent_executions,
                    unit="Count",
                )
            else:
                print("did not get any datapoints for the past minute, skipping submission to cloudwatch")

        print("waiting for the next execution in 60 sec")
        time.sleep(60)
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.