@jkjung-avt
Forked from ck196/ssd_500_detect.py
Last active June 20, 2023 15:11
Capture live video from a camera and do Single-Shot Multibox Detector (SSD) object detection in Caffe on Jetson TX2/TX1.
# --------------------------------------------------------
# Camera Single-Shot Multibox Detector (SSD) sample code
# for Tegra X2/X1
#
# This program captures and displays video from an IP CAM,
# USB webcam, or the Tegra onboard camera, and does real-time
# object detection with Single-Shot Multibox Detector (SSD)
# in Caffe. Refer to the following blog post for how to set
# up and run the code:
#
# https://jkjung-avt.github.io/camera-ssd-threaded/
#
# Written by JK Jung <jkjung13@gmail.com>
# --------------------------------------------------------
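#
# Example invocations (the filename camera_ssd_threaded.py is an
# assumption; substitute whatever name you saved this script under):
#
#   $ python3 camera_ssd_threaded.py                     # onboard camera
#   $ python3 camera_ssd_threaded.py --usb --vid 1       # USB webcam
#   $ python3 camera_ssd_threaded.py --rtsp --uri rtsp://192.168.1.64:554
#   $ python3 camera_ssd_threaded.py --file --filename test.mp4
#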
import os
import sys
import time
import argparse
import threading
import subprocess
import numpy as np
import cv2
from google.protobuf import text_format
CAFFE_ROOT = '/home/nvidia/project/ssd-caffe/'
sys.path.insert(0, CAFFE_ROOT + 'python')
import caffe
from caffe.proto import caffe_pb2
DEFAULT_PROTOTXT = CAFFE_ROOT + 'models/VGGNet/coco/SSD_300x300/deploy.prototxt'
DEFAULT_MODEL = CAFFE_ROOT + 'models/VGGNet/coco/SSD_300x300/VGG_coco_SSD_300x300_iter_400000.caffemodel'
DEFAULT_LABELMAP = CAFFE_ROOT + 'data/coco/labelmap_coco.prototxt'
WINDOW_NAME = 'CameraSSDDemo'
BBOX_COLOR = (0, 255, 0) # green
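# Per-channel (B, G, R) pixel means of the SSD training images; the
# (1, 1, 3) shape lets the subtraction broadcast over an HxWx3 frame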
PIXEL_MEANS = np.array([[[104.0, 117.0, 123.0]]], dtype=np.float32)
# The following 2 global variables are shared between threads
THREAD_RUNNING = False
IMG_HANDLE = None
def parse_args():
    # Parse input arguments
    desc = ('This script captures and displays live camera video, '
            'and does real-time object detection with Single-Shot '
            'Multibox Detector (SSD) in Caffe on Jetson TX2/TX1')
    parser = argparse.ArgumentParser(description=desc)
    parser.add_argument('--file', dest='use_file',
                        help='use a video file as input (remember to '
                             'also set --filename)',
                        action='store_true')
    parser.add_argument('--filename', dest='filename',
                        help='video file name, e.g. test.mp4',
                        default=None, type=str)
    parser.add_argument('--rtsp', dest='use_rtsp',
                        help='use IP CAM (remember to also set --uri)',
                        action='store_true')
    parser.add_argument('--uri', dest='rtsp_uri',
                        help='RTSP URI, e.g. rtsp://192.168.1.64:554',
                        default=None, type=str)
    parser.add_argument('--latency', dest='rtsp_latency',
                        help='latency in ms for RTSP [200]',
                        default=200, type=int)
    parser.add_argument('--usb', dest='use_usb',
                        help='use USB webcam (remember to also set --vid)',
                        action='store_true')
    parser.add_argument('--vid', dest='video_dev',
                        help='device # of USB webcam (/dev/video?) [1]',
                        default=1, type=int)
    parser.add_argument('--width', dest='image_width',
                        help='image width [1280]',
                        default=1280, type=int)
    parser.add_argument('--height', dest='image_height',
                        help='image height [720]',
                        default=720, type=int)
    parser.add_argument('--cpu', dest='cpu_mode',
                        help='run Caffe in CPU mode (default: GPU mode)',
                        action='store_true')
    parser.add_argument('--prototxt', dest='caffe_prototxt',
                        help='[{}]'.format(DEFAULT_PROTOTXT),
                        default=DEFAULT_PROTOTXT, type=str)
    parser.add_argument('--model', dest='caffe_model',
                        help='[{}]'.format(DEFAULT_MODEL),
                        default=DEFAULT_MODEL, type=str)
    parser.add_argument('--labelmap', dest='labelmap_file',
                        help='[{}]'.format(DEFAULT_LABELMAP),
                        default=DEFAULT_LABELMAP, type=str)
    parser.add_argument('--confidence', dest='conf_th',
                        help='confidence threshold [0.3]',
                        default=0.3, type=float)
    args = parser.parse_args()
    return args
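#
# The open_cam_* functions below build GStreamer pipeline strings and
# hand them to cv2.VideoCapture; this requires OpenCV to be built with
# GStreamer support. In each pipeline, nvvidconv does hardware-
# accelerated scaling/format conversion into BGRx, and videoconvert
# then produces the plain BGR frames OpenCV expects at the appsink.
#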
def open_cam_rtsp(uri, width, height, latency):
    gst_str = ('rtspsrc location={} latency={} ! '
               'rtph264depay ! h264parse ! omxh264dec ! '
               'nvvidconv ! '
               'video/x-raw, width=(int){}, height=(int){}, '
               'format=(string)BGRx ! '
               'videoconvert ! appsink').format(uri, latency, width, height)
    return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_usb(dev, width, height):
    # We want to set width and height here, otherwise we could just do:
    #     return cv2.VideoCapture(dev)
    gst_str = ('v4l2src device=/dev/video{} ! '
               'video/x-raw, width=(int){}, height=(int){} ! '
               'videoconvert ! appsink').format(dev, width, height)
    return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_cam_onboard(width, height):
    # Older L4T releases ship the 'nvcamerasrc' element for the onboard
    # camera; later JetPack releases replace it with 'nvarguscamerasrc'.
    # Check which one is available on this system.
    gst_elements = str(subprocess.check_output('gst-inspect-1.0'))
    if 'nvcamerasrc' in gst_elements:
        # On versions of L4T prior to 28.1, add 'flip-method=2' into gst_str
        gst_str = ('nvcamerasrc ! '
                   'video/x-raw(memory:NVMM), '
                   'width=(int)2592, height=(int)1458, '
                   'format=(string)I420, framerate=(fraction)30/1 ! '
                   'nvvidconv ! '
                   'video/x-raw, width=(int){}, height=(int){}, '
                   'format=(string)BGRx ! '
                   'videoconvert ! appsink').format(width, height)
    elif 'nvarguscamerasrc' in gst_elements:
        gst_str = ('nvarguscamerasrc ! '
                   'video/x-raw(memory:NVMM), '
                   'width=(int)1920, height=(int)1080, '
                   'format=(string)NV12, framerate=(fraction)30/1 ! '
                   'nvvidconv flip-method=2 ! '
                   'video/x-raw, width=(int){}, height=(int){}, '
                   'format=(string)BGRx ! '
                   'videoconvert ! appsink').format(width, height)
    else:
        raise RuntimeError('onboard camera source not found!')
    return cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
def open_window(width, height):
    cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(WINDOW_NAME, width, height)
    cv2.moveWindow(WINDOW_NAME, 0, 0)
    cv2.setWindowTitle(WINDOW_NAME, 'Camera SSD Object Detection Demo '
                                    'for Jetson TX2/TX1')
#
# This 'grab_img' function is designed to be run in the sub-thread.
# Once started, this thread keeps grabbing new images and storing
# them in the global IMG_HANDLE, until THREAD_RUNNING is set to
# False. No lock is needed: rebinding IMG_HANDLE is a single
# reference assignment, which is atomic under CPython's GIL, so the
# main thread always sees either the previous frame or the new one.
#
def grab_img(cap):
    global THREAD_RUNNING
    global IMG_HANDLE
    while THREAD_RUNNING:
        _, IMG_HANDLE = cap.read()
        if IMG_HANDLE is None:
            print('grab_img(): cap.read() returned None...')
            break
    THREAD_RUNNING = False
def preprocess(src):
    '''Preprocess the input image for SSD.

    Resize to the 300x300 network input size and subtract the
    per-channel pixel means. No channel swap is needed because
    OpenCV frames are already in BGR order.
    '''
    img = cv2.resize(src, (300, 300))
    img = img.astype(np.float32) - PIXEL_MEANS
    return img
def postprocess(img, out):
    '''Postprocess the output of the SSD object detector.

    The 'detection_out' blob has shape (1, 1, N, 7): each of the N
    rows holds [image_id, label, confidence, xmin, ymin, xmax, ymax],
    with box coordinates normalized to [0, 1]. Scale the boxes back
    to pixel coordinates of the original image.
    '''
    h, w, c = img.shape
    box = out['detection_out'][0,0,:,3:7] * np.array([w, h, w, h])
    cls = out['detection_out'][0,0,:,1]
    conf = out['detection_out'][0,0,:,2]
    return (box.astype(np.int32), conf, cls)
def detect(origimg, net):
    img = preprocess(origimg)
    img = img.transpose((2, 0, 1))  # HWC -> CHW, as Caffe expects
    tic = time.time()
    net.blobs['data'].data[...] = img
    out = net.forward()
    dt = time.time() - tic
    box, conf, cls = postprocess(origimg, out)
    #print('Detection took {:.3f}s, found {} objects'.format(dt, len(box)))
    print('Detection took {:.3f}s'.format(dt))
    return (box, conf, cls)
def show_bounding_boxes(img, box, conf, cls, cls_dict, conf_th):
    for bb, cf, cl in zip(box, conf, cls):
        cl = int(cl)
        # Only keep non-background bounding boxes (class 0 is the
        # background class) with confidence above the threshold
        if cl == 0 or cf < conf_th:
            continue
        x_min, y_min, x_max, y_max = bb[0], bb[1], bb[2], bb[3]
        cv2.rectangle(img, (x_min, y_min), (x_max, y_max), BBOX_COLOR, 2)
        txt_loc = (max(x_min, 5), max(y_min-3, 20))
        cls_name = cls_dict.get(cl, 'CLASS{}'.format(cl))
        txt = '{} {:.2f}'.format(cls_name, cf)
        cv2.putText(img, txt, txt_loc, cv2.FONT_HERSHEY_DUPLEX, 0.8,
                    BBOX_COLOR, 1)
def read_cam_and_detect(net, cls_dict, conf_th):
    global THREAD_RUNNING
    global IMG_HANDLE
    show_help = True
    full_scrn = False
    help_text = '"Esc" to Quit, "H" for Help, "F" to Toggle Fullscreen'
    font = cv2.FONT_HERSHEY_PLAIN
    while THREAD_RUNNING:
        if cv2.getWindowProperty(WINDOW_NAME, 0) < 0:
            # Check to see if the user has closed the window.
            # If yes, terminate the program.
            break
        img = IMG_HANDLE
        if img is not None:
            box, conf, cls = detect(img, net)
            show_bounding_boxes(img, box, conf, cls, cls_dict, conf_th)
            if show_help:
                cv2.putText(img, help_text, (11, 20), font, 1.0,
                            (32, 32, 32), 4, cv2.LINE_AA)
                cv2.putText(img, help_text, (10, 20), font, 1.0,
                            (240, 240, 240), 1, cv2.LINE_AA)
            cv2.imshow(WINDOW_NAME, img)
        key = cv2.waitKey(1)
        if key == 27:  # ESC key: quit program
            break
        elif key == ord('H') or key == ord('h'):  # Toggle help message
            show_help = not show_help
        elif key == ord('F') or key == ord('f'):  # Toggle fullscreen
            full_scrn = not full_scrn
            if full_scrn:
                cv2.setWindowProperty(WINDOW_NAME, cv2.WND_PROP_FULLSCREEN,
                                      cv2.WINDOW_FULLSCREEN)
            else:
                cv2.setWindowProperty(WINDOW_NAME, cv2.WND_PROP_FULLSCREEN,
                                      cv2.WINDOW_NORMAL)
def main():
    global THREAD_RUNNING
    args = parse_args()
    print('Called with args:')
    print(args)
    if not os.path.isfile(args.caffe_prototxt):
        sys.exit('File not found: {}'.format(args.caffe_prototxt))
    if not os.path.isfile(args.caffe_model):
        sys.exit('File not found: {}'.format(args.caffe_model))
    if not os.path.isfile(args.labelmap_file):
        sys.exit('File not found: {}'.format(args.labelmap_file))

    # Initialize Caffe
    if args.cpu_mode:
        print('Running Caffe in CPU mode')
        caffe.set_mode_cpu()
    else:
        print('Running Caffe in GPU mode')
        caffe.set_device(0)
        caffe.set_mode_gpu()
    net = caffe.Net(args.caffe_prototxt, args.caffe_model, caffe.TEST)

    # Build the class (index/name) dictionary from the labelmap file
    with open(args.labelmap_file, 'r') as lm_handle:
        lm_map = caffe_pb2.LabelMap()
        text_format.Merge(str(lm_handle.read()), lm_map)
    cls_dict = {x.label: x.display_name for x in lm_map.item}

    # Open camera
    if args.use_file:
        cap = cv2.VideoCapture(args.filename)
        # ignore image width/height settings here
    elif args.use_rtsp:
        cap = open_cam_rtsp(args.rtsp_uri,
                            args.image_width,
                            args.image_height,
                            args.rtsp_latency)
    elif args.use_usb:
        cap = open_cam_usb(args.video_dev,
                           args.image_width,
                           args.image_height)
    else:  # By default, use the Jetson onboard camera
        cap = open_cam_onboard(args.image_width,
                               args.image_height)
    if not cap.isOpened():
        sys.exit('Failed to open camera!')

    # Start the sub-thread, which is responsible for grabbing images
    THREAD_RUNNING = True
    th = threading.Thread(target=grab_img, args=(cap,))
    th.start()

    # Grab image and do object detection (until stopped by user)
    open_window(args.image_width, args.image_height)
    read_cam_and_detect(net, cls_dict, args.conf_th)

    # Terminate the sub-thread
    THREAD_RUNNING = False
    th.join()

    cap.release()
    cv2.destroyAllWindows()
if __name__ == '__main__':
    main()
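
# --------------------------------------------------------
# ssd_500_detect.py
#
# The second file in this gist is, judging by the fork source
# (ck196/ssd_500_detect.py), the original batch-mode script: it runs
# the SSD detector over a list of image paths read from 'frames.txt'
# and writes annotated copies to an output directory.
# --------------------------------------------------------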
import numpy as np
import matplotlib.pyplot as plt
#%matplotlib inline
import skimage
import skimage.io as skio
import os
from os import path
import warnings
warnings.simplefilter("ignore")
import time
import cv2
COLORS = ((0, 0, 0), (51, 51, 255), (255, 51, 51), (51, 255, 51),
          (255, 255, 0), (0, 255, 255), (0, 127, 255), (128, 0, 255),
          (102, 102, 255), (255, 102, 102), (102, 255, 102))
plt.rcParams['figure.figsize'] = (10, 10)
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'
# Make sure that caffe is on the python path:
caffe_root = '.' # this file is expected to be in {caffe_root}/examples
#os.chdir(caffe_root)
import sys
sys.path.insert(0, caffe_root + "/caffe/python")
import caffe
caffe.set_device(0)
caffe.set_mode_gpu()
from google.protobuf import text_format
from caffe.proto import caffe_pb2 as cpb2
#print cpb2
# load PASCAL VOC labels
voc_labelmap_file = "data/VOC_toyota/labelmap_voc.prototxt"
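# A Caffe SSD labelmap .prototxt holds one entry per class, e.g.:
#     item {
#         name: "person"
#         label: 1
#         display_name: "person"
#     }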
voc_labelmap = cpb2.LabelMap()
with open(voc_labelmap_file, 'r') as f:
    text_format.Merge(str(f.read()), voc_labelmap)
def get_labelname(labelmap, labels):
    '''Map numeric labels to display names and class indices.'''
    num_labels = len(labelmap.item)
    labelnames = []
    classindex = []
    if not isinstance(labels, list):
        labels = [labels]
    for label in labels:
        found = False
        for i in range(num_labels):
            if label == labelmap.item[i].label:
                found = True
                labelnames.append(labelmap.item[i].display_name)
                classindex.append(labelmap.item[i].label)
                break
        assert found
    return labelnames, classindex
model_def = 'deploy.prototxt'
model_weights = 'trained.caffemodel'
net = caffe.Net(model_def,      # defines the structure of the model
                model_weights,  # contains the trained weights
                caffe.TEST)     # use test mode (e.g., don't perform dropout)
# input preprocessing: 'data' is the name of the input blob == net.inputs[0]
transformer = caffe.io.Transformer({'data': net.blobs['data'].data.shape})
transformer.set_transpose('data', (2, 0, 1))
transformer.set_mean('data', np.array([104,117,123])) # mean pixel
transformer.set_raw_scale('data', 255) # the reference model operates on images in [0,255] range instead of [0,1]
transformer.set_channel_swap('data', (2,1,0)) # the reference model has channels in BGR order instead of RGB
# set net to batch size of 1
image_resize = 500
net.blobs['data'].reshape(1,3,image_resize,image_resize)
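# The forward pass in predict() below returns the 'detection_out' blob
# of shape (1, 1, N, 7); each of the N rows holds
# [image_id, label, confidence, xmin, ymin, xmax, ymax], with box
# coordinates normalized to [0, 1].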
def predict(imgpath, outdir):
    start_time = time.time()
    imagename = imgpath.split('/')[-1]
    image = cv2.imread(imgpath)
    cpimg = image.copy()
    # OpenCV loads BGR; convert to RGB and to [0, 1] floats, which is
    # what the transformer's raw_scale/channel_swap settings expect
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = skimage.img_as_float(image).astype(np.float32)
    transformed_image = transformer.preprocess('data', image)
    net.blobs['data'].data[...] = transformed_image

    # Forward pass.
    detections = net.forward()['detection_out']

    # Parse the outputs.
    det_label = detections[0,0,:,1]
    det_conf = detections[0,0,:,2]
    det_xmin = detections[0,0,:,3]
    det_ymin = detections[0,0,:,4]
    det_xmax = detections[0,0,:,5]
    det_ymax = detections[0,0,:,6]

    # Get detections with confidence higher than 0.7.
    top_indices = [i for i, conf in enumerate(det_conf) if conf >= 0.7]
    top_conf = det_conf[top_indices]
    top_label_indices = det_label[top_indices].tolist()
    top_labels, top_class_index = get_labelname(voc_labelmap, top_label_indices)
    top_xmin = det_xmin[top_indices]
    top_ymin = det_ymin[top_indices]
    top_xmax = det_xmax[top_indices]
    top_ymax = det_ymax[top_indices]

    if top_conf.shape[0] > 0:
        for i in range(top_conf.shape[0]):
            # Scale normalized box coordinates back to pixels
            xmin = int(round(top_xmin[i] * image.shape[1]))
            ymin = int(round(top_ymin[i] * image.shape[0]))
            xmax = int(round(top_xmax[i] * image.shape[1]))
            ymax = int(round(top_ymax[i] * image.shape[0]))
            score = top_conf[i]
            label = top_labels[i]
            color_index = top_class_index[i]
            name = '%s: %.2f' % (label, score)
            #if label != "sky" and label != "road":
            cv2.rectangle(cpimg, (xmin, ymin), (xmax, ymax),
                          COLORS[color_index], 2)
            cv2.putText(cpimg, name, (xmin, ymin + 15),
                        cv2.FONT_HERSHEY_DUPLEX, 0.5,
                        COLORS[color_index], 1)
        output_img = path.join(outdir, imagename)
        cv2.imwrite(output_img, cpimg)
    else:
        # No detections: write out an unannotated copy
        output_img = path.join(outdir, imagename)
        print(output_img)
        cv2.imwrite(output_img, cpimg)
    end_time = time.time()
    exec_time = end_time - start_time
    print('Detected %s in %s seconds' % (imagename, exec_time))
if __name__ == '__main__':
    # frames.txt is expected to hold one input image path per line
    with open('frames.txt', 'r') as inputlist:
        for line in inputlist:
            predict(line.strip(), 'outdir')