Last active
February 4, 2022 11:47
-
-
Save ludeknovy/af039ea46d568490cd8d09f5d0dad90d to your computer and use it in GitHub Desktop.
Locust KPI Listener - medium
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from enum import Enum | |
from typing import List | |
import locust.env | |
from locust.stats import calculate_response_time_percentile | |
class Metric(Enum): | |
ERROR_RATE = 'error_rate' | |
PERCENTILE_90 = 'percentile_90' | |
RPS = 'rps' | |
@staticmethod | |
def has_value(item): | |
return item in [v.value for v in Metric.__members__.values()] | |
class KpiPluigin: | |
def __init__( | |
self, | |
env: locust.env.Environment, | |
kpis: List, | |
): | |
self.env = env | |
self.kpis = kpis | |
self.errors = [] | |
self._validate_kpis() | |
events = self.env.events | |
events.quitting.add_listener(self.quitting) # pyre-ignore | |
def quitting(self, environment): | |
serialized_stats = self.serialize_stats(self.env.stats) | |
updated_stats = self._update_data(serialized_stats) | |
self._kpi_check(updated_stats) | |
self._interpret_errors() | |
def serialize_stats(self, stats): | |
return [stats.entries[key].serialize() for key in stats.entries.keys() if | |
not (stats.entries[key].num_requests == 0 and stats.entries[key].num_failures == 0)] | |
def _update_data(self, stats): | |
for stat in stats: | |
stat['error_rate'] = self._calculate_fail_rate(stat) | |
stat['percentile_90'] = self._calculate_percentile(stat, 0.90) | |
stat['rps'] = self._calculate_rps(stat) | |
return stats | |
def _calculate_rps(self, stat): | |
rps = stat['num_reqs_per_sec'] | |
num_of_measurements = len(rps) | |
return sum(rps.values()) / num_of_measurements | |
def _calculate_fail_rate(self, stat): | |
num_failures = stat['num_failures'] | |
num_requests = stat["num_requests"] | |
return (num_failures / num_requests) * 100 | |
def _calculate_percentile(self, stat, percentile): | |
response_times = stat['response_times'] | |
num_requests = stat['num_requests'] | |
return calculate_response_time_percentile(response_times, num_requests, percentile) | |
def _kpi_check(self, stats): | |
if len(stats) == 0: | |
return | |
for kpi in self.kpis: | |
name = list(kpi.keys())[0] | |
stat = next(stat for stat in stats if stat["name"] == name) | |
if stat: | |
kpi_settings = kpi[list(kpi.keys())[0]] | |
for kpi_setting in kpi_settings: | |
self._metrics_check(kpi_setting, stat) | |
def _metrics_check(self, kpi_setting, stat): | |
(metric, value) = kpi_setting | |
name = stat["name"] | |
if metric == Metric.ERROR_RATE.value: | |
error_rate = stat['error_rate'] | |
error_rate <= value or self._log_error(error_rate, kpi_setting, name) | |
if metric == Metric.PERCENTILE_90.value: | |
percentile = stat['percentile_90'] | |
percentile <= value or self._log_error(percentile, kpi_setting, name) | |
if metric == Metric.RPS.value: | |
rps = stat['rps'] | |
rps >= value or self._log_error(rps, kpi_setting, name) | |
def _log_error(self, stat_value, kpi_settings, name): | |
(metric, value) = kpi_settings | |
self.errors.append( | |
f"{metric} for '{name}' is {stat_value}, but expected it to be better than {value}") # noqa: E501 | |
def _interpret_errors(self): | |
if len(self.errors) == 0: | |
logging.info('All KPIs are good!') | |
else: | |
for error in self.errors: | |
logging.error(f"SLA failed: \n {error}") | |
self.env.process_exit_code = 1 | |
def _validate_kpis(self): | |
for kpi in self.kpis: | |
kpi_keys = list(kpi.keys()) | |
if len(kpi_keys) > 1: | |
raise Exception("Every dict must contain definition for only one endpoint") | |
kpi_settings = kpi[kpi_keys[0]] | |
if len(kpi_settings) == 0: | |
raise Exception(f"No KPI defined for endpoint {kpi_keys[0]}") | |
for kpi_setting in kpi_settings: | |
(metric, value) = kpi_setting | |
if not isinstance(value, (int, float)): | |
raise Exception(f"Provide valid value for '{metric}' metric for endpoint {kpi_keys[0]}") | |
if not Metric.has_value(metric): | |
raise Exception(f"Metric {metric} not implemented") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment