Last active
December 30, 2021 10:14
-
-
Save hnishi/70061f0198677dc1ea7eb2354b5525cb to your computer and use it in GitHub Desktop.
locust customized wait_time function that can specify total rps for each user class (this does not work in distributed mode)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Reference: https://github.com/SvenskaSpel/locust-plugins/blob/f287bae58e696fac13be0774d8065751cca9822c/locust_plugins/wait_time.py # noqa: B950 | |
import logging | |
import time | |
from collections import defaultdict, deque, namedtuple | |
from typing import Any | |
from locust import events, runners | |
_last_run_dict = defaultdict(lambda: 0.0, {}) | |
_warning_emitted_dict = defaultdict(lambda: False, {}) | |
_target_missed_dict = defaultdict(lambda: False, {}) | |
_ips_window_dict = defaultdict(lambda: deque(), {}) | |
IPS_WINDOW_SIZE = 20 | |
@events.quitting.add_listener | |
def quitting(**_kwargs: Any): | |
for key in _warning_emitted_dict.keys(): | |
if _warning_emitted_dict[key]: | |
print( | |
"Failed to reach targeted number of iterations per second (at some" | |
"point during the test). Probably caused by target system overload" | |
f"or too few clients. user class: {key}" | |
) | |
def constant_user_class_ips(ips: float): | |
def func(user): | |
global _warning_emitted_dict, _target_missed_dict, _last_run_dict, _ips_window_dict | |
class_name = user.__class__.__name__ | |
runner = user.environment.runner | |
if runner is None: # happens when debugging a single locust | |
runner = namedtuple("FakeRunner", ["target_user_count", "state"])( | |
1, runners.STATE_RUNNING | |
) | |
num_users_in_class = runner.user_classes_count[class_name] | |
delay = num_users_in_class / ips | |
current_time = time.monotonic() | |
while ( | |
_ips_window_dict[class_name] | |
and _ips_window_dict[class_name][0] < current_time - IPS_WINDOW_SIZE | |
): | |
_ips_window_dict[class_name].popleft() | |
previous_rate = len(_ips_window_dict[class_name]) / IPS_WINDOW_SIZE | |
_ips_window_dict[class_name].append(current_time) | |
rate_diff = previous_rate - ips | |
delay = delay * (1 + max(min(rate_diff, 0.5), -0.5) / ips) | |
next_time = _last_run_dict[class_name] + delay | |
if current_time > next_time: | |
if ( | |
runner.state == runners.STATE_RUNNING | |
and _target_missed_dict[class_name] | |
and not _warning_emitted_dict[class_name] | |
): | |
logging.warning( | |
"Failed to reach target ips, even after rampup has finished" | |
) | |
_warning_emitted_dict[class_name] = True # stop logging | |
_target_missed_dict[class_name] = True | |
_last_run_dict[class_name] = current_time | |
return 0 | |
_target_missed_dict[class_name] = False | |
_last_run_dict[class_name] = next_time | |
return delay | |
return func |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment