Created
September 18, 2020 13:03
-
-
Save pythonlessons/a68c9623bf953d5b5037fc026926657a to your computer and use it in GitHub Desktop.
multiprocessing_test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process, Queue, Pipe | |
import cv2 | |
import time | |
import random | |
import colorsys | |
import numpy as np | |
import tensorflow as tf | |
from yolov3.configs import * | |
from yolov3.yolov4 import * | |
from tensorflow.python.saved_model import tag_constants | |
def load_yolo_weights(model, weights_file): | |
tf.keras.backend.clear_session() # used to reset layer names | |
# load Darknet original weights to TensorFlow model | |
if YOLO_TYPE == "yolov3": | |
range1 = 75 if not TRAIN_YOLO_TINY else 13 | |
range2 = [58, 66, 74] if not TRAIN_YOLO_TINY else [9, 12] | |
if YOLO_TYPE == "yolov4": | |
range1 = 110 if not TRAIN_YOLO_TINY else 21 | |
range2 = [93, 101, 109] if not TRAIN_YOLO_TINY else [17, 20] | |
with open(weights_file, 'rb') as wf: | |
major, minor, revision, seen, _ = np.fromfile(wf, dtype=np.int32, count=5) | |
j = 0 | |
for i in range(range1): | |
if i > 0: | |
conv_layer_name = 'conv2d_%d' %i | |
else: | |
conv_layer_name = 'conv2d' | |
if j > 0: | |
bn_layer_name = 'batch_normalization_%d' %j | |
else: | |
bn_layer_name = 'batch_normalization' | |
conv_layer = model.get_layer(conv_layer_name) | |
filters = conv_layer.filters | |
k_size = conv_layer.kernel_size[0] | |
in_dim = conv_layer.input_shape[-1] | |
if i not in range2: | |
# darknet weights: [beta, gamma, mean, variance] | |
bn_weights = np.fromfile(wf, dtype=np.float32, count=4 * filters) | |
# tf weights: [gamma, beta, mean, variance] | |
bn_weights = bn_weights.reshape((4, filters))[[1, 0, 2, 3]] | |
bn_layer = model.get_layer(bn_layer_name) | |
j += 1 | |
else: | |
conv_bias = np.fromfile(wf, dtype=np.float32, count=filters) | |
# darknet shape (out_dim, in_dim, height, width) | |
conv_shape = (filters, in_dim, k_size, k_size) | |
conv_weights = np.fromfile(wf, dtype=np.float32, count=np.product(conv_shape)) | |
# tf shape (height, width, in_dim, out_dim) | |
conv_weights = conv_weights.reshape(conv_shape).transpose([2, 3, 1, 0]) | |
if i not in range2: | |
conv_layer.set_weights([conv_weights]) | |
bn_layer.set_weights(bn_weights) | |
else: | |
conv_layer.set_weights([conv_weights, conv_bias]) | |
assert len(wf.read()) == 0, 'failed to read all data' | |
def Load_Yolo_model(): | |
if YOLO_FRAMEWORK == "tf": # TensorFlow detection | |
if YOLO_TYPE == "yolov4": | |
Darknet_weights = YOLO_V4_TINY_WEIGHTS if TRAIN_YOLO_TINY else YOLO_V4_WEIGHTS | |
if YOLO_TYPE == "yolov3": | |
Darknet_weights = YOLO_V3_TINY_WEIGHTS if TRAIN_YOLO_TINY else YOLO_V3_WEIGHTS | |
if YOLO_CUSTOM_WEIGHTS == False: | |
yolo = Create_Yolo(input_size=YOLO_INPUT_SIZE, CLASSES=YOLO_COCO_CLASSES) | |
load_yolo_weights(yolo, Darknet_weights) # use Darknet weights | |
else: | |
yolo = Create_Yolo(input_size=YOLO_INPUT_SIZE, CLASSES=TRAIN_CLASSES) | |
yolo.load_weights(YOLO_CUSTOM_WEIGHTS) # use custom weights | |
elif YOLO_FRAMEWORK == "trt": # TensorRT detection | |
saved_model_loaded = tf.saved_model.load(YOLO_CUSTOM_WEIGHTS, tags=[tag_constants.SERVING]) | |
signature_keys = list(saved_model_loaded.signatures.keys()) | |
yolo = saved_model_loaded.signatures['serving_default'] | |
return yolo | |
def image_preprocess(image, target_size, gt_boxes=None): | |
ih, iw = target_size | |
h, w, _ = image.shape | |
scale = min(iw/w, ih/h) | |
nw, nh = int(scale * w), int(scale * h) | |
image_resized = cv2.resize(image, (nw, nh)) | |
image_paded = np.full(shape=[ih, iw, 3], fill_value=128.0) | |
dw, dh = (iw - nw) // 2, (ih-nh) // 2 | |
image_paded[dh:nh+dh, dw:nw+dw, :] = image_resized | |
image_paded = image_paded / 255. | |
if gt_boxes is None: | |
return image_paded | |
else: | |
gt_boxes[:, [0, 2]] = gt_boxes[:, [0, 2]] * scale + dw | |
gt_boxes[:, [1, 3]] = gt_boxes[:, [1, 3]] * scale + dh | |
return image_paded, gt_boxes | |
def draw_bbox(image, bboxes, CLASSES=YOLO_COCO_CLASSES, show_label=True, show_confidence = True, Text_colors=(255,255,0), rectangle_colors='', tracking=False): | |
NUM_CLASS = read_class_names(CLASSES) | |
num_classes = len(NUM_CLASS) | |
image_h, image_w, _ = image.shape | |
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)] | |
#print("hsv_tuples", hsv_tuples) | |
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples)) | |
colors = list(map(lambda x: (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), colors)) | |
random.seed(0) | |
random.shuffle(colors) | |
random.seed(None) | |
for i, bbox in enumerate(bboxes): | |
coor = np.array(bbox[:4], dtype=np.int32) | |
score = bbox[4] | |
class_ind = int(bbox[5]) | |
bbox_color = rectangle_colors if rectangle_colors != '' else colors[class_ind] | |
bbox_thick = int(0.6 * (image_h + image_w) / 1000) | |
if bbox_thick < 1: bbox_thick = 1 | |
fontScale = 0.75 * bbox_thick | |
(x1, y1), (x2, y2) = (coor[0], coor[1]), (coor[2], coor[3]) | |
# put object rectangle | |
cv2.rectangle(image, (x1, y1), (x2, y2), bbox_color, bbox_thick*2) | |
if show_label: | |
# get text label | |
score_str = " {:.2f}".format(score) if show_confidence else "" | |
if tracking: score_str = " "+str(score) | |
label = "{}".format(NUM_CLASS[class_ind]) + score_str | |
# get text size | |
(text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX_SMALL, | |
fontScale, thickness=bbox_thick) | |
# put filled text rectangle | |
cv2.rectangle(image, (x1, y1), (x1 + text_width, y1 - text_height - baseline), bbox_color, thickness=cv2.FILLED) | |
# put text above rectangle | |
cv2.putText(image, label, (x1, y1-4), cv2.FONT_HERSHEY_COMPLEX_SMALL, | |
fontScale, Text_colors, bbox_thick, lineType=cv2.LINE_AA) | |
return image | |
def bboxes_iou(boxes1, boxes2): | |
boxes1 = np.array(boxes1) | |
boxes2 = np.array(boxes2) | |
boxes1_area = (boxes1[..., 2] - boxes1[..., 0]) * (boxes1[..., 3] - boxes1[..., 1]) | |
boxes2_area = (boxes2[..., 2] - boxes2[..., 0]) * (boxes2[..., 3] - boxes2[..., 1]) | |
left_up = np.maximum(boxes1[..., :2], boxes2[..., :2]) | |
right_down = np.minimum(boxes1[..., 2:], boxes2[..., 2:]) | |
inter_section = np.maximum(right_down - left_up, 0.0) | |
inter_area = inter_section[..., 0] * inter_section[..., 1] | |
union_area = boxes1_area + boxes2_area - inter_area | |
ious = np.maximum(1.0 * inter_area / union_area, np.finfo(np.float32).eps) | |
return ious | |
def nms(bboxes, iou_threshold, sigma=0.3, method='nms'): | |
""" | |
:param bboxes: (xmin, ymin, xmax, ymax, score, class) | |
Note: soft-nms, https://arxiv.org/pdf/1704.04503.pdf | |
https://github.com/bharatsingh430/soft-nms | |
""" | |
classes_in_img = list(set(bboxes[:, 5])) | |
best_bboxes = [] | |
for cls in classes_in_img: | |
cls_mask = (bboxes[:, 5] == cls) | |
cls_bboxes = bboxes[cls_mask] | |
# Process 1: Determine whether the number of bounding boxes is greater than 0 | |
while len(cls_bboxes) > 0: | |
# Process 2: Select the bounding box with the highest score according to socre order A | |
max_ind = np.argmax(cls_bboxes[:, 4]) | |
best_bbox = cls_bboxes[max_ind] | |
best_bboxes.append(best_bbox) | |
cls_bboxes = np.concatenate([cls_bboxes[: max_ind], cls_bboxes[max_ind + 1:]]) | |
# Process 3: Calculate this bounding box A and | |
# Remain all iou of the bounding box and remove those bounding boxes whose iou value is higher than the threshold | |
iou = bboxes_iou(best_bbox[np.newaxis, :4], cls_bboxes[:, :4]) | |
weight = np.ones((len(iou),), dtype=np.float32) | |
assert method in ['nms', 'soft-nms'] | |
if method == 'nms': | |
iou_mask = iou > iou_threshold | |
weight[iou_mask] = 0.0 | |
if method == 'soft-nms': | |
weight = np.exp(-(1.0 * iou ** 2 / sigma)) | |
cls_bboxes[:, 4] = cls_bboxes[:, 4] * weight | |
score_mask = cls_bboxes[:, 4] > 0. | |
cls_bboxes = cls_bboxes[score_mask] | |
return best_bboxes | |
def postprocess_boxes(pred_bbox, original_image, input_size, score_threshold): | |
valid_scale=[0, np.inf] | |
pred_bbox = np.array(pred_bbox) | |
pred_xywh = pred_bbox[:, 0:4] | |
pred_conf = pred_bbox[:, 4] | |
pred_prob = pred_bbox[:, 5:] | |
# 1. (x, y, w, h) --> (xmin, ymin, xmax, ymax) | |
pred_coor = np.concatenate([pred_xywh[:, :2] - pred_xywh[:, 2:] * 0.5, | |
pred_xywh[:, :2] + pred_xywh[:, 2:] * 0.5], axis=-1) | |
# 2. (xmin, ymin, xmax, ymax) -> (xmin_org, ymin_org, xmax_org, ymax_org) | |
org_h, org_w = original_image.shape[:2] | |
resize_ratio = min(input_size / org_w, input_size / org_h) | |
dw = (input_size - resize_ratio * org_w) / 2 | |
dh = (input_size - resize_ratio * org_h) / 2 | |
pred_coor[:, 0::2] = 1.0 * (pred_coor[:, 0::2] - dw) / resize_ratio | |
pred_coor[:, 1::2] = 1.0 * (pred_coor[:, 1::2] - dh) / resize_ratio | |
# 3. clip some boxes those are out of range | |
pred_coor = np.concatenate([np.maximum(pred_coor[:, :2], [0, 0]), | |
np.minimum(pred_coor[:, 2:], [org_w - 1, org_h - 1])], axis=-1) | |
invalid_mask = np.logical_or((pred_coor[:, 0] > pred_coor[:, 2]), (pred_coor[:, 1] > pred_coor[:, 3])) | |
pred_coor[invalid_mask] = 0 | |
# 4. discard some invalid boxes | |
bboxes_scale = np.sqrt(np.multiply.reduce(pred_coor[:, 2:4] - pred_coor[:, 0:2], axis=-1)) | |
scale_mask = np.logical_and((valid_scale[0] < bboxes_scale), (bboxes_scale < valid_scale[1])) | |
# 5. discard boxes with low scores | |
classes = np.argmax(pred_prob, axis=-1) | |
scores = pred_conf * pred_prob[np.arange(len(pred_coor)), classes] | |
score_mask = scores > score_threshold | |
mask = np.logical_and(scale_mask, score_mask) | |
coors, scores, classes = pred_coor[mask], scores[mask], classes[mask] | |
return np.concatenate([coors, scores[:, np.newaxis], classes[:, np.newaxis]], axis=-1) | |
def detect_image(Yolo, image_path, output_path, input_size=416, show=False, CLASSES=YOLO_COCO_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors=''): | |
original_image = cv2.imread(image_path) | |
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB) | |
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB) | |
image_data = image_preprocess(np.copy(original_image), [input_size, input_size]) | |
image_data = image_data[np.newaxis, ...].astype(np.float32) | |
if YOLO_FRAMEWORK == "tf": | |
pred_bbox = Yolo.predict(image_data) | |
elif YOLO_FRAMEWORK == "trt": | |
batched_input = tf.constant(image_data) | |
result = Yolo(batched_input) | |
pred_bbox = [] | |
for key, value in result.items(): | |
value = value.numpy() | |
pred_bbox.append(value) | |
pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox] | |
pred_bbox = tf.concat(pred_bbox, axis=0) | |
bboxes = postprocess_boxes(pred_bbox, original_image, input_size, score_threshold) | |
bboxes = nms(bboxes, iou_threshold, method='nms') | |
image = draw_bbox(original_image, bboxes, CLASSES=CLASSES, rectangle_colors=rectangle_colors) | |
if output_path != '': cv2.imwrite(output_path, image) | |
if show: | |
# Show the image | |
cv2.imshow("predicted image", image) | |
# Load and hold the image | |
cv2.waitKey(0) | |
# To close the window after the required kill value was provided | |
cv2.destroyAllWindows() | |
return image | |
def Predict_bbox_mp(Frames_data, Predicted_data, Processing_times): | |
gpus = tf.config.experimental.list_physical_devices('GPU') | |
if len(gpus) > 0: | |
try: tf.config.experimental.set_memory_growth(gpus[0], True) | |
except RuntimeError: print("RuntimeError in tf.config.experimental.list_physical_devices('GPU')") | |
Yolo = Load_Yolo_model() | |
times = [] | |
SUM = 0 | |
is_first_detection = True | |
while True: | |
if Frames_data.qsize()>0: | |
image_data = Frames_data.get() | |
Processing_times.put(time.time()) | |
t1 = time.time() | |
if YOLO_FRAMEWORK == "tf": | |
pred_bbox = Yolo.predict(image_data) | |
elif YOLO_FRAMEWORK == "trt": | |
#print("Do prediction") | |
batched_input = tf.constant(image_data) | |
result = Yolo(batched_input) | |
#print("OK") | |
pred_bbox = [] | |
for key, value in result.items(): | |
value = value.numpy() | |
pred_bbox.append(value) | |
t2 = time.time() | |
SUM += (t2-t1) | |
if is_first_detection == True: | |
SUM = 0 | |
is_first_detection = False | |
print("prediction started") | |
pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox] | |
pred_bbox = tf.concat(pred_bbox, axis=0) | |
Predicted_data.put(pred_bbox) | |
if Frames_data.qsize()==0 and SUM != 0: | |
#time.sleep(0.1) | |
print(SUM) | |
def postprocess_mp(Predicted_data, original_frames, Processed_frames, Processing_times, input_size, CLASSES, score_threshold, iou_threshold, rectangle_colors, realtime): | |
times = [] | |
while True: | |
if Predicted_data.qsize()>0: | |
pred_bbox = Predicted_data.get() | |
if realtime: | |
while original_frames.qsize() > 1: | |
original_image = original_frames.get() | |
else: | |
original_image = original_frames.get() | |
bboxes = postprocess_boxes(pred_bbox, original_image, input_size, score_threshold) | |
bboxes = nms(bboxes, iou_threshold, method='nms') | |
image = draw_bbox(original_image, bboxes, CLASSES=CLASSES, rectangle_colors=rectangle_colors) | |
times.append(time.time()-Processing_times.get()) | |
times = times[-20:] | |
ms = sum(times)/len(times)*1000 | |
fps = 1000 / ms | |
image = cv2.putText(image, "Time: {:.1f}FPS".format(fps), (0, 30), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2) | |
#print("Time: {:.2f}ms, Final FPS: {:.1f}".format(ms, fps)) | |
Processed_frames.put(image) | |
def Show_Image_mp(Processed_frames, show, Final_frames): | |
while True: | |
if Processed_frames.qsize()>0: | |
image = Processed_frames.get() | |
Final_frames.put(image) | |
if show: | |
cv2.imshow('output', image) | |
if cv2.waitKey(25) & 0xFF == ord("q"): | |
cv2.destroyAllWindows() | |
break | |
def detect_video_realtime_mp(video_path, output_path, input_size=416, show=False, CLASSES=YOLO_COCO_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors='', realtime=False): | |
if realtime: | |
vid = cv2.VideoCapture(0) | |
else: | |
vid = cv2.VideoCapture(video_path) | |
# by default VideoCapture returns float instead of int | |
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = int(vid.get(cv2.CAP_PROP_FPS)) | |
codec = cv2.VideoWriter_fourcc(*'XVID') | |
out = cv2.VideoWriter(output_path, codec, fps, (width, height)) # output_path must be .mp4 | |
no_of_frames = int(vid.get(cv2.CAP_PROP_FRAME_COUNT)) | |
original_frames = Queue() | |
Frames_data = Queue() | |
Predicted_data = Queue() | |
Processed_frames = Queue() | |
Processing_times = Queue() | |
Final_frames = Queue() | |
p1 = Process(target=Predict_bbox_mp, args=(Frames_data, Predicted_data, Processing_times)) | |
p2 = Process(target=postprocess_mp, args=(Predicted_data, original_frames, Processed_frames, Processing_times, input_size, CLASSES, score_threshold, iou_threshold, rectangle_colors, realtime)) | |
p3 = Process(target=Show_Image_mp, args=(Processed_frames, show, Final_frames)) | |
p1.start() | |
p2.start() | |
p3.start() | |
started = False | |
while True: | |
ret, img = vid.read() | |
if not ret: | |
break | |
original_image = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB) | |
original_frames.put(original_image) | |
image_data = image_preprocess(np.copy(original_image), [input_size, input_size]) | |
image_data = image_data[np.newaxis, ...].astype(np.float32) | |
Frames_data.put(image_data) | |
while started == False and Frames_data.qsize()>0: | |
if Processed_frames.qsize() == 0: | |
time.sleep(0.1) | |
#print("wait") | |
continue | |
else: | |
started = True | |
start_time = time.time() | |
#print("break") | |
break | |
while True: | |
if original_frames.qsize() == 0 and Frames_data.qsize() == 0 and Predicted_data.qsize() == 0 and Processed_frames.qsize() == 0 and Processing_times.qsize() == 0 and Final_frames.qsize() == 0: | |
p1.terminate() | |
p2.terminate() | |
p3.terminate() | |
break | |
elif Final_frames.qsize()>0: | |
image = Final_frames.get() | |
if output_path != '': out.write(image) | |
end_time = time.time() | |
print("total_duration", end_time-start_time) | |
cv2.destroyAllWindows() | |
# detect from video | |
def detect_video(Yolo, video_path, output_path, input_size=416, show=False, CLASSES=YOLO_COCO_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors=''): | |
times, times_2 = [], [] | |
SUM = 0 | |
is_first_detection = True | |
vid = cv2.VideoCapture(video_path) | |
# by default VideoCapture returns float instead of int | |
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = int(vid.get(cv2.CAP_PROP_FPS)) | |
codec = cv2.VideoWriter_fourcc(*'XVID') | |
out = cv2.VideoWriter(output_path, codec, fps, (width, height)) # output_path must be .mp4 | |
while True: | |
_, img = vid.read() | |
try: | |
original_image = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
original_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB) | |
except: | |
break | |
image_data = image_preprocess(np.copy(original_image), [input_size, input_size]) | |
image_data = image_data[np.newaxis, ...].astype(np.float32) | |
t1 = time.time() | |
if YOLO_FRAMEWORK == "tf": | |
pred_bbox = Yolo.predict(image_data) | |
elif YOLO_FRAMEWORK == "trt": | |
batched_input = tf.constant(image_data) | |
result = Yolo(batched_input) | |
pred_bbox = [] | |
for key, value in result.items(): | |
value = value.numpy() | |
pred_bbox.append(value) | |
t2 = time.time() | |
pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox] | |
pred_bbox = tf.concat(pred_bbox, axis=0) | |
bboxes = postprocess_boxes(pred_bbox, original_image, input_size, score_threshold) | |
bboxes = nms(bboxes, iou_threshold, method='nms') | |
image = draw_bbox(original_image, bboxes, CLASSES=CLASSES, rectangle_colors=rectangle_colors) | |
t3 = time.time() | |
times.append(t2-t1) | |
SUM += (t2-t1) | |
if is_first_detection == True: | |
SUM = 0 | |
is_first_detection = False | |
start_time = time.time() | |
times_2.append(t3-t1) | |
times = times[-20:] | |
times_2 = times_2[-20:] | |
ms = sum(times)/len(times)*1000 | |
fps = 1000 / ms | |
fps2 = 1000 / (sum(times_2)/len(times_2)*1000) | |
image = cv2.putText(image, "Time: {:.1f}FPS".format(fps), (0, 30), cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2) | |
#print("Time: {:.2f}ms, Detection FPS: {:.1f}, total FPS: {:.1f}".format(ms, fps, fps2)) | |
if output_path != '': out.write(image) | |
if show: | |
cv2.imshow('output', image) | |
if cv2.waitKey(25) & 0xFF == ord("q"): | |
cv2.destroyAllWindows() | |
break | |
end_time = time.time() | |
print(SUM) | |
print("total_duration", end_time-start_time) | |
cv2.destroyAllWindows() | |
# detect from webcam | |
def detect_realtime(Yolo, output_path, input_size=416, show=False, CLASSES=YOLO_COCO_CLASSES, score_threshold=0.3, iou_threshold=0.45, rectangle_colors=''): | |
times = [] | |
vid = cv2.VideoCapture(0) | |
# by default VideoCapture returns float instead of int | |
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = int(vid.get(cv2.CAP_PROP_FPS)) | |
codec = cv2.VideoWriter_fourcc(*'XVID') | |
out = cv2.VideoWriter(output_path, codec, fps, (width, height)) # output_path must be .mp4 | |
while True: | |
_, frame = vid.read() | |
try: | |
original_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
original_frame = cv2.cvtColor(original_frame, cv2.COLOR_BGR2RGB) | |
except: | |
break | |
image_data = image_preprocess(np.copy(original_frame), [input_size, input_size]) | |
image_data = image_data[np.newaxis, ...].astype(np.float32) | |
t1 = time.time() | |
if YOLO_FRAMEWORK == "tf": | |
pred_bbox = Yolo.predict(image_data) | |
elif YOLO_FRAMEWORK == "trt": | |
batched_input = tf.constant(image_data) | |
result = Yolo(batched_input) | |
pred_bbox = [] | |
for key, value in result.items(): | |
value = value.numpy() | |
pred_bbox.append(value) | |
t2 = time.time() | |
pred_bbox = [tf.reshape(x, (-1, tf.shape(x)[-1])) for x in pred_bbox] | |
pred_bbox = tf.concat(pred_bbox, axis=0) | |
bboxes = postprocess_boxes(pred_bbox, original_frame, input_size, score_threshold) | |
bboxes = nms(bboxes, iou_threshold, method='nms') | |
times.append(t2-t1) | |
times = times[-20:] | |
ms = sum(times)/len(times)*1000 | |
fps = 1000 / ms | |
print("Time: {:.2f}ms, {:.1f} FPS".format(ms, fps)) | |
frame = draw_bbox(original_frame, bboxes, CLASSES=CLASSES, rectangle_colors=rectangle_colors) | |
image = cv2.putText(frame, "Time: {:.1f}FPS".format(fps), (0, 30), | |
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (0, 0, 255), 2) | |
if output_path != '': out.write(frame) | |
if show: | |
cv2.imshow('output', frame) | |
if cv2.waitKey(25) & 0xFF == ord("q"): | |
cv2.destroyAllWindows() | |
break | |
cv2.destroyAllWindows() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment