|
""" |
|
COPYRIGHT NOTICE Deepen AI, Inc. 2017-2022 |
|
This software maybe subject to pending patent applications |
|
""" |
|
|
|
import sys |
|
import json |
|
import base64 |
|
import numpy as np |
|
import open3d as o3d |
|
import cv2 |
|
|
|
def decode_points_from_base64_encoded_string(json_data): |
|
intensities_arr = base64.b64decode(json_data['intensities']) |
|
intensities_arr = np.frombuffer(intensities_arr, dtype=np.float32) |
|
points = base64.b64decode(json_data["points"]) |
|
points_arr = np.frombuffer(points, dtype=np.float32) |
|
points_arr = points_arr.reshape(-1, 3) |
|
decoded_points = [] |
|
idx = 0 |
|
for point in points_arr: |
|
decoded_points.append({ |
|
'x': float(point[0]), |
|
'y': float(point[1]), |
|
'z': float(point[2]), |
|
'i': float(intensities_arr[idx]) |
|
}) |
|
idx = idx + 1 |
|
return decoded_points |
|
|
|
def read_json_file(file_path): |
|
with open(file_path, 'r') as json_file: |
|
json_data = json.load(json_file) |
|
return json_data |
|
|
|
def encode_to_color(value): |
|
rgb = rgb.copy() |
|
rgb.dtype = np.uint32 |
|
r = np.asarray((rgb >> 16) & 255, dtype=np.uint8) |
|
g = np.asarray((rgb >> 8) & 255, dtype=np.uint8) |
|
b = np.asarray(rgb & 255, dtype=np.uint8) |
|
rgb_arr = np.zeros((len(rgb), 3), dtype=np.uint8) |
|
rgb_arr[:, 0] = r |
|
rgb_arr[:, 1] = g |
|
rgb_arr[:, 2] = b |
|
return rgb_arr |
|
|
|
|
|
def json_to_pcd(json_data, color, is_fused_enabled): |
|
|
|
if json_data["points"].__class__ is not list: |
|
json_data['points'] = decode_points_from_base64_encoded_string(json_data) |
|
del json_data['intensities'] |
|
|
|
arr = [] |
|
|
|
for point in json_data['points']: |
|
pcd_point = [float(point['x']), float(point['y']), float(point['z'])] |
|
arr.append(pcd_point) |
|
points = np.array(arr).reshape(-1, 3) |
|
|
|
pcd = o3d.geometry.PointCloud() |
|
pcd.points = o3d.utility.Vector3dVector(points) |
|
pcd.paint_uniform_color([color, 0, 0]) |
|
if is_fused_enabled == False: |
|
if 'images' in json_data: |
|
for image_data in json_data['images']: |
|
image_file = image_data.get('image_url') |
|
camera_model = image_data.get('camera_model') |
|
is_fisheye = False |
|
if camera_model == 'fisheye': |
|
is_fisheye = True |
|
camera_matrix, distortions = get_camera_matrix_and_distortion_coefficient(image_data, is_fisheye) |
|
camera_to_world = create_extrinsic_matrix_from_device_data(image_data.get('heading'), |
|
image_data.get('position')) |
|
world_to_camera = np.linalg.inv(camera_to_world) |
|
projected_2d_points, projected_points_image = project_points(image_file, points, camera_matrix, |
|
world_to_camera) |
|
resize_image = lambda img: cv2.resize(img, (1920, 1080), interpolation=cv2.INTER_LINEAR) |
|
cv2.imshow('projected', resize_image(projected_points_image)) |
|
cv2.waitKey(0) |
|
return pcd |
|
|
|
def create_extrinsic_matrix_from_device_data(heading, position): |
|
x = float(heading['x']) |
|
y = float(heading['y']) |
|
z = float(heading['z']) |
|
w = float(heading['w']) |
|
|
|
return np.array([ |
|
[1 - 2 * (y ** 2) - 2 * (z ** 2), (2 * x * y) - (2 * w * z), (2 * x * z) + (2 * w * y), float(position['x'])], |
|
[(2 * x * y) + (2 * w * z), 1 - 2 * (x ** 2) - 2 * (z ** 2), (2 * y * z) - (2 * w * x), float(position['y'])], |
|
[(2 * x * z) - (2 * w * y), (2 * y * z) + (2 * w * x), 1 - 2 * (x ** 2) - 2 * (y ** 2), float(position['z'])], |
|
[0, 0, 0, 1] |
|
]) |
|
|
|
def project_points(image, points, camera_matrix, world_to_camera_transform): |
|
points_H = np.array([*points.T, [1] * len(points)], dtype=np.float32).T |
|
points_in_camera_frame_H = (world_to_camera_transform @ points_H.T).T |
|
points_in_camera_forward_H = points_in_camera_frame_H[points_in_camera_frame_H[:, 2] > 0] |
|
camera_matrix_4x4 = np.eye(4, 4) |
|
camera_matrix_4x4[:3, :3] = np.array(camera_matrix).reshape(3, 3) |
|
projected_points_on_image_lambda = (camera_matrix_4x4 @ points_in_camera_forward_H.T).T |
|
projected_points_on_image = (projected_points_on_image_lambda[:, :2] / projected_points_on_image_lambda[:, 2:3])[:, :2] |
|
image = cv2.imread(image, flags=cv2.IMREAD_COLOR) |
|
projected_points_image = image.copy() |
|
for point in projected_points_on_image: |
|
if int(point[0]) < image.shape[1] and int(point[1]) < image.shape[0] and int(point[0]) >= 0 and int(point[1]) >= 0: |
|
cv2.circle(projected_points_image, center=(int(point[0]), int(point[1])), radius=3, color=(255, 0, 0)) |
|
return projected_points_on_image, projected_points_image |
|
|
|
def get_camera_matrix_and_distortion_coefficient(intrinsic_params, is_fisheye): |
|
""" |
|
returns the camera_matrix in matrix form and distortion in array format. |
|
@param intrinsic_params: |
|
@param is_fisheye: |
|
@return: |
|
""" |
|
fx = intrinsic_params.get('fx', 1) |
|
fy = intrinsic_params.get('fy', 1) |
|
cx = intrinsic_params.get('cx', 0) |
|
cy = intrinsic_params.get('cy', 0) |
|
camera_matrix = np.array([ |
|
[fx, 0, cx], |
|
[0, fy, cy], |
|
[0, 0, 1] |
|
], dtype=np.float) |
|
distortion_enabled = intrinsic_params.get('distortion_enabled', True) |
|
distortion = None |
|
if distortion_enabled: |
|
k1 = intrinsic_params.get('k1', 0) |
|
k2 = intrinsic_params.get('k2', 0) |
|
p1 = intrinsic_params.get('p1', 0) |
|
p2 = intrinsic_params.get('p2', 0) |
|
k3 = intrinsic_params.get('k3', 0) |
|
k4 = intrinsic_params.get('k4', 0) |
|
if k4 is None: |
|
k4 = 0 |
|
k5 = intrinsic_params.get('k5', 0) |
|
k6 = intrinsic_params.get('k6', 0) |
|
|
|
if is_fisheye: |
|
if k1 != 0 or k2 != 0 or k3 != 0 or k4 != 0: |
|
distortion = np.array([[k1], [k2], [k3], [k4]], dtype=np.float) |
|
else: |
|
if k1 != 0 or k2 != 0 or k3 != 0 or p1 != 0 or p2 != 0: |
|
distortion = np.array([k1, k2, p1, p2, k3, k4, k5, k6], dtype=np.float) |
|
|
|
return camera_matrix, distortion |
|
|
|
if __name__ == '__main__': |
|
arg_len = len(sys.argv) |
|
files = sys.argv[2:] |
|
if not files: |
|
raise "No input json files provided " |
|
if sys.argv[1] == 'fused': |
|
pcds = [json_to_pcd(read_json_file(file_path=file_path), i / len(files), True) for i, file_path in enumerate(files)] |
|
elif sys.argv[1] == 'projection': |
|
pcds = [json_to_pcd(read_json_file(file_path=sys.argv[2]), 0, False)] |
|
else: |
|
raise "Please provide argument for 'fused' or 'projection' " |
|
|
|
origin = o3d.geometry.TriangleMesh.create_coordinate_frame(size=20, origin=[0, 0, 0]) |
|
o3d.visualization.draw_geometries([origin, *pcds]) |