Skip to content

Instantly share code, notes, and snippets.

@csiebler
Created February 15, 2022 08:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save csiebler/553e0cd89dce38ae88d9304bf1fa33c0 to your computer and use it in GitHub Desktop.
Save csiebler/553e0cd89dce38ae88d9304bf1fa33c0 to your computer and use it in GitHub Desktop.
Test Read API performance using different methodologies
import requests
import io
import logging
import threading
import time
import concurrent.futures
import Levenshtein as lev
from datetime import datetime
# --- Output files -----------------------------------------------------------
# Two timestamped CSVs: `out` gets one row per processed document, and
# `out_summary` gets one row per batch of n documents.
date_time = datetime.now()
out = open(date_time.strftime("%Y-%m-%d-%H-%M-%S") + '.csv', "a")
out.write(f"method, date, image size, processing duration\n")
out_summary = open(date_time.strftime("%Y-%m-%d-%H-%M-%S") + '_summary.csv', "a")
out_summary.write(f"method, date, image size, total processing duration\n")
n = 10 # Number of documents per thread
threads = 10 # Number of parallel threads
# If you want to test different document resolutions, put them in here, use SAS URLs or setup Managed Identity
# Keys are image sizes (pixels); values are blob URLs (placeholders here).
urls = {
'500': 'https://csperftesting.blob.core.windows.net/images/500.jpg.....',
'1000': 'https://csperftesting.blob.core.windows.net/images/1000.jpg.....',
'2000': 'https://csperftesting.blob.core.windows.net/images/2000.jpg....',
'3000': 'https://csperftesting.blob.core.windows.net/images/3000.jpg.....',
}
# Enter your resource details here, give one key per thread
# One subscription key per worker thread so each thread has its own quota.
endpoint_url = "https://westeurope.api.cognitive.microsoft.com/vision/v3.2/read/analyze?language=en&pages=1&readingOrder=natural"
keys = ["key1",
"key2",
"..",
"...",
"...",
"..",
"..",
"....",
"...",
"...."]
# NOTE: assert is stripped under `python -O`; fine for a throwaway benchmark.
assert(len(keys) == threads), "Number of Read API keys must be equal to number of threads"
def stay_below_10_tps(start_time, end_time):
    """Throttle callers to at most 10 transactions per second.

    If the operation bounded by start_time/end_time took less than 0.1 s,
    sleep for the remainder of that 0.1 s window; slower operations incur
    no extra wait.
    """
    elapsed = end_time - start_time
    remaining = 0.1 - elapsed
    if remaining > 0:
        time.sleep(remaining)  # Ensure we stay under 10 TPS
def start_ocr(key, size):
    """Submit one asynchronous Read request for the image of the given size.

    Posts the image URL to the Read endpoint with the supplied subscription
    key, throttles itself below 10 TPS, and returns the Operation-Location
    URL that must be polled for the result.
    """
    request_headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": key
    }
    payload = {"url": urls[size]}
    submitted_at = time.time()
    response = requests.post(endpoint_url, json=payload, headers=request_headers)
    # The Read API acknowledges an accepted async job with HTTP 202.
    assert response.status_code == 202, response.text
    stay_below_10_tps(submitted_at, time.time())
    return response.headers['Operation-Location']
def wait_for_results(key, location_url):
    """Poll a Read operation until it reaches a terminal state.

    Polls the Operation-Location URL (throttled below 10 TPS per call) and
    returns the final JSON response.

    Raises:
        AssertionError: if the operation ends in any state other than
            "succeeded" (e.g. "failed").
    """
    headers = {
        "Content-Type": "application/json",
        "Ocp-Apim-Subscription-Key": key
    }
    status = "running"
    # BUG FIX: the original looped `while status != "succeeded"`, which spins
    # forever if the service reports the terminal "failed" status; exit on
    # any terminal state and let the assert below report the failure.
    while status not in ("succeeded", "failed"):
        start_time = time.time()
        response = requests.get(location_url, headers=headers).json()
        status = response['status']
        stay_below_10_tps(start_time, time.time())
    assert status == "succeeded", response
    return response
def run_sequential_test(size):
    """Option 1: process n documents strictly one after another.

    Each document is submitted and fully awaited before the next one starts.
    Per-document timings go to `out`; the batch total goes to `out_summary`.
    Uses only the first key since there is a single stream of requests.
    """
    key = keys[0]
    batch_start = time.time()
    for _ in range(n):
        doc_start = time.time()
        operation_url = start_ocr(key, size)
        wait_for_results(key, operation_url)
        out.write(f"Option 1 - Sequential, {datetime.now()}, {size}, {time.time() - doc_start}\n")
    duration = time.time() - batch_start
    print(f"Sequential batch of {n} took {duration} seconds")
    out_summary.write(f"Option 1 - Sequential, {datetime.now()}, {size}, {duration}\n")
def run_sequential_optimized_test(size):
    """Option 2: submit all n documents first, then collect results in order.

    Pipelines the async Read API: every job is started before any result is
    awaited, so server-side processing of later documents overlaps with
    polling of earlier ones. Per-document duration is measured from each
    job's own submission time.
    """
    key = keys[0]
    batch_start = time.time()
    pending = []
    for _ in range(n):
        submitted_at = time.time()
        pending.append({'location_url': start_ocr(key, size),
                        'start_time': submitted_at})
    for job in pending:
        wait_for_results(key, job['location_url'])
        out.write(f"Option 2 - Optimized, {datetime.now()}, {size}, {time.time() - job['start_time']}\n")
    duration = time.time() - batch_start
    print(f"Sequential optimized batch of {n} took {duration} seconds")
    out_summary.write(f"Option 2 - Optimized, {datetime.now()}, {size}, {duration}\n")
def test_parallel(i):
    """Worker for Option 3: run one document end-to-end on a dedicated key.

    Worker index `i` selects a subscription key so each thread stays within
    its own quota.

    NOTE(review): reads the global `size` bound by the module-level `for`
    loop rather than taking it as a parameter — fragile if this function is
    reused outside that loop; confirm before extracting.
    """
    key = keys[i % threads]
    doc_start = time.time()
    operation_url = start_ocr(key, size)
    wait_for_results(key, operation_url)
    out.write(f"Option 3 - Multithreading, {datetime.now()}, {size}, {time.time() - doc_start}\n")
def run_parallel_test(size):
    """Option 3: fan the n documents out across `threads` worker threads.

    Each worker runs `test_parallel` (submit + await one document) with its
    own subscription key. The batch total goes to `out_summary`.
    """
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        # BUG FIX: Executor.map returns a lazy iterator; the original never
        # consumed it, so exceptions raised inside worker threads (e.g. the
        # 202 assertion in start_ocr) were silently discarded. Materializing
        # the iterator re-raises them here. The explicit shutdown(wait=True)
        # was redundant — exiting the `with` block already does that.
        list(executor.map(test_parallel, range(n)))
    duration = time.time() - start_time
    print(f"Parallel batch of {n} took {duration} seconds")
    out_summary.write(f"Option 3 - Multithreading, {datetime.now()}, {size}, {duration}\n")
# Run all three strategies once per image resolution, then flush the CSVs.
for size in urls:
    run_sequential_test(size)
    run_sequential_optimized_test(size)
    run_parallel_test(size)
out.close()
out_summary.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment