Skip to content

Instantly share code, notes, and snippets.

@kshahkshah
Created June 15, 2017 19:06
Show Gist options
  • Save kshahkshah/a81cfb02220d5c692795f1964ad31d0e to your computer and use it in GitHub Desktop.
Save kshahkshah/a81cfb02220d5c692795f1964ad31d0e to your computer and use it in GitHub Desktop.
locust file with websockets integration, reduced example
import time
import json
import websocket
import threading
import gevent
import logging
import dateutil
import datetime
from websocket import create_connection
from locust import HttpLocust, TaskSet, task
from users import *
from locust.events import request_success
from locust.exception import StopLocust
class EnterMainflow(TaskSet):
    """Simulated user flow mixing HTTP requests with a Pusher websocket.

    ``on_start`` claims a user from ``all_users``, opens a websocket
    connection and spawns a gevent greenlet that appends the ids of
    interesting events to ``self.list_of_events``.  The single task then runs
    the HTTP actions and the channel subscription in order, releases the
    websocket and stops this locust.
    """

    def on_start(self):
        """Claim a user, open the websocket and start the background reader."""
        # get a user
        self.current_user = all_users.pop()

        # something which we use to determine the state, and which events in
        # the websocket channel will cause this to be appended to
        self.list_of_events = []

        # make a websocket connection and store it
        ws = create_connection("ws://ws.pusherapp.com:80/app/REDACTED?client=loadtester&version=1.0&protocol=5")
        self.ws = ws

        # make a handler
        def _receive():
            """Read frames until the connection closes, recording events."""
            while True:
                try:
                    res = ws.recv()
                except websocket.WebSocketConnectionClosedException:
                    # the socket was closed (by us or by the server), so
                    # there is nothing left to read
                    return

                if not res:
                    # websocket-client hands back an empty payload for
                    # non-data frames (e.g. a close frame); json.loads would
                    # raise on it and kill this greenlet
                    continue

                message = json.loads(res)
                print("**** PUSHER MESSAGE INCOMING FOR {user} ****".format(user=self.current_user['username']))
                print(message)

                if message.get('event') == "thing_i_care_about":
                    data = message['data']
                    if not isinstance(data, dict):
                        # the Pusher protocol ships ``data`` as a JSON-encoded
                        # string, so decode it before indexing into it
                        data = json.loads(data)
                    self.list_of_events.append(data['thing']['id'])

                # NOTE(review): the original indentation was lost; this is
                # assumed to fire once per received frame rather than only
                # for "thing_i_care_about" events - confirm.
                request_success.fire(
                    request_type='WebSocket Recv',
                    name='test/ws/echo',
                    # fixed placeholder value, not a measured latency
                    response_time=10,
                    response_length=len(res),
                )

        # gevent handler; keep a reference so it can be stopped later
        self._receiver = gevent.spawn(_receive)

    def _close_websocket(self):
        """Stop the reader greenlet and close the websocket from on_start."""
        # kill the reader first so it is not blocked in recv() while the
        # underlying socket is being closed
        self._receiver.kill()
        self.ws.close()

    @task
    def main_tasks(self):
        """Run the whole flow once, then stop this locust."""
        self.http_action_one()
        self.http_action_two()
        self.websocket_channel_subscription()
        self.http_action_three()

        # Release the socket and the reader greenlet before leaving. This is
        # done only on the success path: if an action raises, the task may be
        # run again and still needs an open connection.
        self._close_websocket()

        # exit this task...
        raise StopLocust()

    def websocket_channel_subscription(self):
        """Send a ``pusher:subscribe`` event for the current channel."""
        # NOTE(review): ``self.current_channel`` is not assigned anywhere in
        # this reduced example - presumably one of the HTTP actions sets it;
        # verify before running.
        body = json.dumps({
            "event": "pusher:subscribe",
            "data": {
                "channel": "channel_{channel_id}".format(channel_id=self.current_channel['id']),
            }
        })
        self.ws.send(body)

    def http_action_one(self):
        """Placeholder for the first HTTP action (elided in this example)."""
        # innocuous get request...
        pass

    def http_action_two(self):
        """Placeholder for the second HTTP action (elided in this example)."""
        # innocuous post request...
        pass

    def http_action_three(self):
        """Perform an HTTP action based on the events received so far."""
        # uses this self.list_of_events, which is being appended to by the
        # websocket receive handler
        self.list_of_events
        # and does some http action as a result...
        # NOTE(review): ``url`` and the elided arguments are placeholders
        # from the reduced example and must be filled in before running.
        self.client.post(url, ...)
class WebsiteUser(HttpLocust):
    """Locust user class whose behaviour is defined by ``EnterMainflow``."""

    # Task set assigned to this locust class.
    task_set = EnterMainflow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment