Skip to content

Instantly share code, notes, and snippets.

@bradhowes
Created June 8, 2017 12:51
Show Gist options
  • Save bradhowes/bea76e850d3360a39e0077666e64c973 to your computer and use it in GitHub Desktop.
Save bradhowes/bea76e850d3360a39e0077666e64c973 to your computer and use it in GitHub Desktop.
Simple Python 3 Performance Test Script
import asyncio
import websockets
import json
import pickle
import sys
import multiprocessing
import random
import time
from collections import deque
from datetime import datetime
from urllib import parse
from aiohttp import ClientSession
from cannedAccounts import cannedAccountsInfo
class Timer(object):
    """Record how long tagged phases of work take.

    Phases are tracked on a stack: ``begin`` pushes a tag together with its
    start time and ``end`` pops the most recently begun one, so nested phases
    must be closed in the reverse order they were opened. Finished phases
    accumulate in ``log`` as ``(end time ISO string, tag, duration)`` tuples.
    """

    def __init__(self):
        self.pendingTags = deque()  # stack of (tag, start time) still running
        self.log = []               # finished (end time, tag, duration) records

    @staticmethod
    def now():
        """Return the current UTC time as a naive ``datetime``."""
        # ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        # tzinfo keeps the value (and its ``isoformat()``) identical to what
        # ``utcnow()`` produced.
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def begin(self, tag):
        """Start timing a phase named ``tag``."""
        self.pendingTags.append((tag, self.now()))

    def end(self, tag):
        """Finish the most recently begun phase and log its duration.

        ``tag`` is accepted for readability at the call site only; the entry
        is logged under the tag that was given to the matching ``begin``.

        Raises:
            IndexError: if no phase is currently pending.
        """
        beganTag, startTime = self.pendingTags.pop()
        endTime = self.now()
        self.log.append((endTime.isoformat(), beganTag, endTime - startTime))

    def exceptioned(self, exception):
        """Close every pending phase as failed.

        Each pending tag (innermost first) is logged with a
        ``' *** failed: <status>'`` suffix and a duration of ``0.0``.

        Args:
            exception: object exposing a ``status`` attribute.
        """
        endTimeFormatted = self.now().isoformat()
        suffix = ' *** failed: {}'.format(exception.status)
        while self.pendingTags:
            tag, _startTime = self.pendingTags.pop()
            self.log.append((endTimeFormatted, tag + suffix, 0.0))

    def __iter__(self):
        return iter(self.log)

    def __len__(self):
        return len(self.log)
class RESTFailure(Exception):
    """Raised when a REST call answers with an unexpected HTTP status.

    Attributes:
        status: the ``status`` attribute of the response that was rejected.
    """

    def __init__(self, resp):
        self.status = resp.status
        # Initialise the base class so str(exc) and tracebacks show the status
        # instead of an empty message.
        super().__init__('REST request failed with HTTP status {}'.format(self.status))
class Mattermost(object):
    """Small asynchronous client for the REST/WebSocket endpoints under ``base``.

    Coroutines are queued with ``enqueue`` and then driven to completion, as a
    batch, by ``done``. Several methods read ``teamId``, ``channelId`` and
    ``userId``; those attributes are not set by this class and must be assigned
    by the caller before the methods that use them are invoked.
    """

    DEFAULT_BASE = 'https://enrico.teaches-yoga.com/api/v3/'

    def __init__(self, loop, team='PersonalTouch', base=None):
        """Create a client bound to ``loop``.

        Args:
            loop: asyncio event loop used for the HTTP session and for tasks.
            team: team name remembered on the instance (``login`` replaces it).
            base: API root URL ending in ``/``; defaults to ``DEFAULT_BASE``.
        """
        self.base = self.DEFAULT_BASE if base is None else base
        self.loop = loop
        self.team = team
        self.session = ClientSession(loop=loop)
        self.tasks = []
        self.headers = {'Content-Type': 'application/json'}
        self._authToken = None

    @property
    def authToken(self):
        """Token obtained from ``login``, or ``None`` before it is assigned."""
        return self._authToken

    @authToken.setter
    def authToken(self, value):
        # Storing the token also installs it as the bearer credential sent
        # with every subsequent request.
        self._authToken = value
        self.headers['Authorization'] = 'Bearer ' + value

    def enqueue(self, method, *args):
        """Schedule the coroutine ``method(*args)`` on this client's loop."""
        self.tasks.append(asyncio.ensure_future(method(*args), loop=self.loop))

    def done(self):
        """Run all queued tasks to completion and return their results in order.

        The queue is emptied before results are collected, so a failing task
        does not leave stale entries behind for the next batch. Any exception
        raised by a task is re-raised here.
        """
        finished, self.tasks = self.tasks, []
        self.loop.run_until_complete(asyncio.wait(finished))
        return [task.result() for task in finished]

    async def _getJson(self, url):
        """GET ``url`` and return the decoded JSON body; raise RESTFailure unless 200."""
        async with self.session.get(url, headers=self.headers) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            return await resp.json()

    async def _postJson(self, url, payload):
        """POST ``payload`` as JSON to ``url`` and return the decoded JSON body.

        Raises RESTFailure unless the response status is 200.
        """
        async with self.session.post(url, headers=self.headers, data=json.dumps(payload)) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            return await resp.json()

    async def login(self, team, loginId, password):
        """Log in and return ``(token, body)``.

        ``token`` is the ``Token`` response header and ``body`` the decoded
        JSON response. The credentials are remembered on the instance.

        Raises:
            RESTFailure: if the response status is not 200.
        """
        self.team = team
        self.loginId = loginId
        self.password = password
        url = '{base}users/login'.format(base=self.base)
        payload = {'name': team, 'login_id': loginId, 'password': password}
        # Not routed through _postJson because the response headers are needed
        # in addition to the body.
        async with self.session.post(url, headers=self.headers, data=json.dumps(payload)) as resp:
            if resp.status != 200:
                raise RESTFailure(resp)
            body = await resp.json()
            return resp.headers['Token'], body

    async def fetchTeams(self):
        """Return the decoded JSON from ``teams/members``."""
        url = '{base}teams/members'.format(base=self.base)
        return await self._getJson(url)

    async def fetchChannels(self):
        """Return the decoded JSON channel listing for ``self.teamId``."""
        url = '{base}teams/{teamId}/channels/'.format(base=self.base, teamId=self.teamId)
        return await self._getJson(url)

    async def fetchRoster(self):
        """Return the decoded JSON from the ``users/0/100`` endpoint of ``self.channelId``."""
        url = '{base}teams/{teamId}/channels/{channelId}/users/0/100'.format(
            base=self.base, teamId=self.teamId, channelId=self.channelId)
        return await self._getJson(url)

    async def fetchPosts(self):
        """Return the decoded JSON from ``posts/since/0`` of ``self.channelId``."""
        url = '{base}teams/{teamId}/channels/{channelId}/posts/since/{since}'.format(
            base=self.base, teamId=self.teamId, channelId=self.channelId, since=0)
        return await self._getJson(url)

    async def markChannelRead(self):
        """POST ``self.channelId`` to the ``channels/view`` endpoint; return the JSON reply."""
        url = '{base}teams/{teamId}/channels/view'.format(
            base=self.base, teamId=self.teamId)
        return await self._postJson(url, {'channel_id': self.channelId})

    async def makeWebSocket(self):
        """Open the WebSocket, authenticate, and wait for the perf-test post.

        Returns the first received ``posted`` event whose post message ends
        with ``'Perf test message'``.
        """
        url = '{base}users/websocket'.format(base=self.base)
        # Same host and path as the REST API, but over the 'wss' scheme.
        parts = list(parse.urlparse(url))
        parts[0] = 'wss'
        url = parse.urlunparse(parts)
        async with websockets.connect(url, extra_headers=self.headers) as websocket:
            challenge = {
                'seq': 1,
                'action': 'authentication_challenge',
                'data': {'token': self.authToken}
            }
            await websocket.send(json.dumps(challenge, ensure_ascii=False))
            while True:
                response = json.loads(await websocket.recv())
                if response.get('event') != 'posted':
                    continue
                # The post arrives as a JSON string nested inside the event.
                post = json.loads(response['data']['post'])
                if post['message'].endswith('Perf test message'):
                    return response

    async def newMessage(self):
        """Create a timestamped perf-test post in ``self.channelId``; return the JSON reply."""
        url = '{base}teams/{teamId}/channels/{channelId}/posts/create'.format(
            base=self.base, teamId=self.teamId, channelId=self.channelId)
        payload = {'id': '',
                   'create_at': 0,
                   'update_at': 0,
                   'delete_at': 0,
                   'user_id': self.userId,
                   'channel_id': self.channelId,
                   'root_id': '',
                   'parent_id': '',
                   'original_id': '',
                   'type': '',
                   'props': {},
                   'message': Timer.now().isoformat() + ' Perf test message'
                   }
        return await self._postJson(url, payload)
def run(pid, delay, userName, password):
    """Simulate one user session and pickle its timing log.

    After sleeping ``delay`` seconds the session logs in, looks up the team and
    the first channel whose name starts with ``'taskme'``, opens a WebSocket,
    fetches roster and posts, marks the channel read, posts a message and
    waits for that message to come back over the WebSocket. Each step is timed.

    The timing log is always written to ``perf_user_<pid>.pkl``, including
    when the session fails.

    Args:
        pid: index of this simulated user; used in the output file name.
        delay: seconds to sleep before starting the session.
        userName: login id of the account to use.
        password: password of the account to use.

    Raises:
        RuntimeError: if no channel name starts with ``'taskme'``.
    """
    timer = Timer()
    # Created outside the try block so the cleanup below can never hit an
    # unbound name. A fresh loop is used because this function closes it.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    mm = None
    try:
        mm = Mattermost(loop)

        timer.begin('delay')
        time.sleep(delay)
        timer.end('delay')

        # Login
        #
        timer.begin('total')
        timer.begin('login')
        mm.enqueue(mm.login, 'PersonalTouch', userName, password)
        mm.authToken, us = mm.done()[0]
        mm.userId = us['id']
        timer.end('login')

        # Fetch Team ID
        #
        timer.begin('fetchTeams')
        mm.enqueue(mm.fetchTeams)
        mm.teamId = mm.done()[0][0]['team_id']
        timer.end('fetchTeams')

        # Fetch Channels
        #
        timer.begin('fetchChannels')
        mm.enqueue(mm.fetchChannels)
        channels = mm.done()[0]
        timer.end('fetchChannels')

        for channel in channels:
            if channel['name'].startswith('taskme'):
                mm.channelId = channel['id']
                break
        else:
            # An explicit raise instead of assert: this check must survive -O.
            raise RuntimeError("no channel whose name starts with 'taskme'")

        # Create WebSocket, fetch roster, fetch posts
        #
        timer.begin('fetchPosts')
        # The listener is started now so it is already connected when the
        # test message is posted further down.
        mm.websocket = asyncio.ensure_future(mm.makeWebSocket(), loop=loop)
        mm.enqueue(mm.fetchRoster)
        mm.enqueue(mm.fetchPosts)
        mm.done()
        timer.end('fetchPosts')

        # Mark channel as 'read'
        #
        timer.begin('markChannelRead')
        mm.enqueue(mm.markChannelRead)
        mm.done()
        timer.end('markChannelRead')

        # Post new message
        #
        timer.begin('newMessage')
        mm.enqueue(mm.newMessage)
        mm.done()
        timer.end('newMessage')

        # Receive message over WebSocket
        #
        timer.begin('receiveMessage')
        loop.run_until_complete(asyncio.wait([mm.websocket]))
        mm.websocket.result()  # re-raises anything the listener failed with
        timer.end('receiveMessage')

        timer.end('total')

    except RESTFailure as exception:
        timer.exceptioned(exception)

    finally:
        try:
            if mm is not None:
                closing = mm.session.close()
                # Newer aiohttp releases make close() a coroutine; it has to
                # be driven by the loop or the session is never released.
                if asyncio.iscoroutine(closing):
                    loop.run_until_complete(closing)
            loop.close()
        finally:
            # The parent process reads this file after joining the child, so
            # write it even if the cleanup above failed.
            with open('perf_user_{}.pkl'.format(pid), 'wb') as output:
                pickle.dump(timer.log, output)
def arrivalDelays(count, duration):
    """Return ``count`` start delays (seconds) spread over roughly ``duration``.

    The first delay is 0.1; each following one adds an exponentially
    distributed gap with rate ``count / duration``.

    Raises:
        ValueError: if ``duration`` is not positive.
    """
    if duration <= 0:
        raise ValueError('duration must be positive')
    rate = count / duration
    delays = []
    delay = 0.1
    for _ in range(count):
        delays.append(delay)
        delay += random.expovariate(rate)
    return delays


def main(argv):
    """Launch one ``run`` process per simulated user and gather timings as CSV.

    ``argv[1]`` is the number of users and ``argv[2]`` the duration in seconds
    over which their start times are spread. Results are written to
    ``perf_times_<count>_<duration>.csv``.
    """
    if len(argv) < 3:
        sys.exit('usage: {} COUNT DURATION'.format(argv[0]))
    count = int(argv[1])
    duration = int(argv[2])
    if count < 0 or duration <= 0:
        sys.exit('COUNT must be >= 0 and DURATION must be > 0')

    delays = arrivalDelays(count, duration)
    print(delays)

    customers = cannedAccountsInfo['customers']
    # Checked up front so the run does not die with an IndexError after some
    # processes have already been started.
    if count > len(customers):
        sys.exit('only {} canned accounts available, {} requested'.format(len(customers), count))

    processes = []
    for index, delay in enumerate(delays):
        customer = customers[index]
        p = multiprocessing.Process(target=run, args=(index, delay, customer['userName'],
                                                      customer['password']))
        p.start()
        processes.append(p)

    with open('perf_times_{}_{}.csv'.format(count, duration), 'w') as output:
        for index, p in enumerate(processes):
            p.join()
            # These pickle files were produced by our own child processes.
            with open('perf_user_{}.pkl'.format(index), 'rb') as source:
                log = pickle.load(source)
            if index == 0:
                # The header row is derived from the first user's tags.
                tags = ['Id'] + [z[1] for z in log]
                output.write(','.join(tags) + '\n')
            output.write(','.join([str(index)] + [str(z[-1]) for z in log]) + '\n')


if __name__ == '__main__':
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment