Last active November 16, 2022 07:12
KITTI Pointcloud front-view projection
import multiprocessing
import os
import numpy as np
import array
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import cv2
from scipy.interpolate import griddata, Rbf, interpolate
from PIL import Image
def color_map(pc, img_size):
:param pc: 输入点云
:param img_size: 图像大小
# 构建mask
mask = np.zeros([img_size[1], img_size[0]], dtype=np.uint8)
mask[np.int_(pc[:, 1]), np.int_(pc[:, 0])] = 255
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 7))
mask = cv2.dilate(mask, kernel, iterations=2)
# 泛洪算法 填补图像中的空洞
flood_fill = mask.copy().astype(np.uint8)
cv2.floodFill(flood_fill, np.zeros((img_size[1] + 2, img_size[0] + 2), np.uint8), (0, 0), 255)
mask = mask | cv2.bitwise_not(flood_fill)
# mask = cv2.bitwise_not(mask)
# contour, hier = cv2.findContours(mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE)
# for cnt in contour:
# cv2.drawContours(mask, [cnt], 0, 255, -1)
# mask = cv2.bitwise_not(mask)
# cv2.imshow('', mask)
# cv2.waitKey(0)
# colors = plt.get_cmap('gist_ncar_r')(np.maximum((100 - pc[:, 2]) / 100, 0))
colors = plt.get_cmap('hot')(1.5 * pc[:, 3] ** 1.3)
grid_x, grid_y = np.mgrid[0:img_size[0]:1, 0:img_size[1]:1]
chs = [griddata(pc[:, 0:2], colors[:, 2 - idx], (grid_x, grid_y), method='linear').T for idx in range(3)]
img = np.stack(chs, axis=-1)
img = np.int_(img * 255)
# 和mask向掩码
img = img * np.expand_dims(mask / 255, -1)
return img
def save_pc(points, path):
save pointcloud file (KITTI format)
:param points: pointcloud, numpy array of shape[N, 4], (x,y,z,intensity)
:param path: save path
points = points.reshape(-1)
with open(path, 'wb') as bin:
for data in points:
struct.pack('f', float(data))
def load_pc(bin_file_path):
load pointcloud file (KITTI format)
:param bin_file_path:
with open(bin_file_path, 'rb') as bin_file:
pc = array.array('f')
pc = np.array(pc).reshape(-1, 4)
return pc
def load_calib(calib_file_path):
load calibration file(KITTI object format)
:param calib_file_path:
calib_file = open(calib_file_path, 'r').readlines()
calib_file = [line
.replace('Tr_velo_to_cam', 'Tr_velo_cam')
.replace('Tr:', 'Tr_velo_cam:')
.replace('R0_rect', 'R_rect')
.replace('\n', '')
.replace(':', '')
.split(' ')
for line in calib_file]
calib_file = {line[0]: [float(item) for item in line[1:] if item != ''] for line in calib_file if len(line) > 1}
return calib_file
def parse_calib_file(calib_file):
parse calibration file to calibration matrix
:param calib_file:
# 外参矩阵
Tr_velo_cam = np.array(calib_file['Tr_velo_cam']).reshape(3, 4)
Tr_velo_cam = np.concatenate([Tr_velo_cam, [[0, 0, 0, 1]]], axis=0)
# 矫正矩阵
R_rect = np.array(calib_file['R_rect']).reshape(3, 3)
R_rect = np.pad(R_rect, [[0, 1], [0, 1]], mode='constant')
R_rect[-1, -1] = 1
# 内参矩阵
P2 = np.array(calib_file['P2']).reshape(3, 4)
return np.matmul(np.matmul(P2, R_rect), Tr_velo_cam)
def zbuffer_projection(pc, depth, data, img_size):
serial implement of zbuffer
:param pc: input pointcloud
:param img_size: image size
:return: rendered img
z_buffer = np.zeros(img_size[::-1]) + 1e9
img = np.zeros(img_size[::-1] + (3,))
proj_coord = np.int_(pc[:, :2])
for point_idx, coord in enumerate(proj_coord):
# 深度小于当前深度时才进行投影
if depth[point_idx] < z_buffer[coord[1], coord[0]]:
# 处理深度
z_buffer[coord[1], coord[0]] = depth[point_idx]
# 处理数据
# img[coord[1], coord[0], 0] = (pc[point_idx, 3] + 0.1) * 255
img[coord[1], coord[0], 1] = (np.maximum(0, MAX_DEPTH - pc[point_idx, 2]) / MAX_DEPTH) ** 1.1 * 255
img[coord[1], coord[0], 2] = (pc[point_idx, 5] + 126) % 255
return img
def project_lidar_to_image(pc, img_size, calib_file, yaw_deg, method='velodyne'):
:param pc: 输入点云(N, 4)
:param img_size: (w, h)
:param calib_file: KITTI calib文件的path
:param yaw_deg: 将点云按z轴旋转的角度,默认设置为0就可以
yaw_deg = yaw_deg / 180 * np.pi
calib_mat = parse_calib_file(load_calib(calib_file))
# 投影
intensity = np.copy(pc[:, 3]).reshape(-1, 1)
pc[:, 3] = 1
# yaw旋转
rotate_mat = np.array([
[np.cos(yaw_deg), -np.sin(yaw_deg), 0, 0],
[np.sin(yaw_deg), np.cos(yaw_deg), 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1],
pc = np.matmul(rotate_mat, pc.T).T
# 限制前视 并且限制fov在90度之内
# 计算fov
fov_h = np.arctan(np.abs(pc[:, 1] / pc[:, 0]))
fov_v = np.arctan(np.abs(pc[:, 2] / pc[:, 0]))
indice = np.where(np.bitwise_and(
pc[:, 0] > 0.5,
np.bitwise_and(fov_h < np.pi / 4, fov_v < np.pi / 10, )
pc = pc[indice]
intensity = intensity[indice]
# 还原pc
pc = np.concatenate([pc[:, :3], intensity], axis=1)
# 上采样点云
pc = upsample_ext.upsample(pc, num_threads=NUM_THREADS)
# pc = pc[np.where(pc[:, -1] == 128)]
# 备份其他特征
ground = np.copy(pc[:, 4]).reshape(-1, 1)
intensity = np.copy(pc[:, 3]).reshape(-1, 1)
height = np.copy(pc[:, 2]).reshape(-1, 1)
# 进行投影变换
pc = np.concatenate([pc[:, :3], np.ones_like(pc[:, 0]).reshape(-1, 1)], axis=1)
pc = np.matmul(calib_mat, pc.T).T
# z深度归一化
pc[:, :2] /= pc[:, 2:]
# 还原intensity
pc = np.concatenate([pc, intensity, height, ground], axis=1)
# 按照原图大小裁剪
pc = pc[np.where(pc[:, 0] >= 0)]
pc = pc[np.where(pc[:, 0] < img_size[0])]
pc = pc[np.where(pc[:, 1] >= 0)]
pc = pc[np.where(pc[:, 1] < img_size[1])]
if method == 'colormap':
# 方法1 对点云做color map之后再绘制到前视图上
# img = color_map(pc, img_size)
elif method == 'points':
# 方法2 下边是不对投影之后的点云做任何处理,直接以点的形式绘制到前视图上
img = np.zeros(img_size[::-1] + (3,))
img[np.int_(pc[:, 1]), np.int_(pc[:, 0]), 1] = (np.maximum(0, MAX_DEPTH - pc[:, 2]) / MAX_DEPTH) ** 1.1 # depth
# img[np.int_(pc[:, 1]), np.int_(pc[:, 0]), 1] = (pc[:, 3] + 0.1) # intensity
img[np.int_(pc[:, 1]), np.int_(pc[:, 0]), 2] = (pc[:, 5] + 126) % 255 / 255 # intensity
img = np.int_(img * 255)
elif method == 'interp':
# 方法3 interp
img = interp(pc, img_size)
elif method == 'zbuffer':
# 方法4 point-zbuffer投影
img = zbuffer_projection(
depth=pc[:, 2],
data=pc[:, 2:],
# img[:, :, 2] = zbuffer_projection(pc, img_size)
img = None
return img
def get_object_file_list(root):
获取kitti object目录下的所有文件
:param root:
file_list = []
for pc_file in os.listdir(root.format('velodyne')):
os.path.join(root.format('calib'), pc_file[:-4] + '.txt'),
os.path.join(root.format('velodyne'), pc_file),
os.path.join(root.format('image_2'), pc_file[:-4] + '.png'),
return file_list
def get_tracking_file_list(root):
获取kitti tracking目录格式下的所有文件
:param root:
# 提取tracking中的所有文件
pc_list = ['{}/{}'.format(path, file) for path in sorted(os.listdir(root.format('velodyne')))
for file in sorted(os.listdir(os.path.join(root.format('velodyne'), path)))]
file_list = []
for pc_file in pc_list:
os.path.join(root.format('calib'), pc_file[:4] + '.txt'),
os.path.join(root.format('velodyne'), pc_file),
os.path.join(root.format('image_02'), pc_file[:-4] + '.png'),
pc_file[:-4].replace('/', '_') # 文件id去掉目录分割符
return file_list
def process_task(calib_file_path, bin_file_path, img_path, file_id):
# 载入图像 点云
origin = cv2.imread(img_path)
img_size = origin.T.shape[1:]
pc = load_pc(bin_file_path)
# 投影
tic = time.time()
img = project_lidar_to_image(pc, img_size, calib_file_path, yaw_deg=yaw_deg, method='zbuffer')
toc = time.time()
print('{}: time used: {}s'.format(file_id, toc - tic))
# img = project_lidar_to_image(pc, img_size, calib_file_path, yaw_deg=yaw_deg, method='points')
# 裁剪到仅有lidar的部分
img = img[120:, ...]
# 保存结果
# cv2.imshow('', img.astype(np.uint8))
# cv2.waitKey(0)
os.path.join(OUTPUT_PATH, '{}.png'.format(file_id)),
if __name__ == '__main__':
# INPUT_PATH = '/home/bdbc201/dataset/KITTI/object/training/{}/'
# OUTPUT_PATH = '/home/bdbc201/dataset/cgan/mls/object_train'
# file_list = get_object_file_list(root=INPUT_PATH)
INPUT_PATH = '/home/bdbc201/dataset/KITTI/tracking/training/{}/'
OUTPUT_PATH = '/home/bdbc201/dataset/cgan/mls/tracking_train'
file_list = get_tracking_file_list(root=INPUT_PATH)
# object
yaw_deg = 0
pool = multiprocessing.Pool(32)
for file in sorted(file_list):
calib_file_path, bin_file_path, img_path, file_id = file
pool.apply_async(process_task, args=(calib_file_path, bin_file_path, img_path, file_id,))
# process_task(calib_file_path, bin_file_path, img_path, file_id)
