coral-app.py from https://github.com/robmarkcole/coral-pi-rest-server, adapted as a YOLO version (tiny YOLOv3 on the Coral Edge TPU)
# Start the server:
#   python3 coral-app.py
# Submit a request via cURL:
#   curl -X POST -F image=@face.jpg 'http://localhost:5000/v1/vision/detection'
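#
# A successful request returns JSON shaped like (values illustrative):
#   {"success": true,
#    "predictions": [{"confidence": 0.87, "label": "person",
#                     "x_min": 10, "y_min": 20, "x_max": 110, "y_max": 220}]}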
import argparse
import io
import logging
import os
from time import time

import cv2
import flask
import numpy as np
import tensorflow as tf
from PIL import Image

EDGETPU_SHARED_LIB = "libedgetpu.so.1"
app = flask.Flask(__name__)

LOGFORMAT = "%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s"
logging.basicConfig(filename='coral.log', level=logging.DEBUG, format=LOGFORMAT)

labels = None
anchors = None
interpreter = None

ROOT_URL = "/v1/vision/detection"
# Read class labels, one per line, from a text file.
def ReadLabelFile(file_path):
    with open(file_path) as f:
        ret = [line.strip('\n') for line in f.readlines()]
    return ret
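
# The labels file (coco.names) holds one class name per line, e.g.:
#   person
#   bicycle
#   car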
# Read YOLO anchor values from a text file.
def ReadAnchorFile(file_path):
    with open(file_path) as f:
        ret = f.readline()
    ret = [float(x) for x in ret.split(',')]
    return np.array(ret).reshape(-1, 2)
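
# The anchors file holds a single line of comma-separated width,height pairs;
# e.g. the standard tiny YOLOv3 anchors:
#   10,14,  23,27,  37,58,  81,82,  135,169,  344,319
# which reshape to a (6, 2) array.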
@app.route("/") | |
def info(): | |
info_str = "Flask app exposing tensorflow lite model {}".format(MODEL) | |
return info_str | |
@app.route(ROOT_URL, methods=["POST"])
def predict():
    data = {"success": False}

    if flask.request.method == "POST":
        if flask.request.files.get("image"):
            image_file = flask.request.files["image"]
            image_bytes = image_file.read()
            # Force RGB so grayscale/RGBA uploads don't break the BGR conversion.
            image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
            open_cv_image = np.array(image)
            # Convert RGB to BGR
            open_cv_image = open_cv_image[:, :, ::-1].copy()

            boxes, scores, pred_classes = inference(
                interpreter, open_cv_image, anchors, len(labels), 0.2)
            data["success"] = True

            preds = []
            for i, (topleft, botright) in enumerate(boxes):
                cl = int(pred_classes[i])  # detected class index
                score = scores[i]
                print("{}: {}".format(labels[cl], score))
                preds.append(
                    {
                        "confidence": float(score),
                        "label": labels[cl],
                        "y_min": int(topleft[1]),
                        "x_min": int(topleft[0]),
                        "y_max": int(botright[1]),
                        "x_max": int(botright[0]),
                    })
            data["predictions"] = preds

    # Return the data dictionary as a JSON response.
    return flask.jsonify(data)
def make_interpreter(model_path):
    interpreter = tf.lite.Interpreter(
        model_path=model_path,
        experimental_delegates=[
            tf.lite.experimental.load_delegate(EDGETPU_SHARED_LIB)
        ])
    return interpreter
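
# Note: the Edge TPU delegate requires a *_edgetpu.tflite model compiled for
# the Coral accelerator. For CPU-only debugging one could drop the
# experimental_delegates argument and load the plain .tflite equivalent.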
def get_interpreter_details(interpreter):
    # Get input and output tensor details.
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    input_shape = input_details[0]["shape"]
    return input_details, output_details, input_shape
# Run YOLO inference on the image; returns detected boxes, scores and classes.
def inference(interpreter, img, anchors, n_classes, threshold):
    input_details, output_details, net_input_shape = \
        get_interpreter_details(interpreter)

    img_orig_shape = img.shape
    # Letterbox frame to the network input shape; net_input_shape is [1, H, W, C].
    img = letterbox_image(img.copy(), (net_input_shape[2], net_input_shape[1]))
    # Add batch dimension
    img = np.expand_dims(img, 0)

    # The quantized model takes uint8 input directly; a float model would need
    # normalization here, e.g. img = np.divide(img, 255.).astype(np.float32)

    # Set input tensor and run the model.
    interpreter.set_tensor(input_details[0]['index'], img)
    start = time()
    interpreter.invoke()
    inf_time = time() - start
    print(f"Net forward-pass time: {inf_time*1000} ms.")
    # Retrieve outputs of the network.
    out1 = interpreter.get_tensor(output_details[0]['index'])
    out2 = interpreter.get_tensor(output_details[1]['index'])

    # This is a quantized model, so dequantize the outputs.
    o1_scale, o1_zero = output_details[0]['quantization']
    out1 = (out1.astype(np.float32) - o1_zero) * o1_scale
    o2_scale, o2_zero = output_details[1]['quantization']
    out2 = (out2.astype(np.float32) - o2_zero) * o2_scale
    # Get boxes from outputs of network.
    start = time()
    _boxes1, _scores1, _classes1 = featuresToBoxes(
        out1, anchors[[3, 4, 5]], n_classes, net_input_shape,
        img_orig_shape, threshold)
    _boxes2, _scores2, _classes2 = featuresToBoxes(
        out2, anchors[[1, 2, 3]], n_classes, net_input_shape,
        img_orig_shape, threshold)
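    # These anchor masks follow the official yolov3-tiny.cfg, whose second
    # YOLO layer uses mask 1,2,3 (not 0,1,2).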
    inf_time = time() - start
    print(f"Box computation time: {inf_time*1000} ms.")

    # Empty placeholders are needed so np.append works when an output layer
    # returns no boxes.
    if _boxes1.shape[0] == 0:
        _boxes1 = np.empty([0, 2, 2])
        _scores1 = np.empty([0,])
        _classes1 = np.empty([0,])
    if _boxes2.shape[0] == 0:
        _boxes2 = np.empty([0, 2, 2])
        _scores2 = np.empty([0,])
        _classes2 = np.empty([0,])

    boxes = np.append(_boxes1, _boxes2, axis=0)
    scores = np.append(_scores1, _scores2, axis=0)
    classes = np.append(_classes1, _classes2, axis=0)

    # Optional non-maximum suppression:
    # if len(boxes) > 0:
    #     boxes, scores, classes = nms_boxes(boxes, scores, classes)

    return boxes, scores, classes
def sigmoid(x):
    return 1. / (1 + np.exp(-x))
def letterbox_image(image, size):
    '''Resize image with unchanged aspect ratio using padding.'''
    iw, ih = image.shape[0:2][::-1]
    w, h = size
    scale = min(w/iw, h/ih)
    nw = int(iw*scale)
    nh = int(ih*scale)

    image = cv2.resize(image, (nw, nh), interpolation=cv2.INTER_CUBIC)
    new_image = np.zeros((size[1], size[0], 3), np.uint8)
    new_image.fill(128)
    dx = (w-nw)//2
    dy = (h-nh)//2
    new_image[dy:dy+nh, dx:dx+nw, :] = image

    return new_image
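
# Worked example: a 1280x720 frame letterboxed to (416, 416) gets
# scale = min(416/1280, 416/720) = 0.325, is resized to 416x234, then padded
# with 91-pixel gray (128) bars on the top and bottom.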
def featuresToBoxes(outputs, anchors, n_classes, net_input_shape,
                    img_orig_shape, threshold):
    grid_shape = outputs.shape[1:3]
    n_anchors = len(anchors)

    # Vectorized grid construction (assumes a square grid, which holds for the
    # 13x13 and 26x26 tiny YOLO output layers).
    grid_y = np.tile(np.arange(grid_shape[0]).reshape(-1, 1), grid_shape[0]).reshape(
        1, grid_shape[0], grid_shape[0], 1).astype(np.float32)
    grid_x = grid_y.copy().T.reshape(1, grid_shape[0], grid_shape[1], 1).astype(np.float32)
    outputs = outputs.reshape(1, grid_shape[0], grid_shape[1], n_anchors, -1)
    _anchors = anchors.reshape(1, 1, 3, 2).astype(np.float32)

    # Get box parameters from network output and apply transformations.
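    # YOLOv3 box decode (Redmon & Farhadi, 2018):
    #   b_x = (sigmoid(t_x) + c_x) / grid_w      b_w = p_w * exp(t_w) / net_w
    #   b_y = (sigmoid(t_y) + c_y) / grid_h      b_h = p_h * exp(t_h) / net_h
    # with (c_x, c_y) the grid cell and (p_w, p_h) the anchor, all expressed
    # relative to the network input size.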
    bx = (sigmoid(outputs[..., 0]) + grid_x) / grid_shape[0]
    by = (sigmoid(outputs[..., 1]) + grid_y) / grid_shape[1]
    # net_input_shape is [1, H, W, C]; the input is square (H == W), so using
    # index 1 for widths and 2 for heights is harmless here.
    bw = np.multiply(_anchors[..., 0] / net_input_shape[1], np.exp(outputs[..., 2]))
    bh = np.multiply(_anchors[..., 1] / net_input_shape[2], np.exp(outputs[..., 3]))

    # Get the scores: objectness * per-class probability.
    scores = sigmoid(np.expand_dims(outputs[..., 4], -1)) * \
        sigmoid(outputs[..., 5:])
    scores = scores.reshape(-1, n_classes)
    # Reshape boxes and scale back to original image size.
    ratio = net_input_shape[2] / img_orig_shape[1]
    letterboxed_height = ratio * img_orig_shape[0]
    scale = net_input_shape[1] / letterboxed_height
    offset = (net_input_shape[1] - letterboxed_height) / 2 / net_input_shape[1]

    bx = bx.flatten()
    by = (by.flatten() - offset) * scale
    bw = bw.flatten()
    bh = bh.flatten() * scale
    half_bw = bw / 2.
    half_bh = bh / 2.

    tl_x = np.multiply(bx - half_bw, img_orig_shape[1])
    tl_y = np.multiply(by - half_bh, img_orig_shape[0])
    br_x = np.multiply(bx + half_bw, img_orig_shape[1])
    br_y = np.multiply(by + half_bh, img_orig_shape[0])
    # Get indices of boxes with score higher than threshold.
    indices = np.argwhere(scores >= threshold)
    selected_boxes = []
    selected_scores = []
    for i in indices:
        i = tuple(i)
        selected_boxes.append(((tl_x[i[0]], tl_y[i[0]]), (br_x[i[0]], br_y[i[0]])))
        selected_scores.append(scores[i])

    selected_boxes = np.array(selected_boxes)
    selected_scores = np.array(selected_scores)
    selected_classes = indices[:, 1]

    return selected_boxes, selected_scores, selected_classes
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Flask app exposing a Coral USB stick running YOLO")
    parser.add_argument(
        "--models_directory",
        default="./yolomodels/",
        help="the directory containing the model & labels files",
    )
    parser.add_argument(
        "--model",
        default="quant_coco-tiny-v3-relu_edgetpu.tflite",
        help="model file",
    )
    parser.add_argument(
        "--labels", default="coco.names", help="labels file of model"
    )
    parser.add_argument(
        "--anchors", default="tiny_yolo_anchors.txt", help="yolo anchors file of model"
    )
    parser.add_argument("--port", default=5000, type=int, help="port number")
    args = parser.parse_args()

    MODEL = args.model
    model_file = os.path.join(args.models_directory, args.model)
    labels_file = os.path.join(args.models_directory, args.labels)
    anchors_file = os.path.join(args.models_directory, args.anchors)

    labels = ReadLabelFile(labels_file)
    anchors = ReadAnchorFile(anchors_file)
    print("\n Anchors: {}".format(anchors))

    print("\n Loading interpreter...")
    interpreter = make_interpreter(model_file)
    print("\n Loaded interpreter: {}".format(interpreter.get_input_details()))
    print("\n Allocating tensors...")
    interpreter.allocate_tensors()
    print("done.")

    app.run(host="0.0.0.0", port=args.port, debug=True)
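
# Example client (hypothetical test.jpg; assumes the `requests` package):
#   import requests
#   with open("test.jpg", "rb") as f:
#       r = requests.post("http://localhost:5000/v1/vision/detection",
#                         files={"image": f})
#   print(r.json())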