Last active
August 24, 2018 21:58
-
-
Save davegreenwood/0481266c409d307e80040687b81e0e45 to your computer and use it in GitHub Desktop.
module to calibrate stereo cameras
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" module to calibrate stereo cameras. """ | |
from __future__ import print_function | |
import numpy as np | |
import pickle | |
import cv2 | |
import logging | |
from zipfile import ZipFile | |
logger = logging.getLogger(__name__) | |
STEREO_FLAGS = cv2.CALIB_FIX_INTRINSIC | |
CALIB_FLAGS = cv2.CALIB_FIX_ASPECT_RATIO + \ | |
cv2.CALIB_FIX_PRINCIPAL_POINT + \ | |
cv2.CALIB_ZERO_TANGENT_DIST | |
CRITERIA = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 1e-6) | |
BRD_SIZE = (3, 7) # or (5, 9) | |
IMG_SIZE = (720, 1280) | |
SQR_SIZE = 0.020 # or 0.015 metres | |
N_OBS = 50 # number of observations to retain when recalibrating | |
class ChessboardNotFoundError(Exception): | |
"""No chessboard could be found in searched image.""" | |
def zip_fnames(fname): | |
with ZipFile(fname, 'r') as archive: | |
flist = archive.namelist() | |
alist = [f for f in flist if 'cam_0' in f] | |
blist = [f for f in flist if 'cam_1' in f] | |
clist = [f for f in flist if 'cam_2' in f] | |
for a, b, c in zip(alist, blist, clist): | |
yield archive.read(a), archive.read(b), archive.read(c) | |
def calibrate_zip(fname, **kwargs): | |
size = kwargs.get('size', BRD_SIZE) | |
rows = size[0] | |
cols = size[1] | |
square_size = kwargs.get('square_size', SQR_SIZE) | |
objpoints, imgpoints = object_points(rows, cols, square_size), [] | |
for i, data in enumerate(zip_fnames(fname)): | |
images = [cv2.imdecode(np.frombuffer(x, np.uint8), 1) for x in data] | |
try: | |
corners = [find_corners(image, size=size) for image in images] | |
except ChessboardNotFoundError: | |
continue | |
imgpoints.append(corners) | |
logger.debug('appending {}th corners...'.format(i)) | |
return calibrate_N_cams(objpoints, imgpoints) | |
def calibrate_N(fnames_list, **kwargs): | |
""" fnames_list is list of lists of corresponding camera images: | |
[ [cam_0_frame_0.jpg, cam_0_frame_1.jpg ... cam_0_frame_N.jpg], ..., | |
[cam_N_frame_0.jpg, cam_N_frame_1.jpg ... cam_N_frame_N.jpg] ] | |
""" | |
disp = kwargs.get('disp', False) | |
size = kwargs.get('size', BRD_SIZE) | |
rows = size[0] | |
cols = size[1] | |
square_size = kwargs.get('square_size', SQR_SIZE) | |
objpoints, imgpoints = object_points(rows, cols, square_size), [] | |
logger.debug('values: {} {} {} {}'.format(disp, rows, cols, square_size)) | |
# find the corners | |
n = len(fnames_list) | |
logger.info('Attempting to find corners in {} images.'.format(n)) | |
logger.info('first images: \n{}'.format('\n'.join(fnames_list[0]))) | |
for i, fnames in enumerate(fnames_list): | |
images = [cv2.imread(fname) for fname in fnames] | |
try: | |
corners = [find_corners(image, size=size) for image in images] | |
except ChessboardNotFoundError: | |
# logger.debug('skipping {}th'.format(i)) | |
continue | |
disp_N(disp, images, corners, size=size) | |
logger.debug('appending {}th corners...'.format(i)) | |
imgpoints.append(corners) | |
cv2.destroyAllWindows() | |
return calibrate_N_cams(objpoints, imgpoints) | |
def calibrate_N_cams(objpoints, imgpoints, **kwargs): | |
image_size = kwargs.get('image_size', IMG_SIZE) | |
flags = kwargs.get('flags', CALIB_FLAGS) | |
criteria = kwargs.get('criteria', CRITERIA) | |
calibs = [] | |
# each camera | |
logger.info('image sets with found corners: {}'.format(len(imgpoints))) | |
logger.info('number of cameras: {}'.format(len(imgpoints[0]))) | |
for i, ips in enumerate(zip(*imgpoints)): | |
ops = np.array([objpoints] * len(ips)) | |
ips = np.array(ips) | |
calib = cv2.calibrateCamera( | |
ops, ips, image_size, np.eye(3), | |
np.zeros(5), flags=flags, criteria=criteria) | |
calibs.append(list(calib) + [ops, ips]) | |
logger.debug('Calibrating {}th camera'.format(i)) | |
logger.info('number of calibrations: {}'.format(len(calibs))) | |
return calibs | |
def projection_errors(calib): | |
op = calib[-2][0] | |
mtx = calib[1] | |
dist = calib[2] | |
error = [] | |
for rvec, tvec, ip in zip(calib[3], calib[4], calib[-1]): | |
pp = cv2.projectPoints(op, rvec, tvec, mtx, dist)[0] | |
p = (pp - ip).reshape(2, -1) | |
error.append(np.abs(p).max()) | |
return error | |
def ranked_idx(calibs): | |
errors = [] | |
for i, error in enumerate(zip( | |
*[projection_errors(calib) | |
for calib in calibs])): | |
errors.append((max(error), i)) | |
ranked_idx = np.array( | |
sorted([i[1] for i in sorted(errors)[:N_OBS]]), dtype='int32') | |
return ranked_idx | |
def re_calib_N_cams(calibs, **kwargs): | |
image_size = kwargs.get('image_size', IMG_SIZE) | |
flags = kwargs.get('flags', CALIB_FLAGS) | |
criteria = kwargs.get('criteria', CRITERIA) | |
sample_idx = ranked_idx(calibs) | |
result = [] | |
# calibs are returned from calibrate_N_cams | |
for calib in calibs: | |
ops = calib[-2][sample_idx] | |
ips = calib[-1][sample_idx] | |
c = cv2.calibrateCamera( | |
ops, ips, image_size, np.eye(3), np.zeros(5), | |
flags=flags, criteria=criteria) | |
result.append(list(c) + [ops, ips]) | |
return result | |
def disp_N(disp, images, corners, **kwargs): | |
size = kwargs.get('size', BRD_SIZE) | |
if not disp: | |
return | |
disp_image = np.hstack([cv2.drawChessboardCorners( | |
image, size, corner, True) | |
for image, corner in zip(images, corners)]) | |
cv2.imshow('name', disp_image) | |
cv2.waitKey(500) | |
cv2.destroyAllWindows() | |
def find_corners(image, **kwargs): | |
"""Find subpixel chessboard corners in opencv image.""" | |
size = kwargs.get('size', BRD_SIZE) | |
criteria = kwargs.get('criteria', CRITERIA) | |
grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
ret, corners = cv2.findChessboardCorners(grey, size, None) | |
if not ret: | |
raise ChessboardNotFoundError('No chessboard could be found') | |
corners = cv2.cornerSubPix( | |
grey, corners, (11, 11), (-1, -1), criteria=criteria) | |
if corners[-1, 0, -1] < corners[0, 0, -1]: | |
corners[:] = corners[::-1] | |
return corners | |
def object_points(rows, cols, square_size): | |
points = np.zeros((rows*cols, 3), np.float32) | |
points[:, :2] = np.mgrid[0:rows, 0:cols].T.reshape(-1, 2) * square_size | |
return points | |
def save_calib(fname, calib): | |
""" save the output of cv2.calibrateCamera() """ | |
d = dict() | |
d['rms'] = calib[0] | |
d['mtx'] = calib[1] | |
d['dist'] = calib[2] | |
d['rvecs'] = calib[3] | |
d['tvecs'] = calib[4] | |
d['objpoints'] = calib[5] | |
d['imgpoints'] = calib[6] | |
with open(fname, 'wb') as fid: | |
pickle.dump(d, fid, protocol=2) | |
def load_calib(fname): | |
with open(fname, 'rb') as fid: | |
d = pickle.load(fid) | |
mtx = np.array(d['mtx'], dtype='float32') | |
dist = np.array(d['dist'], dtype='float32') | |
rvecs = np.array(d['rvecs'], dtype='float32') | |
tvecs = np.array(d['tvecs'], dtype='float32') | |
objpoints = np.array(d['objpoints'], dtype='float32').squeeze() | |
imgpoints = np.array(d['imgpoints'], dtype='float32').squeeze() | |
return mtx, dist, rvecs, tvecs, objpoints, imgpoints | |
def calibrate_pair(left, right, **kwargs): | |
""" Calibrate a stereo pair of cameras. """ | |
logger.info('stereo calibration...') | |
flags = kwargs.get('flags', STEREO_FLAGS) | |
criteria = kwargs.get('criteria', CRITERIA) | |
img = kwargs.get('img', IMG_SIZE) | |
[L_m, L_d, _, _, objpoints, left_imgpoints] = left | |
[R_m, R_d, _, _, _, right_imgpoints] = right | |
try: | |
[rms, _, _, _, _, R, T, E, F] = cv2.stereoCalibrate( | |
objpoints, left_imgpoints, right_imgpoints, L_m, L_d, R_m, R_d, | |
img, flags=flags, criteria=criteria) | |
except Exception as e: | |
raise e | |
logger.info('Stereo RMS error: {}'.format(rms)) | |
return R, T, E, F |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment