Skip to content

Instantly share code, notes, and snippets.

@theholy7
Created June 19, 2020 12:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save theholy7/03d0562074a985dd384ecd8e3de29e6a to your computer and use it in GitHub Desktop.
Save theholy7/03d0562074a985dd384ecd8e3de29e6a to your computer and use it in GitHub Desktop.
Load testing script for MySQL database
import time
import logging
from locust import TaskSet, User, between, task, events
import mysql.connector
TEST_QUERY = '''
SELECT COUNT(*) FROM YOUR_TABLE_HERE;
'''
def get_secret():
"""
get secret from aws
"""
return {
"user": '',
"password": '',
"host": '',
"database": ''
}
def get_sample_query():
query = TEST_QUERY
conn = get_secret()
return conn, query
def execute_query(conn_info, query):
cnx = mysql.connector.connect(**conn_info)
print("Connected to DB")
cursor = cnx.cursor()
cursor.execute(query)
row = cursor.fetchone()
count = row[0]
print(f"Currently there are {count} users online")
cursor.close()
cnx.close()
return count
class MySqlClient:
def __getattr__(self, name):
def wrapper(*args, **kwargs):
start_time = time.time()
try:
res = execute_query(*args, **kwargs)
events.request_success.fire(request_type="mysql",
name=name,
response_time=int((time.time() - start_time) * 1000),
response_length=res)
except Exception as e:
events.request_failure.fire(request_type="mysql",
name=name,
response_time=int((time.time() - start_time) * 1000),
response_length=0,
exception=e)
logging.info('error {}'.format(e))
return wrapper
class MySqlTaskSet(TaskSet):
@task
def execute_query(self):
self.client.execute_query(get_sample_query()[0], get_sample_query()[1])
class MySqlLocust(User):
tasks = [MySqlTaskSet]
wait_time = between(0.1, 1)
def __init__(self, *args, **kwargs):
super(MySqlLocust, self).__init__(*args, **kwargs)
self.client = MySqlClient()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment