Skip to content

Instantly share code, notes, and snippets.

@igrekun
Created July 9, 2017 20:19
Show Gist options
  • Save igrekun/35129c5a1d4f7ef2ffecdf0560adfc1f to your computer and use it in GitHub Desktop.
Save igrekun/35129c5a1d4f7ef2ffecdf0560adfc1f to your computer and use it in GitHub Desktop.
from .voc0712 import VOCDetection, AnnotationTransform, detection_collate, VOC_CLASSES
from .config import *
import cv2
import numpy as np
def base_transform(image, size, mean, std):
    """Resize an image to a square and standardise its pixel values.

    Args:
        image: input image accepted by ``cv2.resize``.
        size: edge length (pixels) of the square output.
        mean: per-channel value subtracted after scaling to the unit range.
        std: per-channel value the centred image is divided by.

    Returns:
        float32 array of shape (size, size, channels) computed as
        ``(resized / 255 - mean) / std``.
    """
    scaled = cv2.resize(image, (size, size)).astype(np.float32)
    # All three steps are in-place so the buffer stays float32.
    scaled /= 255.0
    scaled -= mean
    scaled /= std
    return scaled.astype(np.float32)
class BaseTransform:
    """Callable wrapper around ``base_transform`` with fixed parameters.

    Attributes:
        size: edge length of the square output image.
        mean: float32 array of per-channel means.
        std: float32 array of per-channel standard deviations.
    """

    def __init__(self, size, mean, std):
        self.size = size
        # Stored as float32 arrays so the arithmetic broadcasts over channels.
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)

    def __call__(self, image, boxes=None, labels=None):
        """Transform ``image``; ``boxes`` and ``labels`` pass through as-is."""
        transformed = base_transform(image, self.size, self.mean, self.std)
        return transformed, boxes, labels
import torch
from torchvision import transforms
import cv2
import numpy as np
import types
from numpy import random
def intersect(box_a, box_b):
    """Return the intersection area of each box in ``box_a`` with ``box_b``.

    Args:
        box_a: array of shape [num_boxes, 4] as (x1, y1, x2, y2).
        box_b: single box of shape [4] as (x1, y1, x2, y2).

    Returns:
        Array of shape [num_boxes] with the overlap areas (0 when disjoint).
    """
    lower_right = np.minimum(box_a[:, 2:], box_b[2:])
    upper_left = np.maximum(box_a[:, :2], box_b[:2])
    # Negative extents mean no overlap along that axis; clamp them to zero.
    extent = np.clip(lower_right - upper_left, a_min=0, a_max=np.inf)
    widths = extent[:, 0]
    heights = extent[:, 1]
    return widths * heights
def jaccard_numpy(box_a, box_b):
    """Compute the jaccard overlap (IoU) of several boxes against one box.

    The jaccard overlap is the intersection over union:
        A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)

    Args:
        box_a: Multiple bounding boxes, Shape: [num_boxes, 4]
        box_b: Single bounding box, Shape: [4]
    Return:
        jaccard overlap of every box in ``box_a`` with ``box_b``,
        Shape: [num_boxes]
    """
    overlap_area = intersect(box_a, box_b)
    widths_a = box_a[:, 2] - box_a[:, 0]
    heights_a = box_a[:, 3] - box_a[:, 1]
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union_area = widths_a * heights_a + area_b - overlap_area
    return overlap_area / union_area
class Compose(object):
    """Chain several augmentations into a single callable.

    Args:
        transforms (List[Transform]): callables taking and returning
            ``(img, boxes, labels)``; they are applied in list order.
    Example:
        >>> augmentations.Compose([
        >>>     transforms.CenterCrop(10),
        >>>     transforms.ToTensor(),
        >>> ])
    """

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, img, boxes=None, labels=None):
        """Feed the triple through every transform and return the result."""
        for step in self.transforms:
            # Each step must hand back exactly (img, boxes, labels).
            img, boxes, labels = step(img, boxes, labels)
        return img, boxes, labels
class Lambda(object):
    """Wrap a plain function so it can be used as a transform."""

    def __init__(self, lambd):
        # types.LambdaType is the ordinary function type, so ``def``
        # functions pass this check as well as lambdas.
        assert isinstance(lambd, types.LambdaType)
        self.lambd = lambd

    def __call__(self, img, boxes=None, labels=None):
        """Delegate to the wrapped function and return whatever it returns."""
        wrapped = self.lambd
        return wrapped(img, boxes, labels)
class ConvertFromInts(object):
    """Cast the image to float32; boxes and labels pass through unchanged."""

    def __call__(self, image, boxes=None, labels=None):
        as_float = image.astype(np.float32)
        return as_float, boxes, labels
class SubtractMeans(object):
    """Subtract a fixed per-channel mean from the image."""

    def __init__(self, mean):
        self.mean = np.array(mean, dtype=np.float32)

    def __call__(self, image, boxes=None, labels=None):
        """Return a float32 copy of ``image`` with the mean removed."""
        centred = image.astype(np.float32)
        # In-place subtraction keeps the working buffer float32.
        centred -= self.mean
        return centred.astype(np.float32), boxes, labels
class ToUnitNorm(object):
    """Divide pixel values by 255, returning a float32 image."""

    def __call__(self, image, boxes=None, labels=None):
        unit = image.astype(np.float32)
        unit /= 255.0
        return unit.astype(np.float32), boxes, labels
class MeanNormalize(object):
    """Standardise an image as ``(image - mean) / std`` per channel."""

    def __init__(self, mean, std):
        self.mean = np.array(mean, dtype=np.float32)
        self.std = np.array(std, dtype=np.float32)

    def __call__(self, image, boxes=None, labels=None):
        """Return the standardised float32 image; boxes/labels are untouched."""
        standardised = image.astype(np.float32)
        standardised -= self.mean
        standardised /= self.std
        return standardised.astype(np.float32), boxes, labels
class ToAbsoluteCoords(object):
    """Scale fractional box coordinates up to pixel coordinates.

    ``boxes`` is modified in place: columns 0 and 2 are multiplied by the
    image width, columns 1 and 3 by the image height.
    """

    def __call__(self, image, boxes=None, labels=None):
        height, width, _ = image.shape
        # x coordinates
        boxes[:, [0, 2]] *= width
        # y coordinates
        boxes[:, [1, 3]] *= height
        return image, boxes, labels
class ToPercentCoords(object):
    """Scale pixel box coordinates down to fractions of the image size.

    ``boxes`` is modified in place: columns 0 and 2 are divided by the
    image width, columns 1 and 3 by the image height.
    """

    def __call__(self, image, boxes=None, labels=None):
        height, width, _ = image.shape
        # x coordinates
        boxes[:, [0, 2]] /= width
        # y coordinates
        boxes[:, [1, 3]] /= height
        return image, boxes, labels
class Resize(object):
    """Resize the image to a ``size`` x ``size`` square.

    Boxes are returned unchanged; the visible pipeline converts them to
    fractional coordinates (``ToPercentCoords``) before this step.
    """

    def __init__(self, size=300):
        self.size = size

    def __call__(self, image, boxes=None, labels=None):
        target = (self.size, self.size)
        resized = cv2.resize(image, target)
        return resized, boxes, labels
class RandomSaturation(object):
    """Randomly scale the saturation channel of an HSV image.

    Args:
        lower: smallest scale factor (must be non-negative).
        upper: largest scale factor (must be >= ``lower``).
    """

    def __init__(self, lower=0.5, upper=1.5):
        # The messages used to say "contrast" (copied from RandomContrast).
        assert upper >= lower, "saturation upper must be >= lower."
        assert lower >= 0, "saturation lower must be non-negative."
        self.lower = lower
        self.upper = upper

    def __call__(self, image, boxes=None, labels=None):
        """With probability 1/2, multiply channel 1 of ``image`` in place.

        Expects a float image whose channel 1 is saturation (the visible
        pipeline runs this between the BGR->HSV and HSV->BGR conversions).
        """
        if random.randint(2):
            image[:, :, 1] *= random.uniform(self.lower, self.upper)
        return image, boxes, labels
class RandomHue(object):
    """Randomly shift the hue channel of an HSV image.

    Args:
        delta: maximum absolute hue shift; must lie in [0, 360].
    """

    def __init__(self, delta=18.0):
        assert delta >= 0.0 and delta <= 360.0
        self.delta = delta

    def __call__(self, image, boxes=None, labels=None):
        """With probability 1/2, shift channel 0 in place and wrap to [0, 360]."""
        if not random.randint(2):
            return image, boxes, labels
        hue = image[:, :, 0]  # a view: edits below write through to image
        hue += random.uniform(-self.delta, self.delta)
        # Wrap values that left the [0, 360] range.
        hue[hue > 360.0] -= 360.0
        hue[hue < 0.0] += 360.0
        return image, boxes, labels
class RandomLightingNoise(object):
    """Randomly permute the colour channels of the image."""

    def __init__(self):
        # All six orderings of three channels.
        self.perms = ((0, 1, 2), (0, 2, 1),
                      (1, 0, 2), (1, 2, 0),
                      (2, 0, 1), (2, 1, 0))

    def __call__(self, image, boxes=None, labels=None):
        """With probability 1/2, reorder channels by a random permutation."""
        if not random.randint(2):
            return image, boxes, labels
        order = self.perms[random.randint(len(self.perms))]
        return SwapChannels(order)(image), boxes, labels
class ConvertColor(object):
    """Convert an image between the BGR and HSV colour spaces.

    Args:
        current: colour space of the incoming image ('BGR' or 'HSV').
        transform: colour space to convert to ('HSV' or 'BGR').

    Only BGR->HSV and HSV->BGR are implemented; any other pair raises
    ``NotImplementedError`` when the transform is called.
    """

    def __init__(self, current='BGR', transform='HSV'):
        self.transform = transform
        self.current = current

    def __call__(self, image, boxes=None, labels=None):
        conversion = (self.current, self.transform)
        if conversion == ('BGR', 'HSV'):
            converted = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        elif conversion == ('HSV', 'BGR'):
            converted = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
        else:
            raise NotImplementedError
        return converted, boxes, labels
class RandomContrast(object):
    """Randomly scale all pixel values by a common factor.

    Args:
        lower: smallest scale factor (must be non-negative).
        upper: largest scale factor (must be >= ``lower``).
    """

    def __init__(self, lower=0.5, upper=1.5):
        self.lower = lower
        self.upper = upper
        assert self.upper >= self.lower, "contrast upper must be >= lower."
        assert self.lower >= 0, "contrast lower must be non-negative."

    def __call__(self, image, boxes=None, labels=None):
        """With probability 1/2, multiply ``image`` in place (float image)."""
        if not random.randint(2):
            return image, boxes, labels
        factor = random.uniform(self.lower, self.upper)
        image *= factor
        return image, boxes, labels
class RandomBrightness(object):
    """Randomly add a constant offset to every pixel.

    Args:
        delta: maximum absolute offset; must lie in [0, 255].
    """

    def __init__(self, delta=32):
        assert delta >= 0.0
        assert delta <= 255.0
        self.delta = delta

    def __call__(self, image, boxes=None, labels=None):
        """With probability 1/2, add a random offset to ``image`` in place."""
        if not random.randint(2):
            return image, boxes, labels
        offset = random.uniform(-self.delta, self.delta)
        image += offset
        return image, boxes, labels
class ToCV2Image(object):
    """Convert a channel-first tensor into a channel-last float32 array."""

    def __call__(self, tensor, boxes=None, labels=None):
        channel_first = tensor.cpu().numpy().astype(np.float32)
        # (C, H, W) -> (H, W, C)
        channel_last = channel_first.transpose((1, 2, 0))
        return channel_last, boxes, labels
class ToTensor(object):
    """Convert a channel-last array into a channel-first float32 tensor."""

    def __call__(self, cvimage, boxes=None, labels=None):
        as_float = cvimage.astype(np.float32)
        # (H, W, C) -> (C, H, W)
        tensor = torch.from_numpy(as_float).permute(2, 0, 1)
        return tensor, boxes, labels
class RandomSampleCrop(object):
    """Randomly crop the image subject to an IoU constraint with the boxes.

    Arguments (of ``__call__``):
        image: the input image as an (H, W, C) array
        boxes: ground-truth boxes, shape [num_boxes, 4], absolute
            (x1, y1, x2, y2) coordinates
        labels: class labels, one per box
    Return:
        (img, boxes, labels)
            img: the cropped image (or the original image)
            boxes: the boxes whose centre falls in the crop, clipped to the
                crop and expressed relative to its top-left corner
            labels: the labels of the kept boxes
    """

    # Maximum number of random patches tried per sampling mode.
    _MAX_TRIALS = 50

    def __init__(self):
        self.sample_options = (
            # use the entire original input image
            None,
            # sample a patch with min jaccard overlap .1, .3, .7 or .9
            (0.1, None),
            (0.3, None),
            (0.7, None),
            (0.9, None),
            # randomly sample a patch with no overlap constraint
            (None, None),
        )

    def __call__(self, image, boxes=None, labels=None):
        # Without ground-truth boxes there is nothing to constrain the crop
        # against (and the IoU reductions below would fail on empty input).
        if boxes is None or len(boxes) == 0:
            return image, boxes, labels

        height, width, _ = image.shape
        num_options = len(self.sample_options)
        while True:
            # Pick a mode by index: np.random.choice needs a 1-D array and
            # rejects this ragged tuple on current NumPy.
            mode = self.sample_options[random.randint(num_options)]
            if mode is None:
                return image, boxes, labels

            min_iou, max_iou = mode
            if min_iou is None:
                min_iou = float('-inf')
            if max_iou is None:
                max_iou = float('inf')

            for _ in range(self._MAX_TRIALS):
                w = random.uniform(0.3 * width, width)
                h = random.uniform(0.3 * height, height)

                # aspect ratio constraint between .5 and 2
                if h / w < 0.5 or h / w > 2:
                    continue

                # Explicit lower bound: a single positional argument would
                # be taken as ``low`` with ``high`` left at 1.0.
                left = random.uniform(0, width - w)
                top = random.uniform(0, height - h)

                # convert to integer rect x1,y1,x2,y2
                rect = np.array([int(left), int(top),
                                 int(left + w), int(top + h)])

                # IoU (jaccard overlap) between the crop and each gt box
                overlap = jaccard_numpy(boxes, rect)

                # Accept the patch only if at least one box meets the
                # constraint. The previous ``a and b`` test could never
                # reject because max_iou is always +inf here.
                in_range = (overlap >= min_iou) & (overlap <= max_iou)
                if not in_range.any():
                    continue

                # keep only gt boxes whose centre lies inside the patch
                centers = (boxes[:, :2] + boxes[:, 2:]) / 2.0
                right_of_origin = ((rect[0] < centers[:, 0]) *
                                   (rect[1] < centers[:, 1]))
                left_of_corner = ((rect[2] > centers[:, 0]) *
                                  (rect[3] > centers[:, 1]))
                mask = right_of_origin * left_of_corner

                # no box centre in the patch: try another patch
                if not mask.any():
                    continue

                current_image = image[rect[1]:rect[3], rect[0]:rect[2], :]
                current_boxes = boxes[mask, :].copy()
                current_labels = labels[mask] if labels is not None else None

                # clip boxes to the patch, then shift the origin to the
                # patch's top-left corner
                current_boxes[:, :2] = np.maximum(current_boxes[:, :2],
                                                  rect[:2])
                current_boxes[:, :2] -= rect[:2]
                current_boxes[:, 2:] = np.minimum(current_boxes[:, 2:],
                                                  rect[2:])
                current_boxes[:, 2:] -= rect[:2]

                return current_image, current_boxes, current_labels
class Expand(object):
    """Randomly place the image on a larger canvas filled with ``mean``.

    Args:
        mean: per-channel fill value for the canvas.
    """

    def __init__(self, mean):
        self.mean = mean

    def __call__(self, image, boxes, labels):
        """With probability 1/2, zoom out by a random factor in [1, 4).

        Returns the (possibly enlarged) image, a shifted copy of the boxes
        (the input boxes are not modified) and the labels.
        """
        if random.randint(2):
            return image, boxes, labels

        height, width, depth = image.shape
        ratio = random.uniform(1, 4)
        left = random.uniform(0, width * ratio - width)
        top = random.uniform(0, height * ratio - height)
        x0, y0 = int(left), int(top)

        # Every canvas pixel is overwritten with the fill value before the
        # image is pasted, so the initial contents do not matter.
        canvas = np.empty((int(height * ratio), int(width * ratio), depth),
                          dtype=image.dtype)
        canvas[...] = self.mean
        canvas[y0:int(top + height), x0:int(left + width)] = image

        shifted = boxes.copy()
        offset = (x0, y0)
        shifted[:, :2] += offset
        shifted[:, 2:] += offset
        return canvas, shifted, labels
class RandomMirror(object):
    """Randomly flip the image and its boxes horizontally."""

    def __call__(self, image, boxes, classes):
        """With probability 1/2, mirror left-right.

        The flipped boxes are written to a copy; the input boxes are not
        modified.
        """
        _, width, _ = image.shape
        if not random.randint(2):
            return image, boxes, classes
        mirrored = boxes.copy()
        # new x1 = width - old x2, new x2 = width - old x1
        mirrored[:, 0::2] = width - boxes[:, 2::-2]
        return image[:, ::-1], mirrored, classes
class SwapChannels(object):
    """Reorder the channels of an image as given by a swap tuple.

    Args:
        swaps (int triple): final order of channels
            eg: (2, 1, 0)
    """

    def __init__(self, swaps):
        self.swaps = swaps

    def __call__(self, image):
        """
        Args:
            image: image indexed as [row, column, channel]
        Return:
            the image with its last axis reordered according to ``swaps``
        """
        order = self.swaps
        return image[:, :, order]
class PhotometricDistort(object):
    """Apply a random sequence of colour distortions to a copy of the image.

    Order of operations: brightness, then either (contrast, BGR->HSV,
    saturation, hue, HSV->BGR) or (BGR->HSV, saturation, hue, HSV->BGR,
    contrast), then a random channel permutation.
    """

    def __init__(self):
        # The list has contrast at both ends; __call__ drops one of them.
        self.pd = [
            RandomContrast(),
            ConvertColor(transform='HSV'),
            RandomSaturation(),
            RandomHue(),
            ConvertColor(current='HSV', transform='BGR'),
            RandomContrast()
        ]
        self.rand_brightness = RandomBrightness()
        self.rand_light_noise = RandomLightingNoise()

    def __call__(self, image, boxes, labels):
        working = image.copy()  # the caller's image is left untouched
        working, boxes, labels = self.rand_brightness(working, boxes, labels)
        # Coin flip: contrast first (drop the trailing one) or contrast last.
        stages = self.pd[:-1] if random.randint(2) else self.pd[1:]
        working, boxes, labels = Compose(stages)(working, boxes, labels)
        return self.rand_light_noise(working, boxes, labels)
class SSDAugmentation(object):
    """Training-time augmentation pipeline.

    Args:
        size: edge length of the square output image.
        mean: per-channel mean in unit (0-1) scale.
        std: per-channel standard deviation in unit (0-1) scale.

    Calling the instance with ``(img, boxes, labels)`` runs, in order:
    float conversion, absolute box coordinates, photometric distortion,
    expansion, random crop, random mirror, fractional box coordinates,
    resize, scaling to the unit range and mean/std normalisation.
    """

    def __init__(self, size=160, mean=(0.406, 0.456, 0.485), std=(0.225, 0.224, 0.229)):
        self.mean = mean
        self.std = std
        self.size = size
        # Expand runs before ToUnitNorm, while pixel values are still on the
        # 0-255 scale, so the fill colour must be the mean on that scale.
        # Passing the unit-scale mean produced near-black padding instead of
        # padding that normalises to zero.
        expand_fill = tuple(255.0 * channel for channel in self.mean)
        self.augment = Compose([
            ConvertFromInts(),
            ToAbsoluteCoords(),
            PhotometricDistort(),
            Expand(expand_fill),
            RandomSampleCrop(),
            RandomMirror(),
            ToPercentCoords(),
            Resize(self.size),
            ToUnitNorm(),
            MeanNormalize(self.mean, self.std)
        ])

    def __call__(self, img, boxes, labels):
        """Run the full augmentation pipeline on one sample."""
        return self.augment(img, boxes, labels)
# config.py
import os.path
# Resolve the user's home directory in a platform-independent way.
home = os.path.expanduser("~")

# ddir = os.path.join(home, "data/HollywoodHeads/")
ddir = os.path.join(home, "data/VOCdevkit/")

# Path to the VOCdevkit root directory.
# note: if you used our download scripts, this should be right
VOCroot = ddir

# default batch size
BATCHES = 64
# data reshuffled at every epoch
SHUFFLE = True
# number of subprocesses to use for data loading
WORKERS = 4

# MBNET CONFIG: five feature maps for a 160x160 input.
# NOTE(review): 'name' is 'v2' although the dict is called v3 -- confirm
# whether anything keys on this value before changing it.
v3 = {
    'feature_maps': [20, 10, 5, 3, 1],
    'min_dim': 160,
    'steps': [8, 16, 32, 53, 160],
    'min_sizes': [16, 30, 60, 90, 130],
    'max_sizes': [30, 60, 90, 130, 170],
    # 'aspect_ratios': [[2, 1/2], [2, 1/2, 3, 1/3], [2, 1/2, 3, 1/3],
    #                   [2, 1/2, 3, 1/3], [2, 1/2], [2, 1/2]],
    'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2]],
    'variance': [0.1, 0.2],
    'clip': True,
    'name': 'v2',
}

# Six feature maps for a 300x300 input.
v2 = {
    'feature_maps': [38, 19, 10, 5, 3, 1],
    'min_dim': 300,
    'steps': [8, 16, 32, 64, 100, 300],
    'min_sizes': [30, 60, 111, 162, 213, 264],
    'max_sizes': [60, 111, 162, 213, 264, 315],
    # 'aspect_ratios': [[2, 1/2], [2, 1/2, 3, 1/3], [2, 1/2, 3, 1/3],
    #                   [2, 1/2, 3, 1/3], [2, 1/2], [2, 1/2]],
    'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2], [2]],
    'variance': [0.1, 0.2],
    'clip': True,
    'name': 'v2',
}

# use average pooling layer as last layer before multibox layers
v1 = {
    'feature_maps': [38, 19, 10, 5, 3, 1],
    'min_dim': 300,
    'steps': [8, 16, 32, 64, 100, 300],
    'min_sizes': [30, 60, 114, 168, 222, 276],
    'max_sizes': [-1, 114, 168, 222, 276, 330],
    # 'aspect_ratios': [[2], [2, 3], [2, 3], [2, 3], [2, 3], [2, 3]],
    'aspect_ratios': [
        [1, 1, 2, 1/2],
        [1, 1, 2, 1/2, 3, 1/3],
        [1, 1, 2, 1/2, 3, 1/3],
        [1, 1, 2, 1/2, 3, 1/3],
        [1, 1, 2, 1/2, 3, 1/3],
        [1, 1, 2, 1/2, 3, 1/3],
    ],
    'variance': [0.1, 0.2],
    'clip': True,
    'name': 'v1',
}
"""Adapted from:
@longcw faster_rcnn_pytorch: https://github.com/longcw/faster_rcnn_pytorch
@rbgirshick py-faster-rcnn https://github.com/rbgirshick/py-faster-rcnn
Licensed under The MIT License [see LICENSE for details]
"""
from __future__ import print_function
import cv2
import torch
import torch.nn as nn
import torch.backends.cudnn as cudnn
import torchvision.transforms as transforms
from torch.autograd import Variable
from data import VOCroot
from data import VOC_CLASSES as labelmap
import torch.utils.data as data
from data import AnnotationTransform, VOCDetection, BaseTransform
from ssd import build_mbnet
import sys
import os
import time
import argparse
import numpy as np
import pickle
import cv2
if sys.version_info[0] == 2:
import xml.etree.cElementTree as ET
else:
import xml.etree.ElementTree as ET
def str2bool(v):
    """Return True when ``v`` spells a truthy value (case-insensitive).

    Recognised truthy spellings are "yes", "true", "t" and "1"; every other
    string yields False.
    """
    truthy = ("yes", "true", "t", "1")
    return v.lower() in truthy
# Command-line options for the evaluation script.
parser = argparse.ArgumentParser(description='Single Shot MultiBox Detection')
parser.add_argument('--trained_model', default='weights/ssd300_mAP_77.43_v2.pth',
                    type=str, help='Trained state_dict file path to open')
parser.add_argument('--save_folder', default='eval/', type=str,
                    help='File path to save results')
parser.add_argument('--confidence_threshold', default=0.01, type=float,
                    help='Detection confidence threshold')
parser.add_argument('--top_k', default=5, type=int,
                    help='Further restrict the number of predictions to parse')
parser.add_argument('--cuda', default=True, type=str2bool,
                    help='Use cuda to train model')
parser.add_argument('--voc_root', default=VOCroot, help='Location of VOC root directory')
args = parser.parse_args()

if not os.path.exists(args.save_folder):
    os.mkdir(args.save_folder)

# Allocate new tensors on the GPU only when it was requested and exists.
if args.cuda and torch.cuda.is_available():
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
else:
    torch.set_default_tensor_type('torch.FloatTensor')

# Path templates: annopath/imgpath are filled with ``%``, imgsetpath with
# ``str.format``.
annopath = os.path.join(args.voc_root, 'VOC2007', 'Annotations', '%s.xml')
imgpath = os.path.join(args.voc_root, 'VOC2007', 'JPEGImages', '%s.jpg')
imgsetpath = os.path.join(args.voc_root, 'VOC2007', 'ImageSets', 'Main', '{:s}.txt')
YEAR = '2007'
# Derive the devkit directory from --voc_root like the templates above.
# It used to be ``VOCroot + 'VOC' + YEAR``, which ignored the command-line
# override and depended on VOCroot ending with a path separator.
devkit_path = os.path.join(args.voc_root, 'VOC' + YEAR)
# Per-channel statistics in unit (0-1) scale.
dataset_mean = (0.406, 0.456, 0.485)
dataset_std = (0.225, 0.224, 0.229)
set_type = 'test'
class Timer(object):
    """A simple wall-clock stopwatch that also tracks a running average."""

    def __init__(self):
        self.total_time = 0.
        self.calls = 0
        self.start_time = 0.
        self.diff = 0.
        self.average_time = 0.

    def tic(self):
        """Start (or restart) timing."""
        # using time.time instead of time.clock because time.clock
        # does not normalize for multithreading
        self.start_time = time.time()

    def toc(self, average=True):
        """Stop timing and return elapsed seconds.

        Returns the mean over all ``toc`` calls when ``average`` is true,
        otherwise the duration of the interval that just ended.
        """
        elapsed = time.time() - self.start_time
        self.diff = elapsed
        self.total_time += elapsed
        self.calls += 1
        self.average_time = self.total_time / self.calls
        return self.average_time if average else self.diff
def parse_rec(filename):
    """Parse a PASCAL VOC xml file.

    Returns a list with one dict per ``<object>`` element, holding 'name',
    'pose', 'truncated', 'difficult' and 'bbox' ([xmin, ymin, xmax, ymax],
    each reduced by 1 from the value in the file).
    """
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    tree = ET.parse(filename)
    objects = []
    for node in tree.findall('object'):
        box_node = node.find('bndbox')
        objects.append({
            'name': node.find('name').text,
            'pose': node.find('pose').text,
            'truncated': int(node.find('truncated').text),
            'difficult': int(node.find('difficult').text),
            'bbox': [int(box_node.find(tag).text) - 1 for tag in corner_tags],
        })
    return objects
def get_output_dir(name, phase):
    """Return the directory ``<name>/<phase>``, creating it if necessary.

    Args:
        name: parent directory for the run's artifacts.
        phase: sub-directory name.
    """
    target = os.path.join(name, phase)
    if os.path.exists(target):
        return target
    os.makedirs(target)
    return target
def get_voc_results_file_template(image_set, cls):
    """Return the results-file path for one class, creating its directory.

    The file lives in ``<devkit_path>/results`` and is named
    ``det_<image_set>_<cls>.txt``,
    e.g. VOCdevkit/VOC2007/results/det_test_aeroplane.txt
    """
    results_dir = os.path.join(devkit_path, 'results')
    if not os.path.exists(results_dir):
        os.makedirs(results_dir)
    basename = 'det_' + image_set + ('_%s.txt' % (cls))
    return os.path.join(results_dir, basename)
def write_voc_results_file(all_boxes, dataset):
    """Write one detection results file per class.

    Args:
        all_boxes: ``all_boxes[cls][image]`` is either an empty list or an
            N x 5 array of (x1, y1, x2, y2, score); index 0 is skipped, so
            class ``k`` of ``labelmap`` is read from ``all_boxes[k + 1]``.
        dataset: object whose ``ids`` yields one entry per image; element 1
            of each entry is written as the image identifier.
    """
    for cls_ind, cls in enumerate(labelmap):
        print('Writing {:s} VOC results file'.format(cls))
        filename = get_voc_results_file_template(set_type, cls)
        with open(filename, 'wt') as f:
            for im_ind, index in enumerate(dataset.ids):
                dets = all_boxes[cls_ind+1][im_ind]
                # ``dets == []`` is an elementwise comparison when dets is
                # an ndarray; test the length instead, which works for both
                # the empty-list placeholder and arrays.
                if len(dets) == 0:
                    continue
                # the VOCdevkit expects 1-based indices
                for k in range(dets.shape[0]):
                    f.write('{:s} {:.3f} {:.1f} {:.1f} {:.1f} {:.1f}\n'.
                            format(index[1], dets[k, -1],
                                   dets[k, 0] + 1, dets[k, 1] + 1,
                                   dets[k, 2] + 1, dets[k, 3] + 1))
def do_python_eval(output_dir='output', use_07=True):
    """Evaluate every class, print the APs and pickle the PR curves.

    Args:
        output_dir: directory receiving one ``<class>_pr.pkl`` per class
            (created when missing).
        use_07: use the VOC07 11-point AP metric when true.
    """
    cachedir = os.path.join(devkit_path, 'annotations_cache')
    # The PASCAL VOC metric changed in 2010
    use_07_metric = use_07
    print('VOC07 metric? ' + ('Yes' if use_07_metric else 'No'))
    if not os.path.isdir(output_dir):
        os.mkdir(output_dir)

    aps = []
    for cls in labelmap:
        filename = get_voc_results_file_template(set_type, cls)
        rec, prec, ap = voc_eval(
            filename, annopath, imgsetpath.format(set_type), cls, cachedir,
            ovthresh=0.5, use_07_metric=use_07_metric)
        aps.append(ap)
        print('AP for {} = {:.4f}'.format(cls, ap))
        curve = {'rec': rec, 'prec': prec, 'ap': ap}
        with open(os.path.join(output_dir, cls + '_pr.pkl'), 'wb') as f:
            pickle.dump(curve, f)

    mean_ap = np.mean(aps)
    print('Mean AP = {:.4f}'.format(mean_ap))
    print('~~~~~~~~')
    print('Results:')
    for ap in aps:
        print('{:.3f}'.format(ap))
    print('{:.3f}'.format(mean_ap))
    print('~~~~~~~~')
    print('')
    print('--------------------------------------------------------------')
    print('Results computed with the **unofficial** Python eval code.')
    print('Results should be very close to the official MATLAB eval code.')
    print('--------------------------------------------------------------')
def voc_ap(rec, prec, use_07_metric=True):
    """ ap = voc_ap(rec, prec, [use_07_metric])
    Compute VOC AP given precision and recall.
    If use_07_metric is true (the default), uses the VOC 07 11 point
    method; otherwise integrates the area under the precision envelope.
    """
    if use_07_metric:
        # 11 point metric: mean of the best precision at recall >= t
        ap = 0.
        for threshold in np.arange(0., 1.1, 0.1):
            reached = rec >= threshold
            best = np.max(prec[reached]) if np.sum(reached) != 0 else 0
            ap = ap + best / 11.
        return ap

    # correct AP calculation
    # first append sentinel values at both ends
    mrec = np.concatenate(([0.], rec, [1.]))
    mpre = np.concatenate(([0.], prec, [0.]))

    # precision envelope: running maximum taken from the right
    mpre = np.maximum.accumulate(mpre[::-1])[::-1]

    # to calculate area under PR curve, look for points
    # where X axis (recall) changes value
    steps = np.where(mrec[1:] != mrec[:-1])[0]

    # and sum (\Delta recall) * prec
    return np.sum((mrec[steps + 1] - mrec[steps]) * mpre[steps + 1])
def voc_eval(detpath,
             annopath,
             imagesetfile,
             classname,
             cachedir,
             ovthresh=0.5,
             use_07_metric=True):
    """rec, prec, ap = voc_eval(detpath,
                                annopath,
                                imagesetfile,
                                classname,
                                cachedir,
                                [ovthresh],
                                [use_07_metric])
    Top level function that does the PASCAL VOC evaluation.

    detpath: Path to detections
        detpath.format(classname) should produce the detection results file.
    annopath: Path to annotations
        annopath % imagename should be the xml annotations file.
    imagesetfile: Text file containing the list of images, one image per line.
    classname: Category name
    cachedir: Directory for caching the annotations (created when missing)
    [ovthresh]: Overlap threshold (default = 0.5)
    [use_07_metric]: Whether to use VOC07's 11 point AP computation
        (default True)

    Returns (rec, prec, ap); all three are -1. when the detection file
    contains no lines.
    """
    # first load gt
    if not os.path.isdir(cachedir):
        os.mkdir(cachedir)
    cachefile = os.path.join(cachedir, 'annots.pkl')

    # read list of images
    with open(imagesetfile, 'r') as f:
        imagenames = [line.strip() for line in f.readlines()]

    if not os.path.isfile(cachefile):
        # parse every annotation once and cache the result
        recs = {}
        for i, imagename in enumerate(imagenames):
            recs[imagename] = parse_rec(annopath % (imagename))
            if i % 100 == 0:
                print('Reading annotation for {:d}/{:d}'.format(
                    i + 1, len(imagenames)))
        print('Saving cached annotations to {:s}'.format(cachefile))
        with open(cachefile, 'wb') as f:
            pickle.dump(recs, f)
    else:
        # NOTE: pickle.load executes arbitrary code from the file; only
        # point cachedir at a location you control.
        with open(cachefile, 'rb') as f:
            recs = pickle.load(f)

    # extract gt objects for this class
    class_recs = {}
    npos = 0
    for imagename in imagenames:
        R = [obj for obj in recs[imagename] if obj['name'] == classname]
        bbox = np.array([x['bbox'] for x in R])
        # builtin bool: the np.bool alias was removed in NumPy 1.24
        difficult = np.array([x['difficult'] for x in R]).astype(bool)
        det = [False] * len(R)
        # difficult objects do not count as positives
        npos = npos + int(np.sum(~difficult))
        class_recs[imagename] = {'bbox': bbox,
                                 'difficult': difficult,
                                 'det': det}

    # read dets
    detfile = detpath.format(classname)
    with open(detfile, 'r') as f:
        lines = f.readlines()

    if not any(lines):
        # no detections for this class
        return -1., -1., -1.

    splitlines = [x.strip().split(' ') for x in lines]
    image_ids = [x[0] for x in splitlines]
    confidence = np.array([float(x[1]) for x in splitlines])
    BB = np.array([[float(z) for z in x[2:]] for x in splitlines])

    # sort by descending confidence
    sorted_ind = np.argsort(-confidence)
    BB = BB[sorted_ind, :]
    image_ids = [image_ids[x] for x in sorted_ind]

    # go down dets and mark TPs and FPs
    nd = len(image_ids)
    tp = np.zeros(nd)
    fp = np.zeros(nd)
    for d in range(nd):
        R = class_recs[image_ids[d]]
        bb = BB[d, :].astype(float)
        ovmax = -np.inf
        BBGT = R['bbox'].astype(float)
        if BBGT.size > 0:
            # intersection with every gt box of this image
            ixmin = np.maximum(BBGT[:, 0], bb[0])
            iymin = np.maximum(BBGT[:, 1], bb[1])
            ixmax = np.minimum(BBGT[:, 2], bb[2])
            iymax = np.minimum(BBGT[:, 3], bb[3])
            iw = np.maximum(ixmax - ixmin, 0.)
            ih = np.maximum(iymax - iymin, 0.)
            inters = iw * ih
            # union = area(det) + area(gt) - intersection
            uni = ((bb[2] - bb[0]) * (bb[3] - bb[1]) +
                   (BBGT[:, 2] - BBGT[:, 0]) *
                   (BBGT[:, 3] - BBGT[:, 1]) - inters)
            overlaps = inters / uni
            ovmax = np.max(overlaps)
            jmax = np.argmax(overlaps)

        if ovmax > ovthresh:
            # matches on difficult objects are neither TP nor FP
            if not R['difficult'][jmax]:
                if not R['det'][jmax]:
                    tp[d] = 1.
                    R['det'][jmax] = 1
                else:
                    # the gt box was already claimed by a better detection
                    fp[d] = 1.
        else:
            fp[d] = 1.

    # compute precision recall
    fp = np.cumsum(fp)
    tp = np.cumsum(tp)
    rec = tp / float(npos)
    # avoid divide by zero in case the first detection matches a difficult
    # ground truth
    prec = tp / np.maximum(tp + fp, np.finfo(np.float64).eps)
    ap = voc_ap(rec, prec, use_07_metric)

    return rec, prec, ap
def test_net(save_folder, net, cuda, dataset, transform, top_k,
             im_size=300, thresh=0.05):
    """Run the detector over ``dataset``, save and evaluate the detections.

    Args:
        net: network called as ``net(x)``; its ``.data`` is indexed as
            [batch, class, detection, 5] with the score in column 0 and
            fractional box coordinates in columns 1-4.
        cuda: move each input batch to the GPU when true.
        dataset: provides ``len()``, ``pull_item(i)`` returning
            ``(im, gt, h, w)``, and ``ids``.
        save_folder, transform, top_k, im_size, thresh: accepted for
            interface compatibility; not used in this function.

    Detections are pickled to ``ssd300_120000/<set_type>/detections.pkl``
    and then passed to ``evaluate_detections``.
    """
    num_images = len(dataset)
    # all detections are collected into:
    #    all_boxes[cls][image] = N x 5 array of detections in
    #    (x1, y1, x2, y2, score)
    all_boxes = [[[] for _ in range(num_images)]
                 for _ in range(len(labelmap)+1)]

    # timers
    _t = {'im_detect': Timer(), 'misc': Timer()}
    output_dir = get_output_dir('ssd300_120000', set_type)
    det_file = os.path.join(output_dir, 'detections.pkl')

    for i in range(num_images):
        im, gt, h, w = dataset.pull_item(i)

        x = Variable(im.unsqueeze(0))
        # Honour the ``cuda`` argument rather than the global ``args``.
        if cuda:
            x = x.cuda()
        _t['im_detect'].tic()
        detections = net(x).data
        detect_time = _t['im_detect'].toc(average=False)

        # skip j = 0, because it's the background class
        for j in range(1, detections.size(1)):
            dets = detections[0, j, :]
            # keep only rows whose score (column 0) is positive
            mask = dets[:, 0].gt(0.).expand(5, dets.size(0)).t()
            dets = torch.masked_select(dets, mask).view(-1, 5)
            if dets.dim() == 0:
                continue
            # scale fractional coordinates to pixels
            boxes = dets[:, 1:]
            boxes[:, 0] *= w
            boxes[:, 2] *= w
            boxes[:, 1] *= h
            boxes[:, 3] *= h
            scores = dets[:, 0].cpu().numpy()
            cls_dets = np.hstack((boxes.cpu().numpy(), scores[:, np.newaxis])) \
                .astype(np.float32, copy=False)
            all_boxes[j][i] = cls_dets

        print('im_detect: {:d}/{:d} {:.3f}s'.format(i + 1,
                                                    num_images, detect_time))

    with open(det_file, 'wb') as f:
        pickle.dump(all_boxes, f, pickle.HIGHEST_PROTOCOL)

    print('Evaluating detections')
    evaluate_detections(all_boxes, output_dir, dataset)
def evaluate_detections(box_list, output_dir, dataset):
    """Write the per-class results files, then run the Python evaluation.

    Args:
        box_list: detections indexed as ``box_list[cls][image]``.
        output_dir: directory handed to ``do_python_eval``.
        dataset: dataset handed to ``write_voc_results_file``.
    """
    write_voc_results_file(all_boxes=box_list, dataset=dataset)
    do_python_eval(output_dir=output_dir)
if __name__ == '__main__':
    # Use the GPU only when it was requested AND is available, matching the
    # default-tensor-type selection at the top of the script. Updating
    # ``args.cuda`` keeps every later reader of the flag consistent.
    args.cuda = bool(args.cuda and torch.cuda.is_available())

    # load net
    net = build_mbnet('test', 160, 21)  # initialize SSD
    # Deserialize onto the CPU first so checkpoints saved from a GPU still
    # load on CPU-only machines; the net is moved to the GPU below.
    net.load_state_dict(torch.load(args.trained_model,
                                   map_location=lambda storage, loc: storage))
    net.eval()
    print('Finished loading model!')

    # load data
    dataset = VOCDetection(args.voc_root, [('2007', set_type)],
                           BaseTransform(160, dataset_mean, dataset_std),
                           AnnotationTransform())
    if args.cuda:
        net = net.cuda()
        cudnn.benchmark = True

    # evaluation
    test_net(args.save_folder, net, args.cuda, dataset,
             BaseTransform(net.size, dataset_mean, dataset_std), args.top_k, 160,
             thresh=args.confidence_threshold)
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.autograd import Variable
from layers import *
from data import v3
import os
def weight_init(m):
    """Xavier-uniform initialise the weights of ``nn.Conv2d`` modules.

    Modules of any other type are left untouched, which makes this
    suitable for ``Module.apply``.
    """
    if not isinstance(m, nn.Conv2d):
        return
    init.xavier_uniform(m.weight.data)
class MBNet(nn.Module):
    """Single Shot Multibox Architecture

    The network is composed of a base MBNet network followed by the
    added multibox conv layers.  Each multibox layer branches into
        1) conv2d for class conf scores
        2) conv2d for localization predictions
        3) associated priorbox layer to produce default bounding
           boxes specific to the layer's feature map size.
    See: https://arxiv.org/pdf/1512.02325.pdf for more details.

    Args:
        phase: (string) Can be "test" or "train"
        num_classes: number of classes scored by the confidence heads
    """

    def __init__(self, phase, num_classes):
        super(MBNet, self).__init__()
        self.phase = phase
        self.num_classes = num_classes
        # TODO: implement __call__ in PriorBox
        self.priorbox = PriorBox(v3)
        self.priors = Variable(self.priorbox.forward(), volatile=True)
        self.size = 160

        def conv_bn(inp, oup, stride):
            # 3x3 conv -> batch norm -> ReLU
            return nn.Sequential(
                nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
                nn.BatchNorm2d(oup),
                nn.ReLU(inplace=True)
            )

        def conv_dw(inp, oup, stride):
            # depthwise 3x3 conv followed by a pointwise 1x1 conv
            return nn.Sequential(
                nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
                nn.BatchNorm2d(inp),
                nn.ReLU(inplace=True),

                nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
                nn.ReLU(inplace=True),
            )

        def conv_mb(inp, mid, oup, stride, pad=1):
            # 1x1 bottleneck to ``mid`` channels, then a 3x3 conv to ``oup``
            return nn.Sequential(
                nn.Conv2d(inp, mid, 1, 1, 0, bias=False),
                nn.BatchNorm2d(mid),
                nn.LeakyReLU(inplace=True),

                nn.Conv2d(mid, oup, 3, stride, pad, bias=False),
                nn.BatchNorm2d(oup),
                nn.LeakyReLU(inplace=True),
            )

        # base network
        self.head = nn.ModuleList([
            conv_bn(3, 32, 2),
            conv_dw(32, 64, 1),
            conv_dw(64, 128, 2),
            conv_dw(128, 128, 1),
            conv_dw(128, 256, 2),
            conv_dw(256, 256, 1),
            conv_dw(256, 512, 2),
            conv_dw(512, 512, 1)
        ])

        # extra feature layers
        self.ssdconv1 = conv_mb(512, 128, 256, 2)
        self.ssdconv2 = conv_mb(256, 128, 256, 1, 0)
        self.ssdconv3 = conv_mb(256, 64, 128, 1, 0)

        # xavier init for the extra layers only
        self.ssdconv1.apply(weight_init)
        self.ssdconv2.apply(weight_init)
        self.ssdconv3.apply(weight_init)

        # Localization and confidence heads, one pair per source feature
        # map; input channels follow the order sources are collected in
        # forward().
        self.loc = nn.ModuleList([
            nn.Conv2d(256, 4 * 4, kernel_size=3, padding=1),
            nn.Conv2d(512, 6 * 4, kernel_size=3, padding=1),
            nn.Conv2d(256, 6 * 4, kernel_size=3, padding=1),
            nn.Conv2d(256, 6 * 4, kernel_size=3, padding=1),
            nn.Conv2d(128, 4 * 4, kernel_size=3, padding=1)
        ])
        self.conf = nn.ModuleList([
            nn.Conv2d(256, 4 * num_classes, kernel_size=3, padding=1),
            nn.Conv2d(512, 6 * num_classes, kernel_size=3, padding=1),
            nn.Conv2d(256, 6 * num_classes, kernel_size=3, padding=1),
            nn.Conv2d(256, 6 * num_classes, kernel_size=3, padding=1),
            nn.Conv2d(128, 4 * num_classes, kernel_size=3, padding=1)
        ])

        if phase == 'test':
            self.softmax = nn.Softmax()
            self.detect = Detect(num_classes, 0, 200, 0.01, 0.45)

    def forward(self, x):
        """Applies network layers and ops on input image(s) x.

        Args:
            x: input image or batch of images, channel-first with 3
                channels (presumably [batch, 3, 160, 160] given
                ``self.size`` -- confirm against the caller).

        Return:
            Depending on phase:
            test:
                the output of ``self.detect`` applied to the location
                predictions, the softmaxed confidences and the priors.
            train:
                tuple of:
                    1: localization predictions, Shape: [batch,num_priors,4]
                    2: confidence predictions,
                       Shape: [batch,num_priors,num_classes]
                    3: the prior boxes (``self.priors``)
        """
        sources = list()
        loc = list()
        conf = list()

        # source 1: output of the first six base layers
        for i in range(0, 6):
            x = self.head[i](x)
        sources.append(x)

        # source 2: output of the remaining base layers
        for i in range(6, 8):
            x = self.head[i](x)
        sources.append(x)

        # sources 3-5: outputs of the extra layers
        x = self.ssdconv1(x)
        sources.append(x)
        x = self.ssdconv2(x)
        sources.append(x)
        x = self.ssdconv3(x)
        sources.append(x)

        # apply multibox head to source layers; move channels last so the
        # per-location predictions are contiguous when flattened
        for (feature, loc_head, conf_head) in zip(sources, self.loc, self.conf):
            loc.append(loc_head(feature).permute(0, 2, 3, 1).contiguous())
            conf.append(conf_head(feature).permute(0, 2, 3, 1).contiguous())

        loc = torch.cat([o.view(o.size(0), -1) for o in loc], 1)
        conf = torch.cat([o.view(o.size(0), -1) for o in conf], 1)
        if self.phase == "test":
            output = self.detect(
                loc.view(loc.size(0), -1, 4),                   # loc preds
                self.softmax(conf.view(-1, self.num_classes)),  # conf preds
                self.priors                                     # default boxes
            )
        else:
            output = (
                loc.view(loc.size(0), -1, 4),
                conf.view(conf.size(0), -1, self.num_classes),
                self.priors
            )
        return output

    def load_weights(self, base_file):
        """Load a state dict from a ``.pth`` or ``.pkl`` file.

        Files with any other extension are rejected with a printed message
        and nothing is loaded.
        """
        _, ext = os.path.splitext(base_file)
        # ``ext == '.pkl' or '.pth'`` was always true because the bare
        # string is truthy, so the rejection branch was unreachable.
        if ext in ('.pkl', '.pth'):
            print('Loading weights into state dict...')
            # map_location keeps the tensors on the CPU while loading
            self.load_state_dict(torch.load(base_file, map_location=lambda storage, loc: storage))
            print('Finished!')
        else:
            print('Sorry only .pth and .pkl files supported.')
def build_mbnet(phase, size=160, num_classes=2):
    """Build an ``MBNet`` for the given phase.

    Args:
        phase: "test" or "train".
        size: input size; only 160 is supported.
        num_classes: number of classes passed to ``MBNet``.

    Returns:
        The network, or None (after printing an error) when ``phase`` or
        ``size`` is not supported.
    """
    if phase not in ("test", "train"):
        print("Error: Phase not recognized")
        return
    if size != 160:
        print("Error: Sorry only MBNet160 is supported currently!")
        return
    return MBNet(phase, num_classes)
from __future__ import print_function
import os
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
import torch.nn.init as init
import argparse
from torch.autograd import Variable
import torch.utils.data as data
from data import v3, AnnotationTransform, VOCDetection, detection_collate, VOCroot
from utils.augmentations import SSDAugmentation
from layers.modules import MultiBoxLoss
from ssd import build_mbnet
import numpy as np
import time
def str2bool(v):
    """Interpret a command-line string as a boolean.

    Returns True when ``v``, lower-cased, is one of "yes", "true", "t" or
    "1"; every other string yields False.
    """
    truthy_words = ("yes", "true", "t", "1")
    normalized = v.lower()
    return normalized in truthy_words
# Command-line options for the training script.
parser = argparse.ArgumentParser(description='Single Shot MultiBox Detector Training')
parser.add_argument('--jaccard_threshold', default=0.5, type=float,
                    help='Min Jaccard index for matching')
parser.add_argument('--batch_size', default=64, type=int,
                    help='Batch size for training')
parser.add_argument('--resume', default=None, type=str,
                    help='Resume from checkpoint')
parser.add_argument('--freeze', default=False, type=str2bool,
                    help='Freeze pretrained subgraph')
parser.add_argument('--num_workers', default=4, type=int,
                    help='Number of workers used in dataloading')
parser.add_argument('--iterations', default=60000, type=int,
                    help='Number of training iterations')
parser.add_argument('--start_iter', default=0, type=int,
                    help='Begin counting iterations starting from this value (should be used with resume)')
parser.add_argument('--cuda', default=True, type=str2bool,
                    help='Use cuda to train model')
parser.add_argument('--lr', '--learning-rate', default=4e-3, type=float,
                    help='initial learning rate')
parser.add_argument('--momentum', default=0.9, type=float,
                    help='momentum')
parser.add_argument('--weight_decay', default=5e-4, type=float,
                    help='Weight decay for SGD')
parser.add_argument('--gamma', default=0.1, type=float,
                    help='Gamma update for SGD')
# type=bool would turn any non-empty string (including "False") into True,
# so this flag is parsed with str2bool like the other boolean options.
parser.add_argument('--log_iters', default=True, type=str2bool,
                    help='Print the loss at each iteration')
parser.add_argument('--save_folder', default='weights/',
                    help='Location to save checkpoint models')
parser.add_argument('--voc_root', default=VOCroot,
                    help='Location of VOC root directory')
args = parser.parse_args()
# Create new tensors as CUDA floats only when CUDA was requested and is
# actually available; otherwise fall back to CPU floats.
if args.cuda and torch.cuda.is_available():
    torch.set_default_tensor_type('torch.cuda.FloatTensor')
else:
    torch.set_default_tensor_type('torch.FloatTensor')

cfg = v3

# makedirs (rather than mkdir) also creates missing parent directories, so a
# nested --save_folder works.
if not os.path.exists(args.save_folder):
    os.makedirs(args.save_folder)

#train_sets = ['filtered']
train_sets = [('2007', 'trainval'), ('2012', 'trainval')]
ssd_dim = 160
means = (0.406, 0.456, 0.485)  # only support voc now
stds = (0.225, 0.224, 0.229)
num_classes = 21
batch_size = args.batch_size
accum_batch_size = 128
iter_size = accum_batch_size / batch_size
# Backup default values
# max_iter follows --iterations (default 60000); it used to be hard-coded,
# which silently ignored the command-line option.
max_iter = args.iterations
weight_decay = 0.0005
stepvalues = (45000, 50000, 55000)  # iterations at which the LR is decayed
gamma = 0.1
momentum = 0.9
# Build the network; `net` is the (optionally DataParallel-wrapped) handle used
# for the forward pass, while `ssd_net` stays the raw module for weight I/O.
ssd_net = build_mbnet('train', ssd_dim, num_classes)
net = ssd_net

if args.cuda:
    net = torch.nn.DataParallel(ssd_net)

# NOTE(review): the source indentation was lost; the `else` below is assumed
# to pair with `if args.resume` — confirm against the original script.
if args.resume:
    print('Resuming training, loading {}...'.format(args.resume))
    ssd_net.load_weights(args.resume)
    if not args.freeze:
        print('Ensure all parameters are learnable...')
        for param in ssd_net.parameters():
            param.requires_grad = True
else:
    print('Loading pretrained head...')
    # map_location keeps the loaded storages on the CPU, matching
    # load_weights, so a GPU-saved file also loads on a CPU-only host.
    mbnet_head = torch.load('weights/mbnethead.pth',
                            map_location=lambda storage, loc: storage)
    ssd_net.head.load_state_dict(mbnet_head)

if args.freeze:
    print('Freezing head subgraph...')
    for param in ssd_net.head.parameters():
        param.requires_grad = False

if args.cuda:
    net.cuda()
    cudnn.benchmark = True

# Only parameters that still require gradients are handed to the optimizer.
optimizer = optim.SGD(filter(lambda p: p.requires_grad, net.parameters()),
                      lr=args.lr, momentum=args.momentum,
                      weight_decay=args.weight_decay)
criterion = MultiBoxLoss(num_classes, 0.5, True, 0, True, 3, 0.5, False, args.cuda)
def train():
    """Run the training loop and write checkpoints under ``args.save_folder``.

    Uses the module-level ``net``, ``ssd_net``, ``optimizer``, ``criterion``,
    ``args``, ``train_sets``, ``ssd_dim``, ``means``, ``stds``, ``batch_size``,
    ``max_iter`` and ``stepvalues``. Iterates from ``args.start_iter`` up to
    ``max_iter``, decays the learning rate at each value in ``stepvalues``,
    saves a checkpoint every 100 iterations and a final one at the end.
    """
    net.train()
    # loss counters
    loc_loss = 0  # epoch
    conf_loss = 0
    epoch = 0
    print('Loading Dataset...')

    dataset = VOCDetection(args.voc_root, train_sets,
                           SSDAugmentation(ssd_dim, means, stds),
                           AnnotationTransform())

    # Clamp to 1 so a dataset smaller than one batch does not cause a
    # modulo-by-zero in the loop below.
    epoch_size = max(1, len(dataset) // args.batch_size)
    print('Training SSD on', dataset.name)
    step_index = 0
    batch_iterator = None
    data_loader = data.DataLoader(dataset, batch_size,
                                  num_workers=args.num_workers,
                                  shuffle=True, collate_fn=detection_collate)

    for iteration in range(args.start_iter, max_iter):
        if (not batch_iterator) or (iteration % epoch_size == 0):
            # create batch iterator
            batch_iterator = iter(data_loader)
        if iteration in stepvalues:
            step_index += 1
            adjust_learning_rate(optimizer, args.gamma, step_index)
            # reset epoch loss counters
            loc_loss = 0
            conf_loss = 0
            epoch += 1

        # load train data
        images, targets = next(batch_iterator)
        if args.cuda:
            images = Variable(images.cuda())
            targets = [Variable(anno.cuda()) for anno in targets]
        else:
            images = Variable(images)
            targets = [Variable(anno) for anno in targets]

        # forward
        t0 = time.time()
        out = net(images)

        # backprop
        optimizer.zero_grad()
        loss_l, loss_c = criterion(out, targets)
        loss = loss_l + loss_c
        loss.backward()
        optimizer.step()
        t1 = time.time()

        # NOTE(review): `.data[0]` is the pre-0.4 PyTorch way to read a scalar
        # loss; kept to match the Variable-based API used in this file.
        loc_loss += loss_l.data[0]
        conf_loss += loss_c.data[0]

        if iteration % 10 == 0:
            print('Timer: %.4f sec.' % (t1 - t0))
            print('iter ' + repr(iteration) + ' || Loss: %.4f ||' % (loss.data[0]), end=' ')
        if iteration % 100 == 0:
            print('Saving state, iter:', iteration)
            # Honor --save_folder; the path used to be hard-coded to 'weights/'.
            torch.save(ssd_net.state_dict(),
                       os.path.join(args.save_folder,
                                    'mbnet_iter_' + repr(iteration) + '.pth'))

    # os.path.join keeps the path valid when --save_folder has no trailing
    # separator.
    torch.save(ssd_net.state_dict(),
               os.path.join(args.save_folder, 'mbnet_final.pth'))
def adjust_learning_rate(optimizer, gamma, step):
    """Set the learning rate of every parameter group to the decayed value.

    The new rate is the initial ``args.lr`` multiplied by ``gamma`` raised to
    the power ``step``.

    # Adapted from PyTorch Imagenet example:
    # https://github.com/pytorch/examples/blob/master/imagenet/main.py

    Args:
        optimizer: Optimizer whose ``param_groups`` are updated in place.
        gamma: Multiplicative decay factor applied once per step.
        step: Number of decay steps taken so far.
    """
    decay_factor = gamma ** step
    decayed_lr = args.lr * decay_factor
    for group in optimizer.param_groups:
        group['lr'] = decayed_lr
# Start training only when this file is executed as a script.
if __name__ == '__main__':
    train()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment