Skip to content

Instantly share code, notes, and snippets.

@zeryx
Created November 26, 2019 21:30
Show Gist options
  • Save zeryx/bccffe585ed677a0f32a57b97e1d997e to your computer and use it in GitHub Desktop.
Save zeryx/bccffe585ed677a0f32a57b97e1d997e to your computer and use it in GitHub Desktop.
An orchestration algorithm that asynronously waits for the outputs of 3 different algorithms in batch to finish before returning a result
import Algorithmia
from multiprocessing import Manager, Pool
# API calls will begin at the apply() method, with the request body passed as 'input'
# For more details, see algorithmia.com/developers/algorithm-development/languages
ALGO_1 = "algorithmiahq/DeepFashion/1.3.0"
ALGO_2 = "algorithmiahq/multistageclassifierpetsdemo/0.1.0"
ALGO_3 = "character_recognition/tesseract/0.3.0"
# This threadpool is shared between all 3 algorithm requests, this keeps the number of active children restricted and easily controlled
# If desired, we could split this up into a dedicated threadpool for each child algorithm
THREADPOOL_SIZE = 5
client = Algorithmia.client()
def call_ALGO_1(image, errorQ):
try:
if errorQ.empty():
input = {"image": image, "model": "small", "tags_only": True}
print("running algo 1 with: {} ...".format(image))
result = client.algo(ALGO_1).pipe(input).result
print("finished algo 1 with: {}...".format(image))
return {"image": image, "result": result}
else:
return None
except Exception as e:
errorQ.put(e)
def call_ALGO_2(image, errorQ):
try:
if errorQ.empty():
input = image
print("running algo 2 with: {} ...".format(image))
result = client.algo(ALGO_2).pipe(input).result
print("finished algo 2 with: {}...".format(image))
return {"image": image, "result": result}
else:
return None
except Exception as e:
errorQ.put(e)
def call_ALGO_3(image, errorQ):
try:
if errorQ.empty():
input = {"image": image, "language": "eng"}
print("running algo 3 with: ...".format(image))
result = client.algo(ALGO_3).pipe(input).result
print("finished algo 3 with: {}...".format(image))
return {"image": image, "result": result}
else:
return None
except Exception as e:
errorQ.put(e)
def apply(input):
if isinstance(input, list):
process_pool = Pool(THREADPOOL_SIZE)
manager = Manager()
errorQ = manager.Queue()
threadable_inputs = [(image, errorQ) for image in input]
algo1_c = process_pool.starmap_async(call_ALGO_1, threadable_inputs)
algo2_c = process_pool.starmap_async(call_ALGO_2, threadable_inputs)
algo3_c = process_pool.starmap_async(call_ALGO_3, threadable_inputs)
algo1_results = algo1_c.get()
algo2_results = algo2_c.get()
algo3_results = algo3_c.get()
if errorQ.empty():
return {"algo_1": algo1_results, "algo_2": algo2_results, "algo_3": algo3_results}
else:
raise errorQ.get()
else:
raise Exception("Input must be a list of image urls")
if __name__ == "__main__":
input = ["https://i.imgur.com/izwpkqD.jpg", "https://i.imgur.com/6QR9IBi.jpg", "https://i.imgur.com/mgNPYt4.jpg",
"https://i.imgur.com/SLcWcTw.jpg", "https://i.imgur.com/69az77y.jpg", "https://i.imgur.com/wCb6ev1.png"]
print(apply(input))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment