Created
November 26, 2019 21:30
-
-
Save zeryx/bccffe585ed677a0f32a57b97e1d997e to your computer and use it in GitHub Desktop.
An orchestration algorithm that asynchronously waits for the outputs of 3 different algorithms, run in batch, to finish before returning a result
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Algorithmia | |
from multiprocessing import Manager, Pool | |
# API calls will begin at the apply() method, with the request body passed as 'input' | |
# For more details, see algorithmia.com/developers/algorithm-development/languages | |
# Identifiers of the three child algorithms invoked through the Algorithmia client
# (presumably in "owner/algorithm/version" form -- confirm against the platform docs).
ALGO_1 = "algorithmiahq/DeepFashion/1.3.0"
ALGO_2 = "algorithmiahq/multistageclassifierpetsdemo/0.1.0"
ALGO_3 = "character_recognition/tesseract/0.3.0"
# This worker pool is shared between all 3 algorithm requests, which keeps the number of active children restricted and easily controlled.
# Despite the name, apply() passes this size to multiprocessing.Pool, so the workers are processes rather than threads.
# If desired, we could split this up into a dedicated pool for each child algorithm.
THREADPOOL_SIZE = 5
# Module-level client used by every call_ALGO_* worker function.
client = Algorithmia.client()
def call_ALGO_1(image, errorQ):
    """Run ALGO_1 on one image unless a failure has already been reported.

    Args:
        image: the image value forwarded to the algorithm under the "image" key.
        errorQ: queue shared between workers; a non-empty queue means some
            task has already failed, so this call is skipped.

    Returns:
        {"image": image, "result": <algorithm result>} on success, or None
        when the call was skipped or raised. A raised exception is placed on
        errorQ instead of propagating.
    """
    try:
        # Bail out early once any task has recorded an error.
        if not errorQ.empty():
            return None
        payload = {"image": image, "model": "small", "tags_only": True}
        print("running algo 1 with: {} ...".format(image))
        response = client.algo(ALGO_1).pipe(payload)
        output = response.result
        print("finished algo 1 with: {}...".format(image))
        return {"image": image, "result": output}
    except Exception as err:
        # Report the failure through the shared queue rather than raising.
        errorQ.put(err)
        return None
def call_ALGO_2(image, errorQ):
    """Run ALGO_2 on one image unless a failure has already been reported.

    Args:
        image: the image value, passed to the algorithm as-is.
        errorQ: queue shared between workers; a non-empty queue means some
            task has already failed, so this call is skipped.

    Returns:
        {"image": image, "result": <algorithm result>} on success, or None
        when the call was skipped or raised. A raised exception is placed on
        errorQ instead of propagating.
    """
    try:
        # Bail out early once any task has recorded an error.
        if not errorQ.empty():
            return None
        print("running algo 2 with: {} ...".format(image))
        # This algorithm takes the bare image value as its request body.
        output = client.algo(ALGO_2).pipe(image).result
        print("finished algo 2 with: {}...".format(image))
        return {"image": image, "result": output}
    except Exception as err:
        # Report the failure through the shared queue rather than raising.
        errorQ.put(err)
        return None
def call_ALGO_3(image, errorQ):
    """Run ALGO_3 on one image unless a failure has already been reported.

    Args:
        image: the image value forwarded to the algorithm under the "image" key.
        errorQ: queue shared between workers; a non-empty queue means some
            task has already failed, so this call is skipped.

    Returns:
        {"image": image, "result": <algorithm result>} on success, or None
        when the call was skipped or raised. A raised exception is placed on
        errorQ instead of propagating.
    """
    try:
        # Bail out early once any task has recorded an error.
        if not errorQ.empty():
            return None
        payload = {"image": image, "language": "eng"}
        # The "{}" placeholder was missing here, so the image never appeared
        # in the start-of-run log line (unlike the algo 1 / algo 2 messages).
        print("running algo 3 with: {} ...".format(image))
        result = client.algo(ALGO_3).pipe(payload).result
        print("finished algo 3 with: {}...".format(image))
        return {"image": image, "result": result}
    except Exception as e:
        # Report the failure through the shared queue rather than raising.
        errorQ.put(e)
        return None
def apply(input):
    """Run all three child algorithms over a batch of images.

    Every (image, algorithm) pair is submitted to a single shared process
    pool of THREADPOOL_SIZE workers; the call then blocks until all three
    batches have finished.

    Args:
        input: list of images (the sample driver passes image URLs).

    Returns:
        dict with keys "algo_1", "algo_2" and "algo_3", each holding the list
        of per-image values returned by the matching call_ALGO_* worker.

    Raises:
        Exception: if ``input`` is not a list, or -- re-raised from the error
            queue -- the first exception reported by any worker.
    """
    if not isinstance(input, list):
        raise Exception("Input must be a list of image urls")

    # The context managers tear down the pool workers and the manager's
    # server process on every exit path; previously both were left running
    # after each call.
    with Pool(THREADPOOL_SIZE) as process_pool, Manager() as manager:
        errorQ = manager.Queue()
        threadable_inputs = [(image, errorQ) for image in input]

        # Queue all three batches before waiting on any of them so they
        # share the pool concurrently.
        algo1_c = process_pool.starmap_async(call_ALGO_1, threadable_inputs)
        algo2_c = process_pool.starmap_async(call_ALGO_2, threadable_inputs)
        algo3_c = process_pool.starmap_async(call_ALGO_3, threadable_inputs)

        algo1_results = algo1_c.get()
        algo2_results = algo2_c.get()
        algo3_results = algo3_c.get()

        # The queue must be inspected while the manager is still alive.
        if not errorQ.empty():
            raise errorQ.get()
        return {
            "algo_1": algo1_results,
            "algo_2": algo2_results,
            "algo_3": algo3_results,
        }
if __name__ == "__main__":
    # Local smoke test: run the orchestrator over a handful of sample images.
    # Named sample_images so the built-in input() is not rebound at module scope.
    sample_images = [
        "https://i.imgur.com/izwpkqD.jpg",
        "https://i.imgur.com/6QR9IBi.jpg",
        "https://i.imgur.com/mgNPYt4.jpg",
        "https://i.imgur.com/SLcWcTw.jpg",
        "https://i.imgur.com/69az77y.jpg",
        "https://i.imgur.com/wCb6ev1.png",
    ]
    print(apply(sample_images))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment