Skip to content

Instantly share code, notes, and snippets.

@ludeknovy
Last active February 4, 2022 11:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ludeknovy/af039ea46d568490cd8d09f5d0dad90d to your computer and use it in GitHub Desktop.
Save ludeknovy/af039ea46d568490cd8d09f5d0dad90d to your computer and use it in GitHub Desktop.
Locust KPI Listener - medium
import logging
from enum import Enum
from typing import List
import locust.env
from locust.stats import calculate_response_time_percentile
class Metric(Enum):
ERROR_RATE = 'error_rate'
PERCENTILE_90 = 'percentile_90'
RPS = 'rps'
@staticmethod
def has_value(item):
return item in [v.value for v in Metric.__members__.values()]
class KpiPluigin:
def __init__(
self,
env: locust.env.Environment,
kpis: List,
):
self.env = env
self.kpis = kpis
self.errors = []
self._validate_kpis()
events = self.env.events
events.quitting.add_listener(self.quitting) # pyre-ignore
def quitting(self, environment):
serialized_stats = self.serialize_stats(self.env.stats)
updated_stats = self._update_data(serialized_stats)
self._kpi_check(updated_stats)
self._interpret_errors()
def serialize_stats(self, stats):
return [stats.entries[key].serialize() for key in stats.entries.keys() if
not (stats.entries[key].num_requests == 0 and stats.entries[key].num_failures == 0)]
def _update_data(self, stats):
for stat in stats:
stat['error_rate'] = self._calculate_fail_rate(stat)
stat['percentile_90'] = self._calculate_percentile(stat, 0.90)
stat['rps'] = self._calculate_rps(stat)
return stats
def _calculate_rps(self, stat):
rps = stat['num_reqs_per_sec']
num_of_measurements = len(rps)
return sum(rps.values()) / num_of_measurements
def _calculate_fail_rate(self, stat):
num_failures = stat['num_failures']
num_requests = stat["num_requests"]
return (num_failures / num_requests) * 100
def _calculate_percentile(self, stat, percentile):
response_times = stat['response_times']
num_requests = stat['num_requests']
return calculate_response_time_percentile(response_times, num_requests, percentile)
def _kpi_check(self, stats):
if len(stats) == 0:
return
for kpi in self.kpis:
name = list(kpi.keys())[0]
stat = next(stat for stat in stats if stat["name"] == name)
if stat:
kpi_settings = kpi[list(kpi.keys())[0]]
for kpi_setting in kpi_settings:
self._metrics_check(kpi_setting, stat)
def _metrics_check(self, kpi_setting, stat):
(metric, value) = kpi_setting
name = stat["name"]
if metric == Metric.ERROR_RATE.value:
error_rate = stat['error_rate']
error_rate <= value or self._log_error(error_rate, kpi_setting, name)
if metric == Metric.PERCENTILE_90.value:
percentile = stat['percentile_90']
percentile <= value or self._log_error(percentile, kpi_setting, name)
if metric == Metric.RPS.value:
rps = stat['rps']
rps >= value or self._log_error(rps, kpi_setting, name)
def _log_error(self, stat_value, kpi_settings, name):
(metric, value) = kpi_settings
self.errors.append(
f"{metric} for '{name}' is {stat_value}, but expected it to be better than {value}") # noqa: E501
def _interpret_errors(self):
if len(self.errors) == 0:
logging.info('All KPIs are good!')
else:
for error in self.errors:
logging.error(f"SLA failed: \n {error}")
self.env.process_exit_code = 1
def _validate_kpis(self):
for kpi in self.kpis:
kpi_keys = list(kpi.keys())
if len(kpi_keys) > 1:
raise Exception("Every dict must contain definition for only one endpoint")
kpi_settings = kpi[kpi_keys[0]]
if len(kpi_settings) == 0:
raise Exception(f"No KPI defined for endpoint {kpi_keys[0]}")
for kpi_setting in kpi_settings:
(metric, value) = kpi_setting
if not isinstance(value, (int, float)):
raise Exception(f"Provide valid value for '{metric}' metric for endpoint {kpi_keys[0]}")
if not Metric.has_value(metric):
raise Exception(f"Metric {metric} not implemented")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment