Skip to content

Instantly share code, notes, and snippets.

@gawel
Last active February 11, 2024 18:02
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save gawel/f48e577425f872e1a81028f3f53353cf to your computer and use it in GitHub Desktop.
Save gawel/f48e577425f872e1a81028f3f53353cf to your computer and use it in GitHub Desktop.
Locust http2 client
import re
import time
from locust import User
from locust.exception import LocustError
import httpx
from httpx import Request, Response
from requests.auth import HTTPBasicAuth
from httpx import InvalidURL, RequestError
from urllib.parse import urlparse, urlunparse
from locust.exception import CatchResponseError, ResponseError
# Matches strings that start with an explicit "http://" or "https://" scheme
# (case-insensitively). HttpSession._build_url returns such URLs unchanged
# instead of prefixing them with the session's base URL.
absolute_http_url_regexp = re.compile(r"^https?://", flags=re.IGNORECASE)
class LocustResponse(Response):
    """``httpx.Response`` subclass that can carry a request-level error.

    ``HttpSession._send_request_safe_mode`` attaches the exception that
    prevented a request from completing as an ``error`` attribute. When that
    attribute is present and truthy, :meth:`raise_for_status` re-raises it
    instead of inspecting the status code.
    """

    def raise_for_status(self):
        """Raise the stored ``error`` if any, otherwise defer to ``Response``.

        Returns:
            Whatever ``Response.raise_for_status`` returns. Recent httpx
            releases return the response itself so calls can be chained;
            older releases return ``None``.

        Raises:
            Exception: the stored ``error`` attribute, when set and truthy.
            httpx.HTTPStatusError: propagated from ``Response.raise_for_status``.
        """
        error = getattr(self, "error", None)
        if error:
            raise error
        # Propagate the parent's return value instead of discarding it, so
        # ``response.raise_for_status().json()`` style chaining keeps working.
        return super().raise_for_status()
