Skip to content

Instantly share code, notes, and snippets.

@hnishi
Last active December 30, 2021 10:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hnishi/70061f0198677dc1ea7eb2354b5525cb to your computer and use it in GitHub Desktop.
Save hnishi/70061f0198677dc1ea7eb2354b5525cb to your computer and use it in GitHub Desktop.
A customized Locust wait_time function that lets you specify the total RPS for each user class (this does not work in distributed mode).
# Reference: https://github.com/SvenskaSpel/locust-plugins/blob/f287bae58e696fac13be0774d8065751cca9822c/locust_plugins/wait_time.py # noqa: B950
import logging
import time
from collections import defaultdict, deque, namedtuple
from typing import Any
from locust import events, runners
# Pacing state shared by every wait-time function in this module, keyed by
# user class name. Missing keys are created on first access with the defaults
# noted below.

# Scheduled time (time.monotonic() based) of the latest iteration; default 0.0.
_last_run_dict = defaultdict(float)
# True once the "failed to reach target" warning was logged; default False.
_warning_emitted_dict = defaultdict(bool)
# True when the most recent call was behind schedule; default False.
_target_missed_dict = defaultdict(bool)
# Timestamps of recent iterations used to estimate the current rate.
_ips_window_dict = defaultdict(deque)

# Length, in seconds, of the sliding window used for the rate estimate.
IPS_WINDOW_SIZE = 20
@events.quitting.add_listener
def quitting(**_kwargs: Any) -> None:
    """Print a summary for each user class that missed its target rate.

    Registered as a listener on locust's ``quitting`` event. One message is
    printed for every user class whose entry in ``_warning_emitted_dict`` is
    true, i.e. for which the "failed to reach target" warning was logged at
    some point during the run.

    Args:
        **_kwargs: Event arguments supplied by locust; ignored.
    """
    for class_name, warning_emitted in _warning_emitted_dict.items():
        if warning_emitted:
            # Each fragment ends with a space: adjacent string literals are
            # joined verbatim, so without it the words run together.
            print(
                "Failed to reach targeted number of iterations per second (at some "
                "point during the test). Probably caused by target system overload "
                f"or too few clients. user class: {class_name}"
            )
def constant_user_class_ips(ips: float):
    """Build a ``wait_time`` function targeting ``ips`` iterations/s per user class.

    The target applies to the user class as a whole: the per-user delay is
    the number of running users of that class divided by ``ips``. Pacing
    state lives in module-level dicts keyed by class name, so it is shared
    by all users of a class within this process only.

    Args:
        ips: Target total iterations per second for the user class. Must be
            greater than zero.

    Returns:
        A function taking the user instance and returning the number of
        seconds to wait before its next iteration (0 when behind schedule).

    Raises:
        ValueError: If ``ips`` is not greater than zero.
    """
    if ips <= 0:
        # Fail early with a clear message instead of dividing by zero (or
        # producing a negative delay) on every wait_time call.
        raise ValueError(f"ips must be greater than 0, got {ips!r}")

    def func(user):
        class_name = user.__class__.__name__
        runner = user.environment.runner
        if runner is None:  # happens when debugging a single locust
            # The stand-in must expose user_classes_count, which is read
            # right below; model it as a single user of this class.
            fake_runner_type = namedtuple(
                "FakeRunner", ["target_user_count", "state", "user_classes_count"]
            )
            runner = fake_runner_type(1, runners.STATE_RUNNING, {class_name: 1})

        num_users_in_class = runner.user_classes_count[class_name]
        delay = num_users_in_class / ips

        current_time = time.monotonic()

        # Drop timestamps older than the sliding window, then estimate the
        # rate achieved so far from what remains.
        window = _ips_window_dict[class_name]
        while window and window[0] < current_time - IPS_WINDOW_SIZE:
            window.popleft()
        previous_rate = len(window) / IPS_WINDOW_SIZE
        window.append(current_time)

        # Nudge the delay towards the target: lengthen it when running too
        # fast, shorten it when too slow. The difference is clamped to
        # +/-0.5 so a single correction stays small.
        rate_diff = previous_rate - ips
        delay = delay * (1 + max(min(rate_diff, 0.5), -0.5) / ips)

        next_time = _last_run_dict[class_name] + delay
        if current_time > next_time:
            # Behind schedule. Warn once per class, and only if the previous
            # call was also behind while the runner is in the running state.
            if (
                runner.state == runners.STATE_RUNNING
                and _target_missed_dict[class_name]
                and not _warning_emitted_dict[class_name]
            ):
                logging.warning(
                    "Failed to reach target ips, even after rampup has finished"
                )
                _warning_emitted_dict[class_name] = True  # stop logging
            _target_missed_dict[class_name] = True
            _last_run_dict[class_name] = current_time
            return 0

        _target_missed_dict[class_name] = False
        _last_run_dict[class_name] = next_time
        return delay

    return func
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment