# coral-app.py
# Gist by @weltmeyer, created March 16, 2020
# Start the server:
# python3 coral-app.py
# Submit a request via cURL:
# curl -X POST -F image=@face.jpg 'http://localhost:5000/v1/vision/detection'
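# A minimal Python client sketch (assumptions: the server is running
# locally on the default port, face.jpg exists, and the third-party
# `requests` package is installed; this app itself does not use it):
#
#   import requests
#   with open("face.jpg", "rb") as f:
#       r = requests.post("http://localhost:5000/v1/vision/detection",
#                         files={"image": f})
#   print(r.json())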
# DetectionEngine is only referenced in commented-out code below; the legacy
# `edgetpu` package may not be installed, so keep this import optional.
#from edgetpu.detection.engine import DetectionEngine
import tensorflow as tf
import argparse
from PIL import Image
import flask
import logging
import io
import numpy as np
import os
import cv2
from time import time
EDGETPU_SHARED_LIB = "libedgetpu.so.1"
app = flask.Flask(__name__)
LOGFORMAT = "%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s"
logging.basicConfig(filename='coral.log', level=logging.DEBUG, format=LOGFORMAT)
engine = None
labels = None
anchors = None
interpreter = None
ROOT_URL = "/v1/vision/detection"
# Function to read labels from text files.
def ReadLabelFile(file_path):
    with open(file_path) as f:
        ret = [line.strip('\n') for line in f.readlines()]
    return ret
# Function to read YOLO anchors from a comma-separated text file.
def ReadAnchorFile(file_path):
    with open(file_path) as f:
        ret = [float(x) for x in f.readline().split(',')]
    return np.array(ret).reshape(-1, 2)
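# Expected anchor file format: a single line of comma-separated
# width,height values, e.g. the stock tiny-YOLOv3 anchors (your model's
# file may differ):
#   10,14, 23,27, 37,58, 81,82, 135,169, 344,319
# which reshapes to a (6, 2) array of anchor boxes.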
@app.route("/")
def info():
info_str = "Flask app exposing tensorflow lite model {}".format(MODEL)
return info_str
@app.route(ROOT_URL, methods=["POST"])
def predict():
    data = {"success": False}
    if flask.request.method == "POST":
        if flask.request.files.get("image"):
            image_file = flask.request.files["image"]
            image_bytes = image_file.read()
            image = Image.open(io.BytesIO(image_bytes))
            # Convert the PIL RGB image to an OpenCV-style BGR array
            open_cv_image = np.array(image)[:, :, ::-1].copy()
            boxes, scores, pred_classes = inference(
                interpreter, open_cv_image, anchors, len(labels), 0.2)
            data["success"] = True
            preds = []
            for i, (topleft, botright) in enumerate(boxes):
                # Detected class and score
                cl = int(pred_classes[i])
                score = scores[i]
                print("{}: {}".format(labels[cl], score))
                preds.append({
                    "confidence": float(score),
                    "label": labels[cl],
                    "y_min": int(topleft[1]),
                    "x_min": int(topleft[0]),
                    "y_max": int(botright[1]),
                    "x_max": int(botright[0]),
                })
            data["predictions"] = preds
    # Return the data dictionary as a JSON response
    return flask.jsonify(data)
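# Illustrative response shape (values are made up, keys match the code above):
#   {"success": true,
#    "predictions": [{"confidence": 0.87, "label": "person",
#                     "x_min": 12, "y_min": 34, "x_max": 250, "y_max": 480}]}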
def make_interpreter(model_path):
    interpreter = tf.lite.Interpreter(
        model_path=model_path,
        experimental_delegates=[
            tf.lite.experimental.load_delegate(EDGETPU_SHARED_LIB)
        ])
    return interpreter
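# A hedged fallback sketch for machines without an Edge TPU attached. Note
# this is an assumption, not part of the original app, and *_edgetpu.tflite
# models contain a custom TPU op, so you would also need a CPU-compiled
# .tflite model for this to work:
#
#   def make_cpu_interpreter(model_path):
#       return tf.lite.Interpreter(model_path=model_path)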
def get_interpreter_details(interpreter):
    # Get input and output tensor details
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_shape = input_details[0]["shape"]
    return input_details, output_details, input_shape
# Run YOLO inference on the image; returns detected boxes, scores and classes
def inference(interpreter, img, anchors, n_classes, threshold):
    input_details, output_details, net_input_shape = \
        get_interpreter_details(interpreter)

    img_orig_shape = img.shape
    # Letterbox the frame to the network input shape
    img = letterbox_image(img.copy(), (416, 416))
    # Add batch dimension
    img = np.expand_dims(img, 0)

    # The Edge TPU model is quantized (uint8), so skip 0..1 normalization:
    # img = np.divide(img, 255.).astype(np.float32)

    # Set input tensor
    # print(interpreter.get_input_details())  # debug leftover
    interpreter.set_tensor(input_details[0]['index'], img)

    start = time()
    # Run model
    interpreter.invoke()
    inf_time = time() - start
    print(f"Net forward-pass time: {inf_time*1000} ms.")

    # Retrieve the two YOLO output feature maps
    out1 = interpreter.get_tensor(output_details[0]['index'])
    out2 = interpreter.get_tensor(output_details[1]['index'])

    # This server assumes a quantized model, so always dequantize the outputs
    o1_scale, o1_zero = output_details[0]['quantization']
    out1 = (out1.astype(np.float32) - o1_zero) * o1_scale
    o2_scale, o2_zero = output_details[1]['quantization']
    out2 = (out2.astype(np.float32) - o2_zero) * o2_scale

    # Decode boxes from the two output scales. The anchor masks [3, 4, 5]
    # and [1, 2, 3] mirror the mask values in the official yolov3-tiny.cfg.
    start = time()
    _boxes1, _scores1, _classes1 = featuresToBoxes(
        out1, anchors[[3, 4, 5]], n_classes, net_input_shape,
        img_orig_shape, threshold)
    _boxes2, _scores2, _classes2 = featuresToBoxes(
        out2, anchors[[1, 2, 3]], n_classes, net_input_shape,
        img_orig_shape, threshold)
    inf_time = time() - start
    print(f"Box computation time: {inf_time*1000} ms.")

    # Give empty outputs the right shape so np.append works when an
    # output layer returns no boxes
    if _boxes1.shape[0] == 0:
        _boxes1 = np.empty([0, 2, 2])
        _scores1 = np.empty([0, ])
        _classes1 = np.empty([0, ])
    if _boxes2.shape[0] == 0:
        _boxes2 = np.empty([0, 2, 2])
        _scores2 = np.empty([0, ])
        _classes2 = np.empty([0, ])

    boxes = np.append(_boxes1, _boxes2, axis=0)
    scores = np.append(_scores1, _scores2, axis=0)
    classes = np.append(_classes1, _classes2, axis=0)

    #if len(boxes) > 0:
    #    boxes, scores, classes = nms_boxes(boxes, scores, classes)

    return boxes, scores, classes
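# nms_boxes is referenced (commented out) in inference() above but is not
# defined in this gist. A minimal greedy per-class NMS sketch, assuming the
# box layout ((tl_x, tl_y), (br_x, br_y)) produced by featuresToBoxes; the
# 0.5 IoU threshold is an assumption, not a value from the original:
def nms_boxes(boxes, scores, classes, iou_threshold=0.5):
    keep = []
    order = np.argsort(scores)[::-1]  # indices sorted by descending score
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]
        # Intersection of the best box with all remaining boxes
        tl_x = np.maximum(boxes[i, 0, 0], boxes[rest, 0, 0])
        tl_y = np.maximum(boxes[i, 0, 1], boxes[rest, 0, 1])
        br_x = np.minimum(boxes[i, 1, 0], boxes[rest, 1, 0])
        br_y = np.minimum(boxes[i, 1, 1], boxes[rest, 1, 1])
        inter = np.maximum(br_x - tl_x, 0) * np.maximum(br_y - tl_y, 0)
        area_i = ((boxes[i, 1, 0] - boxes[i, 0, 0]) *
                  (boxes[i, 1, 1] - boxes[i, 0, 1]))
        area_r = ((boxes[rest, 1, 0] - boxes[rest, 0, 0]) *
                  (boxes[rest, 1, 1] - boxes[rest, 0, 1]))
        iou = inter / (area_i + area_r - inter + 1e-9)
        # Suppress only same-class boxes with high overlap
        same_class = classes[rest] == classes[i]
        order = rest[~((iou >= iou_threshold) & same_class)]
    keep = np.array(keep, dtype=int)
    return boxes[keep], scores[keep], classes[keep]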
def sigmoid(x):
    return 1. / (1 + np.exp(-x))
def letterbox_image(image, size):
    '''Resize image with unchanged aspect ratio using gray padding.'''
    iw, ih = image.shape[0:2][::-1]
    w, h = size
    scale = min(w/iw, h/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)
    image = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_CUBIC)
    new_image = np.full((size[1], size[0], 3), 128, np.uint8)
    dx = (w-nw)//2
    dy = (h-nh)//2
    new_image[dy:dy+nh, dx:dx+nw, :] = image
    return new_image
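# Worked example (hypothetical 640x480 frame letterboxed to 416x416):
#   scale = min(416/640, 416/480) = 0.65, so nw = 416, nh = 312,
#   dx = 0 and dy = (416 - 312) // 2 = 52; the resized frame occupies
#   rows 52..363 with gray (128) bars above and below.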
def featuresToBoxes(outputs, anchors, n_classes, net_input_shape,
                    img_orig_shape, threshold):
    grid_shape = outputs.shape[1:3]
    n_anchors = len(anchors)

    # Numpy trickery to build the grid coordinates in a reasonable amount
    # of time (assumes a square grid, which holds for 416x416 input)
    grid_y = np.tile(np.arange(grid_shape[0]).reshape(-1, 1),
                     grid_shape[0]).reshape(1, grid_shape[0], grid_shape[0], 1).astype(np.float32)
    grid_x = grid_y.copy().T.reshape(1, grid_shape[0], grid_shape[1], 1).astype(np.float32)
    outputs = outputs.reshape(1, grid_shape[0], grid_shape[1], n_anchors, -1)
    _anchors = anchors.reshape(1, 1, 3, 2).astype(np.float32)

    # Get box parameters from network output and apply transformations
    bx = (sigmoid(outputs[..., 0]) + grid_x) / grid_shape[0]
    by = (sigmoid(outputs[..., 1]) + grid_y) / grid_shape[1]
    # net_input_shape is (1, H, W, C); the input is square (416x416),
    # so indices 1 and 2 are interchangeable here
    bw = np.multiply(_anchors[..., 0] / net_input_shape[1], np.exp(outputs[..., 2]))
    bh = np.multiply(_anchors[..., 1] / net_input_shape[2], np.exp(outputs[..., 3]))

    # Score = objectness * per-class confidence
    scores = sigmoid(np.expand_dims(outputs[..., 4], -1)) * \
        sigmoid(outputs[..., 5:])
    scores = scores.reshape(-1, n_classes)

    # Undo the letterboxing and scale back to the original image size
    ratio = net_input_shape[2] / img_orig_shape[1]
    letterboxed_height = ratio * img_orig_shape[0]
    scale = net_input_shape[1] / letterboxed_height
    offset = (net_input_shape[1] - letterboxed_height) / 2 / net_input_shape[1]

    bx = bx.flatten()
    by = (by.flatten() - offset) * scale
    bw = bw.flatten()
    bh = bh.flatten() * scale
    half_bw = bw / 2.
    half_bh = bh / 2.

    tl_x = np.multiply(bx - half_bw, img_orig_shape[1])
    tl_y = np.multiply(by - half_bh, img_orig_shape[0])
    br_x = np.multiply(bx + half_bw, img_orig_shape[1])
    br_y = np.multiply(by + half_bh, img_orig_shape[0])

    # Keep only boxes whose score exceeds the threshold
    indices = np.argwhere(scores >= threshold)
    selected_boxes = []
    selected_scores = []
    for i in indices:
        i = tuple(i)
        selected_boxes.append(((tl_x[i[0]], tl_y[i[0]]), (br_x[i[0]], br_y[i[0]])))
        selected_scores.append(scores[i])

    selected_boxes = np.array(selected_boxes)
    selected_scores = np.array(selected_scores)
    selected_classes = indices[:, 1]

    return selected_boxes, selected_scores, selected_classes
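# The decode above follows the standard YOLOv3 box equations for a square
# g x g grid, with (c_x, c_y) the cell offset and (p_w, p_h) the anchor size:
#   b_x = (sigmoid(t_x) + c_x) / g
#   b_y = (sigmoid(t_y) + c_y) / g
#   b_w = p_w * exp(t_w) / net_w
#   b_h = p_h * exp(t_h) / net_h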
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Flask app exposing a Coral USB stick for YOLO")
    parser.add_argument(
        "--models_directory",
        default="./yolomodels/",
        help="the directory containing the model & labels files",
    )
    parser.add_argument(
        "--model",
        default="quant_coco-tiny-v3-relu_edgetpu.tflite",
        help="model file",
    )
    parser.add_argument(
        "--labels", default="coco.names", help="labels file of model"
    )
    parser.add_argument(
        "--anchors", default="tiny_yolo_anchors.txt", help="YOLO anchors file of model"
    )
    parser.add_argument("--port", default=5000, type=int, help="port number")
    args = parser.parse_args()

    # Module-level name read by info(); a `global` statement is redundant
    # at module scope
    MODEL = args.model
    model_file = os.path.join(args.models_directory, args.model)
    labels_file = os.path.join(args.models_directory, args.labels)
    anchors_file = os.path.join(args.models_directory, args.anchors)

    labels = ReadLabelFile(labels_file)
    anchors = ReadAnchorFile(anchors_file)
    print("\n Anchors: {}".format(anchors))

    print("\n Loading interpreter...")
    interpreter = make_interpreter(model_file)
    print("\n Loaded interpreter: {}".format(interpreter.get_input_details()))
    print("\n Allocating tensors...")
    interpreter.allocate_tensors()
    print(".done.")

    #engine = DetectionEngine(model_file)
    #print("\n Loaded engine with model : {}".format(model_file))

    # Note: debug=True enables the Werkzeug reloader, which re-runs this
    # startup (and re-opens the Edge TPU); pass use_reloader=False to avoid that
    app.run(host="0.0.0.0", port=args.port, debug=True)