class HttpSession(httpx.Client):
    """``httpx.Client`` that reports every request to Locust's statistics.

    Each call to :meth:`request` is timed and then reported through the
    ``request_success`` / ``request_failure`` event hooks handed to the
    constructor, either immediately or, with ``catch_response=True``, when
    the returned :class:`ResponseContextManager` exits.
    """

    def __init__(self, base_url, request_success, request_failure, *args, **k):
        """Create the session.

        Args:
            base_url: URL prepended to every non-absolute request URL. A
                ``user:password@`` part is stripped from it and turned into
                HTTP basic auth for the session.
            request_success: object whose ``fire(**kwargs)`` is called for
                successful requests.
            request_failure: object whose ``fire(**kwargs)`` is called for
                failed requests.
            *args, **k: forwarded unchanged to ``httpx.Client``.
        """
        super().__init__(*args, **k)
        self.base_url = base_url
        self.request_success = request_success
        self.request_failure = request_failure

        # Check for basic authentication embedded in the base URL.
        parsed_url = urlparse(str(self.base_url))
        if parsed_url.username and parsed_url.password:
            netloc = parsed_url.hostname
            if parsed_url.port:
                netloc += ":%d" % parsed_url.port

            # Remove username and password from the base_url.
            self.base_url = urlunparse(
                (parsed_url.scheme, netloc, parsed_url.path,
                 parsed_url.params, parsed_url.query, parsed_url.fragment)
            )
            # Use httpx's own basic-auth implementation rather than the one
            # from ``requests``, which is a different HTTP library.
            self.auth = httpx.BasicAuth(
                parsed_url.username, parsed_url.password)

    def _build_url(self, path):
        """Prepend the base URL to *path* unless it is already absolute."""
        if absolute_http_url_regexp.match(path):
            return path
        base = str(self.base_url)
        # httpx normalises ``base_url`` so that its path ends with "/".
        # Joining that with a path that starts with "/" would yield "//",
        # so drop one of the two slashes.
        if base.endswith("/") and path.startswith("/"):
            return base + path[1:]
        return "%s%s" % (base, path)

    @staticmethod
    def _exception_for_name(exc, name):
        """Return *exc* with the request URL in its message replaced by *name*.

        Locust groups failures by exception message. ``HTTPStatusError``
        messages embed the request URL, so requests that share one ``name``
        but hit different URLs would otherwise be listed as separate
        failures. Other exception types, or a falsy *name*, are returned
        unchanged.
        """
        if not name or not isinstance(exc, httpx.HTTPStatusError):
            return exc
        message = str(exc).replace(str(exc.request.url), str(name))
        renamed = httpx.HTTPStatusError(
            message, request=exc.request, response=exc.response)
        return renamed.with_traceback(exc.__traceback__)

    def request(self, method, url, name=None, catch_response=False, **kwargs):
        """Send a request and report its outcome to Locust.

        Args:
            method: HTTP method name.
            url: absolute URL, or a path appended to the session's base URL.
            name: optional label used in the statistics instead of the URL.
            catch_response: when true, nothing is reported here; a
                :class:`ResponseContextManager` is returned and the caller
                decides (or the status code decides on exit) whether the
                request counts as a success or a failure.
            **kwargs: forwarded to ``httpx.Client.request``.

        Returns:
            The response, or a :class:`ResponseContextManager` wrapping it
            when ``catch_response`` is true.
        """
        # Prepend url with hostname unless it's already an absolute URL.
        url = self._build_url(url)

        # Meta data used when reporting the request to Locust's statistics.
        request_meta = {}
        request_meta["method"] = method
        request_meta["start_time"] = time.monotonic()

        response = self._send_request_safe_mode(method, url, **kwargs)

        # Record the consumed time in milliseconds.
        request_meta["response_time"] = (
            time.monotonic() - request_meta["start_time"]) * 1000
        request_meta["name"] = str(name or response.request.url)

        # If a ``stream`` argument is set, take the size from the
        # content-length header in order not to trigger fetching the body.
        # NOTE(review): ``httpx.Client.request`` does not appear to accept a
        # ``stream`` keyword, so this branch is probably unreachable - confirm.
        if kwargs.get("stream", False):
            request_meta["content_size"] = int(
                response.headers.get("content-length") or 0)
        else:
            request_meta["content_size"] = len(response.content or b"")

        if catch_response:
            response.locust_request_meta = request_meta
            return ResponseContextManager(
                response, request_success=self.request_success,
                request_failure=self.request_failure
            )

        try:
            response.raise_for_status()
        except httpx.HTTPError as e:
            # ``Response.url`` is a read-only property in httpx, so the URL
            # cannot be swapped on the response before raising; the
            # exception message is rewritten afterwards instead.
            self.request_failure.fire(
                request_type=request_meta["method"],
                name=request_meta["name"],
                response_time=request_meta["response_time"],
                response_length=request_meta["content_size"],
                exception=self._exception_for_name(e, name),
            )
        else:
            self.request_success.fire(
                request_type=request_meta["method"],
                name=request_meta["name"],
                response_time=request_meta["response_time"],
                response_length=request_meta["content_size"],
            )
        return response

    def _send_request_safe_mode(self, method, url, **kwargs):
        """Send an HTTP request, turning connection problems into a response.

        An ``httpx.RequestError`` raised while sending is not propagated.
        Instead a placeholder :class:`LocustResponse` with status code 0 is
        returned, carrying the exception in its ``error`` attribute so that
        ``raise_for_status()`` re-raises it. ``InvalidURL`` is always
        propagated.
        """
        try:
            return super().request(method, url, **kwargs)
        except (InvalidURL,):
            raise
        except RequestError as e:
            # ``httpx.Response`` requires ``status_code`` at construction
            # time, and ``httpx.Request`` has no ``prepare()`` step.
            r = LocustResponse(status_code=0, request=Request(method, url))
            r.error = e
            return r
class ResponseContextManager(LocustResponse):
    """Response wrapper usable in a ``with`` block to control reporting.

    The instance takes over the attribute dictionary of the response it
    wraps, so it exposes the same data, and adds two methods,
    :meth:`success` and :meth:`failure`, that let the caller decide how the
    request is recorded in Locust's statistics. When neither is called, the
    outcome is decided on exit: a ``ResponseError`` raised inside the block
    or a failing ``raise_for_status()`` counts as a failure, anything else
    as a success.
    """

    # None: no manual verdict yet; True: marked successful;
    # an Exception instance: marked failed with that exception.
    _manual_result = None

    def __init__(self, response, request_success, request_failure):
        # Share (not copy) the wrapped response's attribute dictionary.
        self.__dict__ = response.__dict__
        self._request_failure = request_failure
        self._request_success = request_success

    def __enter__(self):
        return self

    def __exit__(self, exc, value, traceback):
        verdict = self._manual_result
        if verdict is not None:
            # A manual verdict overrides the status-code based default.
            if verdict is True:
                self._report_success()
            elif isinstance(verdict, Exception):
                self._report_failure(verdict)
            # Only an exception raised inside the block is left to propagate.
            return exc is None

        if exc:
            if not isinstance(value, ResponseError):
                # Unknown exceptions must reach the caller.
                return False
            self._report_failure(value)
            return True

        try:
            self.raise_for_status()
        except httpx.HTTPError as status_error:
            self._report_failure(status_error)
        else:
            self._report_success()
        return True

    def _event_kwargs(self):
        """Build the keyword arguments shared by both event hooks."""
        meta = self.locust_request_meta
        return {
            "request_type": meta["method"],
            "name": meta["name"],
            "response_time": meta["response_time"],
            "response_length": meta["content_size"],
        }

    def _report_success(self):
        """Fire the success hook with the stored request meta data."""
        self._request_success.fire(**self._event_kwargs())

    def _report_failure(self, exc):
        """Fire the failure hook with the stored meta data and *exc*."""
        self._request_failure.fire(**self._event_kwargs(), exception=exc)

    def success(self):
        """Mark the request as successful regardless of its status code."""
        self._manual_result = True

    def failure(self, exc):
        """Mark the request as failed.

        *exc* may be an exception instance or any other value; a
        non-exception value is wrapped in ``CatchResponseError``.
        """
        if isinstance(exc, Exception):
            self._manual_result = exc
        else:
            self._manual_result = CatchResponseError(exc)
class _RequestEventAdapter:
    """Expose the legacy success/failure ``fire`` API on one request hook.

    ``HttpSession`` and ``ResponseContextManager`` report through two hooks:
    one fired without ``exception`` for successes and one fired with it for
    failures. This adapter forwards both kinds of call to a single hook,
    passing ``exception=None`` for successes.
    """

    def __init__(self, request_event):
        self._request_event = request_event

    def fire(self, *, request_type, name, response_time, response_length,
             exception=None):
        self._request_event.fire(
            request_type=request_type,
            name=name,
            response_time=response_time,
            response_length=response_length,
            exception=exception,
            context={},
        )


class HttpxUser(User):
    """Locust user whose ``client`` is an httpx-based :class:`HttpSession`.

    Set ``http2 = False`` on a subclass to disable HTTP/2; the value is
    passed to the session as its ``http2`` argument.
    """

    abstract = True
    http2 = True

    def __init__(self, *args, **kwargs):
        """Create the user and its HTTP client.

        Raises:
            LocustError: if no host is configured.
        """
        super().__init__(*args, **kwargs)
        if self.host is None:
            raise LocustError(
                "You must specify the base host. Either in the host attribute "
                "in the User class, or on the command line using the --host "
                "option."
            )

        events = self.environment.events
        request_event = getattr(events, "request", None)
        if request_event is not None:
            # Newer Locust releases have a single ``request`` event; the
            # separate ``request_success`` / ``request_failure`` hooks were
            # removed in Locust 2.15.
            request_success = request_failure = _RequestEventAdapter(
                request_event)
        else:
            # Older Locust releases only provide the two separate hooks.
            request_success = events.request_success
            request_failure = events.request_failure

        self.client = HttpSession(
            base_url=self.host,
            http2=self.http2,
            request_success=request_success,
            request_failure=request_failure,
        )
@neilsh
Copy link

neilsh commented Mar 31, 2023

Thanks for putting the gist together, @gawel! When trying it myself, I found that there was a breaking change in Locust 2.15.0: the request_success and request_failure event triggers were removed in favor of a single request trigger. You can see the revisions I needed to make here: https://gist.github.com/neilsh/f3b59f4c522dc4d7a086344845806a45/revisions#diff-4c6bd841f943a1f86cc64256ce60114a67cdaa81c4548ce0db41d6b5c60ba194

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment