Skip to content

Instantly share code, notes, and snippets.

@juanandresnyc
Created December 13, 2019 20:29
Show Gist options
  • Save juanandresnyc/479785dccde69ad93810c2fd42bc31a3 to your computer and use it in GitHub Desktop.
Save juanandresnyc/479785dccde69ad93810c2fd42bc31a3 to your computer and use it in GitHub Desktop.
Probes the patient medication endpoints and reports response size and latency.
import requests
import json
# Base URL of the internal API, keyed by environment name.
URL_BASE = dict(
    local='http://localhost:8000',
    staging='https://api.staging.internal.capsulerx.com',
)

# (email, password) login pair, keyed by environment name.
# NOTE(review): the staging password looks like a placeholder to be filled in
# before running against staging -- confirm.
CREDENTIALS = dict(
    local=('jandrango@capsulecares.com', 'hellodear'),
    staging=('jandrango@capsulecares.com', 'YOURS_HERE'),
)
def get_token(environment, email, password):
    """Log in to the internal API and return the token from the response.

    Args:
        environment: Key into ``URL_BASE`` selecting the target host.
        email: Account email sent in the login payload.
        password: Account password sent in the login payload.

    Returns:
        The ``data.attributes.token`` value of the JSON response, or ``-1``
        when any of those keys is missing.

    Raises:
        ValueError: If the response body is not valid JSON.
        requests.exceptions.RequestException: On connection failure or when
            the server does not answer within the timeout.
    """
    url = '{url_base}/internal/account/login'.format(url_base=URL_BASE[environment])
    payload = json.dumps({
        'data': {
            'type': 'AccountLogin',
            'attributes': {
                'email': email,
                'password': password,
            },
        },
    })
    # HTTP header names are case-insensitive, so Cache-Control is listed once
    # instead of under two spellings.
    headers = {
        'Content-Type': "application/vnd.api+json",
        'Accept': "*/*",
        'Cache-Control': "no-cache",
        'Accept-Encoding': "gzip, deflate",
        'Connection': "close",
    }
    # Without a timeout requests waits indefinitely on an unresponsive host.
    response = requests.post(url, data=payload, headers=headers, timeout=30)
    body = json.loads(response.text)
    return body.get('data', {}).get('attributes', {}).get('token', -1)
def get_default_headers(token):
    """Build the request headers used by the authenticated probe calls.

    Args:
        token: Value placed after ``Bearer`` in the ``Authorization`` header.

    Returns:
        A new dict mapping header names to values.
    """
    # HTTP header names are case-insensitive, so Cache-Control is listed once
    # instead of under two spellings.
    return {
        'Authorization': "Bearer {token}".format(token=token),
        'Accept': "*/*",
        'Cache-Control': "no-cache",
        'Accept-Encoding': "gzip, deflate",
        'Connection': "close",
    }
def get_size_latency(response):
    """Return the body length and the elapsed time in milliseconds.

    Args:
        response: Object exposing ``content`` (a sized body) and ``elapsed``
            (a ``datetime.timedelta``), as ``requests.Response`` does.

    Returns:
        A ``(size, latency_ms)`` tuple, where ``size`` is ``len(content)``
        and ``latency_ms`` is the full elapsed duration in milliseconds.
    """
    # TODO Figure out a better way to measure size
    size = len(response.content)
    # timedelta.microseconds is only the sub-second component and silently
    # drops whole seconds; total_seconds() covers the entire duration.
    latency_ms = response.elapsed.total_seconds() * 1000
    return size, latency_ms
# Upper bound, in seconds, on how long a single probe request may take.
_REQUEST_TIMEOUT_SECONDS = 60

# Path shared by the three v1 query variants.
_V1_PATH = '/internal/patient_medication_list_item'


def _probe_medication_list(path, params, environment, token):
    """GET ``path`` on the environment's host and return (size, latency)."""
    url = '{url_base}{path}'.format(url_base=URL_BASE[environment], path=path)
    response = requests.get(
        url,
        headers=get_default_headers(token),
        params=params,
        # Without a timeout requests waits indefinitely on an unresponsive host.
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    return get_size_latency(response)


def patient_medication_list_v2(patient_id, environment, token):
    """Probe the v2 medication list endpoint, filtered by patient only.

    Returns the ``(size, latency)`` tuple produced by ``get_size_latency``.
    """
    params = {"patient": str(patient_id)}
    return _probe_medication_list(_V1_PATH + '/v2', params, environment, token)


def patient_medication_list_v1_1(patient_id, environment, token):
    """Probe the v1 endpoint: non-hidden items, rx-excluded added, with the
    medication, active_rx and upcoming_fill includes.

    Returns the ``(size, latency)`` tuple produced by ``get_size_latency``.
    """
    params = {
        "include": "medication,active_rx,upcoming_fill",
        "patient": str(patient_id),
        "has_prescriptions": "true",
        "add_rx_excluded": "true",
        "is_hidden": "false",
    }
    return _probe_medication_list(_V1_PATH, params, environment, token)


def patient_medication_list_v1_2(patient_id, environment, token):
    """Probe the v1 endpoint: hidden items, rx-excluded added, with the
    medication include.

    Returns the ``(size, latency)`` tuple produced by ``get_size_latency``.
    """
    params = {
        "include": "medication",
        "patient": str(patient_id),
        "has_prescriptions": "true",
        "add_rx_excluded": "true",
        "is_hidden": "true",
    }
    return _probe_medication_list(_V1_PATH, params, environment, token)


def patient_medication_list_v1_3(patient_id, environment, token):
    """Probe the v1 endpoint: non-hidden items, rx-only added, with the
    medication include.

    Returns the ``(size, latency)`` tuple produced by ``get_size_latency``.
    """
    params = {
        "include": "medication",
        "patient": str(patient_id),
        "has_prescriptions": "true",
        "add_rx_only": "true",
        "is_hidden": "false",
    }
    return _probe_medication_list(_V1_PATH, params, environment, token)
import time
import random
def probe_endpoint(patient_id, environment, token, fn, tries=5, max_pause=1.0):
    """Call ``fn`` several times and return the mean size and mean latency.

    Args:
        patient_id: Passed through to ``fn``.
        environment: Passed through to ``fn``.
        token: Passed through to ``fn``.
        fn: Callable taking ``(patient_id, environment, token)`` and
            returning a ``(size, latency)`` pair.
        tries: Number of calls to average over; must be at least 1.
        max_pause: Upper bound, in seconds, of the random pause inserted
            between consecutive calls.

    Returns:
        A ``(mean_size, mean_latency)`` tuple.

    Raises:
        ValueError: If ``tries`` is less than 1.
    """
    if tries < 1:
        # Averaging over zero samples would otherwise divide by zero.
        raise ValueError('tries must be at least 1, got {}'.format(tries))

    total_size = total_latency = 0
    last_attempt = tries - 1
    for attempt in range(tries):
        size, latency = fn(patient_id, environment, token)
        total_size += size
        total_latency += latency
        if attempt != last_attempt:
            # Random pause between calls; nothing follows the last call, so
            # pausing after it would only slow the run down.
            time.sleep(random.random() * max_pause)
    return total_size / tries, total_latency / tries
"""
Staging patients with the most medications, selected from the DB with:
select
p.id,
count(pm.id) as n_meds,
SUM(CASE WHEN pm.is_hidden = true THEN 1 ELSE 0 END) AS hidden,
SUM(CASE WHEN pm.is_hidden = false THEN 1 ELSE 0 END) AS not_hidden
from capsule_core_patientmedication pm
join capsule_core_patient p on p.id = pm.patient_id
group by p.id
order by n_meds desc
limit 100
"""
# Patient ids to probe.
# NOTE(review): presumably taken from the output of the query in the string
# literal preceding this list -- confirm.
patient_ids = [
    33808, 113042, 25314, 26865, 43060,
    39758, 27411, 64748, 62555, 39100,
    86833, 25941, 28482, 39708, 40979,
    42615, 27318, 27579, 45196, 35366,
    12145, 31996, 62454, 39094, 42146,
    29919, 46100, 51211, 25388, 26762,
    46369, 48935, 41983, 34088, 34315,
    44185, 47538, 40616, 28911, 34310,
    44654, 35903, 47193, 27296, 30941,
]
# (name, callable) pairs for every endpoint variant to probe; each name is the
# probe function's own __name__.
endpoints = [
    (probe.__name__, probe)
    for probe in (
        patient_medication_list_v2,
        patient_medication_list_v1_1,
        patient_medication_list_v1_2,
        patient_medication_list_v1_3,
    )
]
def run_test(environment, patient_ids, endpoints, tries=5):
    """Probe every endpoint for every patient and collect the averages.

    Args:
        environment: Key into ``CREDENTIALS`` (and, via the helpers, into
            ``URL_BASE``) selecting the target environment.
        patient_ids: Iterable of patient ids to probe.
        endpoints: Iterable of ``(endpoint_name, callable)`` pairs; each
            callable is handed to ``probe_endpoint``.
        tries: Number of calls per patient/endpoint pair to average over.

    Returns:
        A dict mapping each patient id to a dict that maps endpoint name to
        the value returned by ``probe_endpoint``.

    Raises:
        RuntimeError: If the login did not yield a token.
    """
    email, password = CREDENTIALS[environment]
    token = get_token(environment, email, password)
    if token == -1:
        # get_token returns -1 when the login response carries no token;
        # probing with it would presumably only measure auth failures.
        raise RuntimeError(
            'login to {} failed: no token in response'.format(environment)
        )

    results = {}
    for patient_id in patient_ids:
        patient_results = results[patient_id] = {}
        for endpoint_name, endpoint in endpoints:
            print('testing {} with {}'.format(patient_id, endpoint_name))
            patient_results[endpoint_name] = probe_endpoint(
                patient_id, environment, token, endpoint, tries=tries
            )
            print(' done:', patient_results[endpoint_name])
    return results
# Run the staging probe only when executed as a script, so that importing
# this module does not fire network requests.
if __name__ == '__main__':
    print(run_test('staging', patient_ids, endpoints))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment