@chad-green
Created September 4, 2018 05:54
Update for NCSDK 2.0 based on code from Adrian Rosebrock's tutorial on pyimagesearch.com (see link in comments)
# Updated for NCSDK 2.0. Original code by Adrian Rosebrock
# https://www.pyimagesearch.com/2018/02/19/real-time-object-detection-on-the-raspberry-pi-with-the-movidius-ncs/
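# For reference, the NCSDK 1.0 -> 2.0 API changes applied in this script
# (each old call is also left inline below as an [INFO v1.0] comment):
#   mvnc.EnumerateDevices()       -> mvnc.enumerate_devices()
#   device.OpenDevice()           -> device.open()
#   device.AllocateGraph(blob)    -> graph = mvnc.Graph(name)
#                                    graph.allocate_with_fifos(device, blob)
#   graph.LoadTensor(image, None) -> graph.queue_inference_with_fifo_elem(
#                                        input_fifo, output_fifo, image, obj)
#   graph.GetResult()             -> output_fifo.read_elem()
#   graph.DeallocateGraph()       -> graph.destroy() (FIFOs destroyed separately)
#   device.CloseDevice()          -> device.close() then device.destroy()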
# USAGE
# python ncs2_realtime_objectdetection.py --graph graphs/mobilenetgraph --display 1
# python ncs2_realtime_objectdetection.py --graph graphs/mobilenetgraph --confidence 0.5 --display 1
# import the necessary packages
from mvnc import mvncapi as mvnc
from imutils.video import VideoStream
from imutils.video import FPS
import argparse
import numpy as np
import time
import cv2
# initialize the list of class labels our network was trained to
# detect, then generate a set of bounding box colors for each class
CLASSES = ("background", "aeroplane", "bicycle", "bird",
    "boat", "bottle", "bus", "car", "cat", "chair", "cow",
    "diningtable", "dog", "horse", "motorbike", "person",
    "pottedplant", "sheep", "sofa", "train", "tvmonitor")
COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))
# frame dimensions should be square
PREPROCESS_DIMS = (300, 300)
DISPLAY_DIMS = (900, 900)
# calculate the multiplier needed to scale the bounding boxes
DISP_MULTIPLIER = DISPLAY_DIMS[0] // PREPROCESS_DIMS[0]
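# sanity check (added): 900 // 300 = 3, so a box corner detected at
# (100, 50) on the 300x300 input maps to (300, 150) on the 900x900 display
assert DISPLAY_DIMS[0] % PREPROCESS_DIMS[0] == 0, \
    "display dims should be an integer multiple of preprocess dims"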
def preprocess_image(input_image):
    # resize the image, then mean-subtract and scale the pixel values
    preprocessed = cv2.resize(input_image, PREPROCESS_DIMS)
    preprocessed = preprocessed - 127.5
    preprocessed = preprocessed * 0.007843

    # [INFO] changed this to FP32 (np.float32) instead of FP16 because that is the NCS default
    # might want to change back and set the graph initialization to FP16 instead
    preprocessed = preprocessed.astype(np.float32)

    # return the image to the calling function
    return preprocessed
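# worked example (added): 0.007843 ~= 1 / 127.5, so the shift-and-scale
# above maps pixel values from [0, 255] to roughly [-1, 1]:
#   (0 - 127.5) * 0.007843 ~= -1.0
#   (255 - 127.5) * 0.007843 ~= +1.0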
def predict(image, graph):
    # preprocess the image
    image = preprocess_image(image)

    # send the image to the NCS and run a forward pass to grab the
    # network predictions (input_fifo and output_fifo are created at the
    # bottom of the script when the graph is allocated)
    # [INFO v1.0] graph.LoadTensor(image, None)
    graph.queue_inference_with_fifo_elem(input_fifo, output_fifo, image, 'user object')
    # [INFO v1.0] (output, _) = graph.GetResult()
    (output, _) = output_fifo.read_elem()

    # grab the number of valid object predictions from the output,
    # then initialize the list of predictions
    # [INFO] v2.0 needed this to be explicitly set to type int()
    num_valid_boxes = int(output[0])
    predictions = []

    # loop over results
    for box_index in range(num_valid_boxes):
        # calculate the base index into our array so we can extract
        # bounding box information
        base_index = 7 + box_index * 7

        # boxes with non-finite (inf, nan, etc.) numbers must be ignored
        if not np.all(np.isfinite(output[base_index:base_index + 7])):
            continue

        # extract the image width and height and clip the boxes to the
        # image size in case the network returns boxes outside of the
        # image boundaries
        (h, w) = image.shape[:2]
        x1 = max(0, int(output[base_index + 3] * w))
        y1 = max(0, int(output[base_index + 4] * h))
        x2 = min(w, int(output[base_index + 5] * w))
        y2 = min(h, int(output[base_index + 6] * h))

        # grab the prediction class label, confidence (i.e., probability),
        # and bounding box (x, y)-coordinates
        pred_class = int(output[base_index + 1])
        pred_conf = output[base_index + 2]
        pred_boxpts = ((x1, y1), (x2, y2))

        # create prediction tuple and append the prediction to the
        # predictions list
        prediction = (pred_class, pred_conf, pred_boxpts)
        predictions.append(prediction)

    # return the list of predictions to the calling function
    return predictions
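# note (added): the indexing above follows the SSD detection output layout;
# output[0] is the number of valid boxes and each detection occupies seven
# floats starting at index 7:
#   [image_id, class_id, confidence, x1, y1, x2, y2]
# so box_index 0 starts at output[7], box_index 1 at output[14], and so on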
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-g", "--graph", required=True,
    help="path to input graph file")
ap.add_argument("-c", "--confidence", type=float, default=0.5,
    help="confidence threshold")
ap.add_argument("-d", "--display", type=int, default=0,
    help="switch to display image on screen")
args = vars(ap.parse_args())
# grab a list of all NCS devices plugged in to USB
print("[INFO] finding NCS devices...")
#[INFO v1.0] devices = mvnc.EnumerateDevices()
devices = mvnc.enumerate_devices()
# if no devices found, exit the script
if len(devices) == 0:
    print("[INFO] No devices found. Please plug in an NCS")
    quit()
# use the first device since this is a simple test script
# (you'll want to modify this if using multiple NCS devices)
print("[INFO] found {} devices. device0 will be used. "
    "opening device0...".format(len(devices)))
device = mvnc.Device(devices[0])
#[INFO v1.0] device.OpenDevice()
device.open()
# open the CNN graph file
print("[INFO] loading the graph file into RPi memory...")
with open(args["graph"], mode="rb") as f:
    graph_in_memory = f.read()
# load the graph into the NCS
print("[INFO] allocating the graph on the NCS...")
#[INFO v1.0] graph = device.AllocateGraph(graph_in_memory)
graph = mvnc.Graph('graph1')
# [INFO v2.0 requires this] Allocate the graph to the device and create
# input and output Fifos with default arguments
input_fifo, output_fifo = graph.allocate_with_fifos(device, graph_in_memory)
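# sketch (added, untested): to use a second stick, open another device from
# the enumerated list and give it its own graph and FIFO pair, e.g.
#   device1 = mvnc.Device(devices[1])
#   device1.open()
#   graph1 = mvnc.Graph("graph2")
#   input_fifo1, output_fifo1 = graph1.allocate_with_fifos(device1, graph_in_memory)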
# open a pointer to the video stream thread and allow the buffer to
# start to fill, then start the FPS counter
print("[INFO] starting the video stream and FPS counter...")
vs = VideoStream(usePiCamera=False).start()
time.sleep(1)
fps = FPS().start()
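# note (added): usePiCamera=False reads from a USB webcam; set it to True
# (with the picamera package installed) to use the Raspberry Pi camera module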
# loop over frames from the video file stream
while True:
    try:
        # grab the frame from the threaded video stream, then make a copy
        # of the frame and resize it for display/video purposes
        frame = vs.read()
        image_for_result = frame.copy()
        image_for_result = cv2.resize(image_for_result, DISPLAY_DIMS)

        # use the NCS to acquire predictions
        predictions = predict(frame, graph)

        # loop over our predictions
        for (i, pred) in enumerate(predictions):
            # extract prediction data for readability
            (pred_class, pred_conf, pred_boxpts) = pred

            # filter out weak detections by ensuring the `confidence`
            # is greater than the minimum confidence
            if pred_conf > args["confidence"]:
                # print prediction to terminal
                print("[INFO] Prediction #{}: class={}, confidence={}, "
                    "boxpoints={}".format(i, CLASSES[pred_class], pred_conf,
                    pred_boxpts))

                # check if we should show the prediction data
                # on the frame
                if args["display"] > 0:
                    # build a label consisting of the predicted class and
                    # associated probability
                    label = "{}: {:.2f}%".format(CLASSES[pred_class],
                        pred_conf * 100)

                    # extract information from the prediction boxpoints
                    (ptA, ptB) = (pred_boxpts[0], pred_boxpts[1])
                    ptA = (ptA[0] * DISP_MULTIPLIER, ptA[1] * DISP_MULTIPLIER)
                    ptB = (ptB[0] * DISP_MULTIPLIER, ptB[1] * DISP_MULTIPLIER)
                    (startX, startY) = (ptA[0], ptA[1])
                    y = startY - 15 if startY - 15 > 15 else startY + 15

                    # display the rectangle and label text
                    cv2.rectangle(image_for_result, ptA, ptB,
                        COLORS[pred_class], 2)
                    cv2.putText(image_for_result, label, (startX, y),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, COLORS[pred_class], 3)

        # check if we should display the frame on the screen
        # with prediction data (you can achieve faster FPS if you
        # do not output to the screen)
        if args["display"] > 0:
            # display the frame to the screen
            cv2.imshow("Output", image_for_result)
            key = cv2.waitKey(1) & 0xFF

            # if the `q` key was pressed, break from the loop
            if key == ord("q"):
                break

        # update the FPS counter
        fps.update()

    # if "ctrl+c" is pressed in the terminal, break from the loop
    except KeyboardInterrupt:
        break

    # if there's a problem reading a frame, break gracefully
    except AttributeError:
        break
# stop the FPS counter timer
fps.stop()
# destroy all windows if we are displaying them
if args["display"] > 0:
cv2.destroyAllWindows()
# stop the video stream
vs.stop()
# clean up the graph and device
#[INFO v1.0] graph.DeallocateGraph()
#[INFO v1.0] device.CloseDevice()
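# note (added): teardown runs in the reverse order of allocation: the
# FIFOs first, then the graph, then the device is closed and destroyed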
input_fifo.destroy()
output_fifo.destroy()
graph.destroy()
device.close()
device.destroy()
# display FPS information
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))