Skip to content

Instantly share code, notes, and snippets.

@zeryx
Created December 3, 2019 22:52
basic algorithm, made in python 3.6
import tensorflow as tf
import Algorithmia
import os
import re
import numpy as np
TEMP_COLLECTION = 'data://.session/'
MODEL_FILE = "data://zeryx/InceptionNetDemo/classify_image_graph_def.pb"
CONVERSION_FILE = "data://zeryx/InceptionNetDemo/imagenet_synset_to_human_label_map.txt"
LABEL_FILE = "data://zeryx/InceptionNetDemo/imagenet_2012_challenge_label_map_proto.pbtxt"
# Model loading, handled in global state to ensure easy processing
def load_model(client):
path_to_labels = client.file(LABEL_FILE).getFile().name
path_to_model = client.file(MODEL_FILE).getFile().name
path_to_conversion = client.file(CONVERSION_FILE).getFile().name
detection_graph = tf.Graph()
with detection_graph.as_default():
graph_def = tf.GraphDef()
with tf.gfile.GFile(path_to_model, 'rb') as fid:
serialized_graph = fid.read()
graph_def.ParseFromString(serialized_graph)
tf.import_graph_def(graph_def, name='')
label_index = load_label_index(path_to_conversion, path_to_labels)
return detection_graph, label_index
def load_label_index(conversion_path, label_path):
with open(conversion_path) as f:
proto_as_ascii_lines = f.read().split('\n')[:-1]
uid_to_human = {}
p = re.compile(r'[n\d]*[ \S,]*')
for line in proto_as_ascii_lines:
parsed_items = p.findall(line)
uid = parsed_items[0]
human_string = parsed_items[2]
uid_to_human[uid] = human_string
node_id_to_uid = {}
proto_as_ascii = tf.gfile.GFile(label_path).readlines()
for line in proto_as_ascii:
if line.startswith(' target_class:'):
target_class = int(line.split(': ')[1])
if line.startswith(' target_class_string:'):
target_class_string = line.split(': ')[1]
node_id_to_uid[target_class] = target_class_string[1:-2]
# Loads the final mapping of integer node ID to human-readable string
node_id_to_name = {}
for key, val in node_id_to_uid.items():
if val not in uid_to_human:
tf.logging.fatal('Failed to locate: %s', val)
name = uid_to_human[val]
node_id_to_name[key] = name
return node_id_to_name
def id_to_string(index_file, node_id):
if node_id not in index_file:
return ''
return index_file[node_id]
import Algorithmia
import tensorflow as tf
import os
from .auxillary import load_model, id_to_string
import numpy as np
client = Algorithmia.client()
graph, label_index = load_model(client)
# API calls will begin at the apply() method, with the request body passed as 'input'
# For more details, see algorithmia.com/developers/algorithm-development/languages
def get_image(url):
"""Uses the Smart Image Downloader algorithm to format and download images from the web or other places."""
input = {'image': str(url)}
output_url = client.algo("util/SmartImageDownloader/0.2.x").pipe(input).result['savePath'][0]
temp_file = client.file(output_url).getFile().name
os.rename(temp_file, temp_file + '.' + output_url.split('.')[-1])
return temp_file + '.' + output_url.split('.')[-1]
def do_work(image):
"""Does some computer vision work and needs a numpy array to function"""
image_data = tf.gfile.FastGFile(image, 'rb').read()
with tf.Session(graph=graph) as sess:
softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
predictions = sess.run(softmax_tensor,
{'DecodeJpeg/contents:0': image_data})
predictions = np.squeeze(predictions)
tags = []
top_k = predictions.argsort()[-5:][::-1]
for node_id in top_k:
human_string = id_to_string(label_index, node_id)
score = predictions[node_id]
result = {}
result['class'] = human_string
result['confidence'] = score.item()
tags.append(result)
results = {}
results['tags'] = tags
return results
def apply(input):
image_data = get_image(input)
results = do_work(image_data)
return results
```
This is works perfectly for situations where you just want to get image support to an algorithm that needs to do serial processing and then do some monolithic processing using a gpu or other resources to get a final result, one image at a time.
However, if you're dealing with a production system, you're most likely going to be using more than 1 downstream algorithm; and you'll also want to improve performance as much as possible, especially in batch. In that case, you're in luck! This tutorial will go over the different mechanisms you can use both inside of your algorithm, and outside to get the results you're looking for.
# Bandwidth Bottlenecks
## Async and Futures
Your algorithm above works great, but now you need to improve performance and enable batch processing. Your algorithm executes code very quickly (Which is great!) but the http reqeuests to actually download images take a variable amount of time, and are eating up a huge chunk of the total compute time.
This is when using an `async` function may be of value (and Futures), this structure allows you to make a series of requests in parallel, and just wait for them all to finish; regardless of which image downloaded first or second.
[example algorithm here](https://algorithmia.com/algorithms/zeryx/async_image_algorithm)
```
import Algorithmia
from .auxillary import load_model, id_to_string
import tensorflow as tf
import asyncio
import os, re
import numpy as np
# Global variables here
client = Algorithmia.client()
graph, label_index = load_model(client)
def get_image(url):
"""Uses the Smart Image Downloader algorithm to format and download images from the web or other places."""
input = {'image': str(url)}
output_url = client.algo("util/SmartImageDownloader/0.2.x").pipe(input).result['savePath'][0]
temp_file = client.file(output_url).getFile().name
os.rename(temp_file, temp_file + '.' + output_url.split('.')[-1])
return temp_file + '.' + output_url.split('.')[-1]
def do_work(image):
"""Does some computer vision work and needs a numpy array to function"""
image_data = tf.gfile.FastGFile(image, 'rb').read()
with tf.Session(graph=graph) as sess:
softmax_tensor = sess.graph.get_tensor_by_name('softmax:0')
predictions = sess.run(softmax_tensor,
{'DecodeJpeg/contents:0': image_data})
predictions = np.squeeze(predictions)
tags = []
top_k = predictions.argsort()[-5:][::-1]
for node_id in top_k:
human_string = id_to_string(label_index, node_id)
score = predictions[node_id]
result = {}
result['class'] = human_string
result['confidence'] = score.item()
tags.append(result)
results = {}
results['tags'] = tags
return results
# We've added a processor function that gets and processes an image, but is prefixed with an 'async'
# We did this, as when dealing with batch for image processing algorithms, it's common that bottleneck is http and getting
# the images from a remote resource into your system.
# You can read more about 'asyncio' here: https://docs.python.org/3/library/asyncio.html
# Bare in mind that if you're using a version of python < 3.5, you'll need to import it as a pypi package.
async def process_url(url):
image_data = get_image(url)
result = do_work(image_data)
return result
def apply(input):
loop = asyncio.get_event_loop()
# We have a list of inputs that we're going to want to loop over
if isinstance(input, list):
future_images = []
for url in input:
async_image = asyncio.ensure_future(process_url(url))
future_images.append(async_image)
# Now we have a list of promises, let's loop through them until there's nothing left
results = loop.run_until_complete(asyncio.gather(*future_images))
return results
elif isinstance(input, str):
# And if we are only processing one image at a time, lets keep the old functionality as well
image_data = get_image(input)
result = do_work(image_data)
return result
else:
raise Exception("Invalid input, expecting a list of Urls or a single URL string.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment