@jiangzhongbo
Forked from ogl4jo3/blaze_face_detect.py
Created April 27, 2020 11:35
Demo BlazeFace model.
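The script below generates the SSD anchors for the model, runs face_detection_front.tflite (link in the code) through the TFLite interpreter on webcam frames, decodes the raw regressor/classificator outputs into face boxes, applies greedy NMS, and draws the surviving boxes and scores with OpenCV. It needs opencv-python, numpy, and TensorFlow; press ESC to quit.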
import cv2
import time
import math
import numpy as np
import tensorflow as tf

class SsdAnchorsCalculatorOptions:
    def __init__(self, input_size_width, input_size_height, min_scale, max_scale,
                 num_layers, feature_map_width, feature_map_height,
                 strides, aspect_ratios, anchor_offset_x=0.5, anchor_offset_y=0.5,
                 reduce_boxes_in_lowest_layer=False, interpolated_scale_aspect_ratio=1.0,
                 fixed_anchor_size=False):
        # Size of input images.
        self.input_size_width = input_size_width
        self.input_size_height = input_size_height
        # Min and max scales for generating anchor boxes on feature maps.
        self.min_scale = min_scale
        self.max_scale = max_scale
        # The offset for the center of anchors. The value is in the scale of stride.
        # E.g. 0.5 meaning 0.5 * |current_stride| in pixels.
        self.anchor_offset_x = anchor_offset_x
        self.anchor_offset_y = anchor_offset_y
        # Number of output feature maps to generate the anchors on.
        self.num_layers = num_layers
        # Sizes of output feature maps to create anchors. Either feature_map size or
        # stride should be provided.
        self.feature_map_width = feature_map_width
        self.feature_map_height = feature_map_height
        self.feature_map_width_size = len(feature_map_width)
        self.feature_map_height_size = len(feature_map_height)
        # Strides of each output feature maps.
        self.strides = strides
        self.strides_size = len(strides)
        # List of different aspect ratios to generate anchors.
        self.aspect_ratios = aspect_ratios
        self.aspect_ratios_size = len(aspect_ratios)
        # A boolean to indicate whether the fixed 3 boxes per location is used in the lowest layer.
        self.reduce_boxes_in_lowest_layer = reduce_boxes_in_lowest_layer
        # An additional anchor is added with this aspect ratio and a scale
        # interpolated between the scale for a layer and the scale for the next layer
        # (1.0 for the last layer). This anchor is not included if this value is 0.
        self.interpolated_scale_aspect_ratio = interpolated_scale_aspect_ratio
        # Whether to use fixed width and height (e.g. both 1.0) for each anchor.
        # This option can be used when the predicted anchor width and height are in pixels.
        self.fixed_anchor_size = fixed_anchor_size

    def to_string(self):
        return ('input_size_width: {:}\ninput_size_height: {:}\nmin_scale: {:}\nmax_scale: {:}\n'
                'anchor_offset_x: {:}\nanchor_offset_y: {:}\nnum_layers: {:}\n'
                'feature_map_width: {:}\nfeature_map_height: {:}\nstrides: {:}\naspect_ratios: {:}\n'
                'reduce_boxes_in_lowest_layer: {:}\ninterpolated_scale_aspect_ratio: {:}\n'
                'fixed_anchor_size: {:}').format(
            self.input_size_width, self.input_size_height, self.min_scale, self.max_scale,
            self.anchor_offset_x, self.anchor_offset_y, self.num_layers,
            self.feature_map_width, self.feature_map_height, self.strides, self.aspect_ratios,
            self.reduce_boxes_in_lowest_layer, self.interpolated_scale_aspect_ratio,
            self.fixed_anchor_size)

class Anchor:
    def __init__(self, x_center, y_center, h, w):
        self.x_center = x_center
        self.y_center = y_center
        self.h = h
        self.w = w

    def to_string(self):
        return 'x_center: {:}, y_center: {:}, h: {:}, w: {:}'.format(self.x_center, self.y_center, self.h, self.w)

class Detection:
    def __init__(self, score, class_id, xmin, ymin, width, height):
        self.score = score
        self.class_id = class_id
        self.xmin = xmin
        self.ymin = ymin
        self.width = width
        self.height = height

    def to_string(self):
        return 'score: {:}, class_id: {:}, xmin: {:}, ymin: {:}, width: {:}, height: {:}'.format(
            self.score, self.class_id, self.xmin, self.ymin, self.width, self.height)

class TfLiteTensorsToDetectionsCalculatorOptions:
    def __init__(self, num_classes, num_boxes, num_coords, keypoint_coord_offset,
                 ignore_classes, score_clipping_thresh, min_score_thresh,
                 num_keypoints=0, num_values_per_keypoint=2, box_coord_offset=0,
                 x_scale=0.0, y_scale=0.0, w_scale=0.0, h_scale=0.0, apply_exponential_on_box_size=False,
                 reverse_output_order=False, sigmoid_score=False, flip_vertically=False):
        # The number of output classes predicted by the detection model.
        self.num_classes = num_classes
        # The number of output boxes predicted by the detection model.
        self.num_boxes = num_boxes
        # The number of output values per box predicted by the detection model. The
        # values contain bounding boxes, keypoints, etc.
        self.num_coords = num_coords
        # The offset of keypoint coordinates in the location tensor.
        self.keypoint_coord_offset = keypoint_coord_offset
        # The number of predicted keypoints.
        self.num_keypoints = num_keypoints
        # The dimension of each keypoint, e.g. number of values predicted for each keypoint.
        self.num_values_per_keypoint = num_values_per_keypoint
        # The offset of box coordinates in the location tensor.
        self.box_coord_offset = box_coord_offset
        # Parameters for decoding SSD detection model.
        self.x_scale = x_scale
        self.y_scale = y_scale
        self.w_scale = w_scale
        self.h_scale = h_scale
        self.apply_exponential_on_box_size = apply_exponential_on_box_size
        # Whether to reverse the order of predicted x, y from output.
        # If false, the order is [y_center, x_center, h, w], if true the order is
        # [x_center, y_center, w, h].
        self.reverse_output_order = reverse_output_order
        # The ids of classes that should be ignored while decoding the score for
        # each predicted box.
        self.ignore_classes = ignore_classes
        self.sigmoid_score = sigmoid_score
        self.score_clipping_thresh = score_clipping_thresh
        # Whether the detection coordinates from the input tensors should be flipped
        # vertically (along the y-direction). This is useful, for example, when the
        # input tensors represent detections defined with a coordinate system where
        # the origin is at the top-left corner, whereas the desired detection
        # representation has a bottom-left origin (e.g., in OpenGL).
        self.flip_vertically = flip_vertically
        # Score threshold for preserving decoded detections.
        self.min_score_thresh = min_score_thresh

    def to_string(self):
        return ('num_classes: {:}\nnum_boxes: {:}\nnum_coords: {:}\nkeypoint_coord_offset: {:}\n'
                'num_keypoints: {:}\nnum_values_per_keypoint: {:}\nbox_coord_offset: {:}\n'
                'x_scale: {:}\ny_scale: {:}\nw_scale: {:}\nh_scale: {:}\n'
                'apply_exponential_on_box_size: {:}\nreverse_output_order: {:}\nignore_classes: {:}\n'
                'sigmoid_score: {:}\nscore_clipping_thresh: {:}\nflip_vertically: {:}\nmin_score_thresh: {:}').format(
            self.num_classes, self.num_boxes, self.num_coords, self.keypoint_coord_offset,
            self.num_keypoints, self.num_values_per_keypoint, self.box_coord_offset,
            self.x_scale, self.y_scale, self.w_scale, self.h_scale,
            self.apply_exponential_on_box_size, self.reverse_output_order,
            self.ignore_classes, self.sigmoid_score, self.score_clipping_thresh,
            self.flip_vertically, self.min_score_thresh)

def DecodeBoxes(raw_boxes, anchors, options):
    boxes = np.zeros(options.num_boxes * options.num_coords)
    for i in range(options.num_boxes):
        box_offset = i * options.num_coords + options.box_coord_offset
        y_center = raw_boxes[box_offset]
        x_center = raw_boxes[box_offset + 1]
        h = raw_boxes[box_offset + 2]
        w = raw_boxes[box_offset + 3]
        if options.reverse_output_order:
            x_center = raw_boxes[box_offset]
            y_center = raw_boxes[box_offset + 1]
            w = raw_boxes[box_offset + 2]
            h = raw_boxes[box_offset + 3]
        x_center = x_center / options.x_scale * anchors[i].w + anchors[i].x_center
        y_center = y_center / options.y_scale * anchors[i].h + anchors[i].y_center
        if options.apply_exponential_on_box_size:
            h = np.exp(h / options.h_scale) * anchors[i].h
            w = np.exp(w / options.w_scale) * anchors[i].w
        else:
            h = h / options.h_scale * anchors[i].h
            w = w / options.w_scale * anchors[i].w
        ymin = y_center - h / 2.0
        xmin = x_center - w / 2.0
        ymax = y_center + h / 2.0
        xmax = x_center + w / 2.0
        boxes[i * options.num_coords + 0] = ymin
        boxes[i * options.num_coords + 1] = xmin
        boxes[i * options.num_coords + 2] = ymax
        boxes[i * options.num_coords + 3] = xmax
        if options.num_keypoints:
            for k in range(options.num_keypoints):
                offset = i * options.num_coords + options.keypoint_coord_offset + k * options.num_values_per_keypoint
                keypoint_y = raw_boxes[offset]
                keypoint_x = raw_boxes[offset + 1]
                if options.reverse_output_order:
                    keypoint_x = raw_boxes[offset]
                    keypoint_y = raw_boxes[offset + 1]
                boxes[offset] = keypoint_x / options.x_scale * anchors[i].w + anchors[i].x_center
                boxes[offset + 1] = keypoint_y / options.y_scale * anchors[i].h + anchors[i].y_center
    return boxes
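
# Note (added): with the options used in main() below, DecodeBoxes() returns a
# flat array of num_boxes * num_coords = 896 * 16 floats. For box i, the first
# four values are [ymin, xmin, ymax, xmax] and the remaining twelve are the six
# facial keypoints as (x, y) pairs, all normalized to [0, 1] relative to the
# 128x128 model input (the anchors are already in normalized coordinates).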

def ConvertToDetections(detection_boxes, detection_scores, detection_classes, options):
    output_detections = []
    for i in range(options.num_boxes):
        if detection_scores[i] < options.min_score_thresh:
            # print('passed, score lower than threshold')
            continue
        print("box_idx:{:}".format(i))
        box_offset = i * options.num_coords
        detection = ConvertToDetection(
            detection_boxes[box_offset + 0], detection_boxes[box_offset + 1],
            detection_boxes[box_offset + 2], detection_boxes[box_offset + 3],
            detection_scores[i], detection_classes[i], options.flip_vertically)
        # Add keypoints. TODO:
        # if (options.num_keypoints > 0):
        #     location_data = detection.mutable_location_data()
        #     kp_id = 0
        #     while (kp_id < options.num_keypoints * options.num_values_per_keypoint):
        #         keypoint = location_data->add_relative_keypoints()
        #         keypoint_index = box_offset + options.keypoint_coord_offset + kp_id
        #         keypoint->set_x(detection_boxes[keypoint_index + 0])
        #         keypoint->set_y(options.flip_vertically
        #                         ? 1.f - detection_boxes[keypoint_index + 1]
        #                         : detection_boxes[keypoint_index + 1])
        #         kp_id += options.num_values_per_keypoint
        output_detections.append(detection)
    return output_detections

def ConvertToDetection(box_ymin, box_xmin, box_ymax, box_xmax, score, class_id, flip_vertically):
    # Detection detection;
    # detection.add_score(score);
    # detection.add_label_id(class_id);
    # LocationData* location_data = detection.mutable_location_data();
    # location_data->set_format(LocationData::RELATIVE_BOUNDING_BOX);
    # LocationData::RelativeBoundingBox* relative_bbox = location_data->mutable_relative_bounding_box();
    # relative_bbox->set_xmin(box_xmin);
    # relative_bbox->set_ymin(flip_vertically ? 1.f - box_ymax : box_ymin);
    # relative_bbox->set_width(box_xmax - box_xmin);
    # relative_bbox->set_height(box_ymax - box_ymin);
    detection = Detection(score, class_id, box_xmin,
                          (1.0 - box_ymax if flip_vertically else box_ymin),
                          (box_xmax - box_xmin), (box_ymax - box_ymin))
    # print('score: {:}, class_id: {:}\n, xmin: {:}, ymin: {:}, width: {:}, height: {:}'.format(score, class_id, box_xmin, (1.0 - box_ymax if flip_vertically else box_ymin), (box_xmax - box_xmin), (box_ymax - box_ymin)))
    return detection

def ProcessCPU(raw_boxes, raw_scores, anchors_, options):
    # Postprocessing on CPU for model without postprocessing op. E.g. output
    # raw score tensor and box tensor. Anchor decoding will be handled below.
    boxes = DecodeBoxes(raw_boxes, anchors_, options)
    detection_scores = np.zeros(options.num_boxes)
    detection_classes = np.zeros(options.num_boxes)
    # Filter classes by scores.
    for i in range(options.num_boxes):
        class_id = -1
        max_score = np.finfo(float).min
        # Find the top score for box i.
        for score_idx in range(options.num_classes):
            # if (ignore_classes_.find(score_idx) == ignore_classes_.end()) {
            score = raw_scores[i * options.num_classes + score_idx]
            if options.sigmoid_score:
                if options.score_clipping_thresh > 0:
                    score = -options.score_clipping_thresh if score < -options.score_clipping_thresh else score
                    score = options.score_clipping_thresh if score > options.score_clipping_thresh else score
                score = 1.0 / (1.0 + np.exp(-score))
            if max_score < score:
                max_score = score
                class_id = score_idx
            # }
        detection_scores[i] = max_score
        detection_classes[i] = class_id
    print('--------------------------------')
    print('boxes: ')
    print(boxes.shape)
    print(boxes)
    print('--------------------------------')
    print('detection_scores: ')
    print(detection_scores.shape)
    print(detection_scores)
    print('--------------------------------')
    print('detection_classes: ')
    print(detection_classes.shape)
    print(detection_classes)
    output_detections = ConvertToDetections(boxes, detection_scores, detection_classes, options)
    return output_detections
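
# Note (added): ProcessCPU() expects the model outputs already flattened to 1-D:
# raw_boxes with num_boxes * num_coords values (896 * 16 here) and raw_scores
# with num_boxes * num_classes values (896 * 1 here). main() below does this by
# reshaping the 'regressors' and 'classificators' tensors before calling it.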

def orig_nms(detections, threshold):
    """Greedy non-maximum suppression.

    :detections: list of Detection (normalized xmin/ymin/width/height plus score)
    :threshold: overlap threshold, e.g. 0.5
    :returns: the detections kept after suppression
    """
    if len(detections) <= 0:
        return np.array([])
    x1 = []
    x2 = []
    y1 = []
    y2 = []
    s = []
    for detection in detections:
        x1.append(detection.xmin)
        x2.append(detection.xmin + detection.width)
        y1.append(detection.ymin)
        y2.append(detection.ymin + detection.height)
        s.append(detection.score)
    x1 = np.array(x1)
    x2 = np.array(x2)
    y1 = np.array(y1)
    y2 = np.array(y2)
    s = np.array(s)
    area = np.multiply(x2 - x1 + 1, y2 - y1 + 1)
    I = np.array(s.argsort())  # indices of scores, ascending
    pick = []
    while len(I) > 0:
        xx1 = np.maximum(x1[I[-1]], x1[I[0:-1]])
        yy1 = np.maximum(y1[I[-1]], y1[I[0:-1]])
        xx2 = np.minimum(x2[I[-1]], x2[I[0:-1]])
        yy2 = np.minimum(y2[I[-1]], y2[I[0:-1]])
        w = np.maximum(0.0, xx2 - xx1 + 1)
        h = np.maximum(0.0, yy2 - yy1 + 1)
        inter = w * h
        o = inter / (area[I[-1]] + area[I[0:-1]] - inter)
        pick.append(I[-1])
        I = I[np.where(o <= threshold)[0]]
    return list(np.array(detections)[pick])
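
# Note (added): this is greedy NMS; it repeatedly keeps the highest-scoring
# remaining box and drops every box whose overlap with it exceeds `threshold`.
# main() below calls it with threshold=0.3 on normalized [0, 1] coordinates.
# The "+ 1" padding in the area/overlap terms is carried over from
# pixel-coordinate NMS code; with normalized coordinates it inflates the
# overlap values, so suppression is more aggressive than a plain IoU
# threshold of 0.3 would be.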

def gen_anchors(options):
    anchors = []
    # Verify the options.
    if options.strides_size != options.num_layers:
        print("strides_size and num_layers must be equal.")
        return []
    layer_id = 0
    while layer_id < options.strides_size:
        anchor_height = []
        anchor_width = []
        aspect_ratios = []
        scales = []
        # For same strides, we merge the anchors in the same order.
        last_same_stride_layer = layer_id
        while (last_same_stride_layer < options.strides_size
               and options.strides[last_same_stride_layer] == options.strides[layer_id]):
            scale = options.min_scale + (options.max_scale - options.min_scale) * 1.0 * last_same_stride_layer / (options.strides_size - 1.0)
            if last_same_stride_layer == 0 and options.reduce_boxes_in_lowest_layer:
                # For first layer, it can be specified to use predefined anchors.
                aspect_ratios.append(1.0)
                aspect_ratios.append(2.0)
                aspect_ratios.append(0.5)
                scales.append(0.1)
                scales.append(scale)
                scales.append(scale)
            else:
                for aspect_ratio_id in range(options.aspect_ratios_size):
                    aspect_ratios.append(options.aspect_ratios[aspect_ratio_id])
                    scales.append(scale)
                if options.interpolated_scale_aspect_ratio > 0.0:
                    scale_next = (1.0 if last_same_stride_layer == options.strides_size - 1
                                  else options.min_scale + (options.max_scale - options.min_scale) * 1.0 * (last_same_stride_layer + 1) / (options.strides_size - 1.0))
                    scales.append(math.sqrt(scale * scale_next))
                    aspect_ratios.append(options.interpolated_scale_aspect_ratio)
            last_same_stride_layer += 1
        for i in range(len(aspect_ratios)):
            ratio_sqrts = math.sqrt(aspect_ratios[i])
            anchor_height.append(scales[i] / ratio_sqrts)
            anchor_width.append(scales[i] * ratio_sqrts)
        feature_map_height = 0
        feature_map_width = 0
        if options.feature_map_height_size > 0:
            feature_map_height = options.feature_map_height[layer_id]
            feature_map_width = options.feature_map_width[layer_id]
        else:
            stride = options.strides[layer_id]
            feature_map_height = math.ceil(1.0 * options.input_size_height / stride)
            feature_map_width = math.ceil(1.0 * options.input_size_width / stride)
        for y in range(feature_map_height):
            for x in range(feature_map_width):
                for anchor_id in range(len(anchor_height)):
                    # TODO: Support specifying anchor_offset_x, anchor_offset_y.
                    x_center = (x + options.anchor_offset_x) * 1.0 / feature_map_width
                    y_center = (y + options.anchor_offset_y) * 1.0 / feature_map_height
                    w = 0
                    h = 0
                    if options.fixed_anchor_size:
                        w = 1.0
                        h = 1.0
                    else:
                        w = anchor_width[anchor_id]
                        h = anchor_height[anchor_id]
                    new_anchor = Anchor(x_center, y_center, h, w)
                    anchors.append(new_anchor)
        layer_id = last_same_stride_layer
    return anchors
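
# Note (added): with the BlazeFace options used in main() below (128x128 input,
# strides [8, 16, 16, 16], aspect_ratios [1.0], interpolated_scale_aspect_ratio
# 1.0), gen_anchors() produces 2 anchors per cell on the 16x16 grid of the
# stride-8 layer and 6 per cell on the 8x8 grid shared by the three stride-16
# layers: 16*16*2 + 8*8*6 = 896, matching num_boxes=896 in the detection options.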

def main():
    # Options to generate anchors for SSD object detection models.
    ssd_anchors_calculator_options = SsdAnchorsCalculatorOptions(
        input_size_width=128, input_size_height=128, min_scale=0.1484375, max_scale=0.75,
        anchor_offset_x=0.5, anchor_offset_y=0.5, num_layers=4,
        feature_map_width=[], feature_map_height=[],
        strides=[8, 16, 16, 16], aspect_ratios=[1.0],
        reduce_boxes_in_lowest_layer=False, interpolated_scale_aspect_ratio=1.0,
        fixed_anchor_size=True)
    print('------------------------------------------------')
    print('SsdAnchorsCalculatorOptions: ')
    print(ssd_anchors_calculator_options.to_string())
    anchors = gen_anchors(ssd_anchors_calculator_options)
    # print('------------------------------------------------')
    # print('Anchors: ')
    # print('number: {:}'.format(len(anchors)))
    # for i, anchor in enumerate(anchors):
    #     print('Anchor {:}'.format(i))
    #     print(anchor.to_string())
    options = TfLiteTensorsToDetectionsCalculatorOptions(
        num_classes=1, num_boxes=896, num_coords=16,
        keypoint_coord_offset=4, ignore_classes=[], score_clipping_thresh=100.0, min_score_thresh=0.75,
        num_keypoints=6, num_values_per_keypoint=2, box_coord_offset=0,
        x_scale=128.0, y_scale=128.0, w_scale=128.0, h_scale=128.0, apply_exponential_on_box_size=False,
        reverse_output_order=True, sigmoid_score=True, flip_vertically=False)
    print('------------------------------------------------')
    print('TfLiteTensorsToDetectionsCalculatorOptions: ')
    print(options.to_string())
    # BlazeFace model:
    # https://github.com/google/mediapipe/tree/master/mediapipe/models/face_detection_front.tflite
    model_path = './face_detection_front.tflite'
    # Load TFLite model and allocate tensors.
    # Note: the original gist used tf.contrib.lite.Interpreter (TF 1.x only);
    # tf.lite.Interpreter is the current location of the same class.
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()
    # Get input and output tensors.
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    print('--------------------------------')
    print("input_details: ")
    print(input_details)
    print("output_details: ")
    print(output_details)
    # capture = cv2.VideoCapture('./videoplayback_1.mp4')
    capture = cv2.VideoCapture(0)
    frame_cnt = 0
    accum_time = 0
    curr_fps = 0
    fps = "FPS: ??"
    prev_time = time.time()
    while True:
        ret, img = capture.read()
        # img = cv2.imread('./test_image.jpg')
        frame_cnt += 1
        print('-------- frame_cnt: ' + str(frame_cnt) + ' --------')
        if ret == True:
            # Only touch the frame once we know it was actually read.
            img_height = img.shape[0]
            img_width = img.shape[1]
            img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            preprocess_start_time = time.time()
            # input shape is [1, height, width, channels]
            input_height = input_details[0]["shape"][1]
            input_width = input_details[0]["shape"][2]
            # resize
            input_data = cv2.resize(img_rgb, (input_width, input_height)).astype(np.float32)
            # preprocess: scale pixel values to [-1, 1]
            # input_data = (input_data)
            input_data = ((input_data - 127.5) / 127.5)
            # input_data = ((input_data) / 255)
            input_data = np.expand_dims(input_data, axis=0)
            preprocess_end_time = time.time()
            inference_start_time = time.time()
            # set input data
            interpreter.set_tensor(input_details[0]["index"], input_data)
            interpreter.invoke()
            regressors = interpreter.get_tensor(output_details[0]["index"])
            classificators = interpreter.get_tensor(output_details[1]["index"])
            inference_end_time = time.time()
            # print('--------------------------------')
            # print('regressors: ')
            # print(regressors.shape)
            # print(regressors)
            # print('--------------------------------')
            # print('classificators: ')
            # print(classificators.shape)
            # print(classificators)
            postprocess_start_time = time.time()
            raw_boxes = np.reshape(regressors, int(regressors.shape[0] * regressors.shape[1] * regressors.shape[2]))
            raw_scores = np.reshape(classificators, int(classificators.shape[0] * classificators.shape[1] * classificators.shape[2]))
            detections = ProcessCPU(raw_boxes, raw_scores, anchors, options)
            detections = orig_nms(detections, 0.3)
            print('--------------------------------')
            print('detections: ')
            print('number: {:}'.format(len(detections)))
            for detection in detections:
                print(detection.to_string())
                x1 = int(img_width * detection.xmin)
                x2 = int(img_width * (detection.xmin + detection.width))
                y1 = int(img_height * detection.ymin)
                y2 = int(img_height * (detection.ymin + detection.height))
                print("x1: {:}, y1: {:}\nx2: {:}, y2: {:}".format(x1, y1, x2, y2))
                cv2.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
                cv2.putText(img, '{:.2f}'.format(detection.score), (x1, y1 - 6),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
            postprocess_end_time = time.time()
            print('preprocess cost: {:.2f} ms'.format((preprocess_end_time - preprocess_start_time) * 1000))
            print('inference cost: {:.2f} ms'.format((inference_end_time - inference_start_time) * 1000))
            print('postprocess cost: {:.2f} ms'.format((postprocess_end_time - postprocess_start_time) * 1000))
            curr_time = time.time()
            exec_time = curr_time - prev_time
            prev_time = curr_time
            accum_time = accum_time + exec_time
            curr_fps = curr_fps + 1
            if accum_time > 1:
                accum_time = accum_time - 1
                fps = "FPS: " + str(curr_fps)
                curr_fps = 0
            print(fps)
            cv2.putText(img, text=fps, org=(10, 25),
                        fontFace=cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.60, color=(255, 0, 0), thickness=2)
            cv2.imshow('img', img)
            c = cv2.waitKey(1) & 0xff
            if c == 27:
                break
            # if frame_cnt > 100:
            #     exit(0)
        else:
            # Stop when no frame could be read (e.g. camera unavailable or end of video).
            break


if __name__ == "__main__":
    main()