Created
August 4, 2021 14:00
-
-
Save luator/14f84308258a988a74f11ab581d73aec to your computer and use it in GitHub Desktop.
View/convert TriFinger camera logs and visualise the dice goal pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Play back TriCameraObjectObservations from a log file. | |
""" | |
import argparse | |
import json | |
import pathlib | |
import sys | |
import numpy as np | |
import cv2 | |
from trifinger_cameras import tricamera, utils | |
from trifinger_simulation.tasks import rearrange_dice | |
from trifinger_simulation.camera import load_camera_parameters | |
CAMERA_NAMES = ["camera60", "camera180", "camera300"] | |
class GoalDrawer: | |
"""Draw the goal pattern into given camera images.""" | |
def __init__( | |
self, | |
calib_file_dir: pathlib.Path, | |
goal: rearrange_dice.Goal, | |
alpha=0.7, | |
): | |
# load camera parameters | |
camera_params = load_camera_parameters( | |
calib_file_dir, "camera{id}.yml" | |
) | |
# generate goal masks for the given goal and cameras | |
goal_masks = rearrange_dice.generate_goal_mask(camera_params, goal) | |
self.masks = [self._prepare_mask(m, alpha) for m in goal_masks] | |
# prepare the green image for overlaying | |
mask_shape = self.masks[0].shape | |
self.full_green = np.zeros((mask_shape[0], mask_shape[1], 3)) | |
self.full_green[:, :, 1] = 255 | |
@staticmethod | |
def _prepare_mask(mask, alpha): | |
alpha_mask = np.array(mask * alpha, dtype=float) / 255.0 | |
alpha_mask = np.dstack([alpha_mask] * 3) | |
return alpha_mask | |
def draw(self, images): | |
"""Draws the goal masks into the given images. | |
The images are modified in-place! | |
""" | |
for image, mask in zip(images, self.masks): | |
masked = self.full_green * mask + image * (1.0 - mask) | |
image[:] = masked[:] | |
def load_goal(goal_file: pathlib.Path): | |
if not goal_file.exists(): | |
raise FileExistsError("{} does not exist.".format(goal_file)) | |
with open(goal_file, "r") as fh: | |
goal_dict = json.load(fh) | |
return goal_dict["goal"] | |
def main(): | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument( | |
"filename", | |
type=str, | |
help="Path to the log file.", | |
) | |
parser.add_argument( | |
"--visualize-goal", | |
"-g", | |
type=pathlib.Path, | |
metavar="GOAL_FILE", | |
help="Visualize goal from the specified JSON file.", | |
) | |
parser.add_argument( | |
"--overlay-alpha", | |
type=float, | |
default="0.6", | |
metavar="ALPHA", | |
help="""Transparency of the goal mask. Valid range is 0.0 to 1.0. | |
Only used if --visualize-goal/-g is set. | |
""", | |
) | |
parser.add_argument( | |
"--save-video", | |
type=str, | |
metavar="VIDEO_FILE", | |
help="""Save the images of the camera selected by --camera to a AVI | |
video file. Expects as argument the output path. | |
""", | |
) | |
parser.add_argument( | |
"--camera", | |
"-c", | |
choices=CAMERA_NAMES, | |
help="Name of the camera. Used by --save-video.", | |
) | |
args = parser.parse_args() | |
log_file_path = pathlib.Path(args.filename) | |
if not log_file_path.exists(): | |
print("{} does not exist.".format(log_file_path)) | |
sys.exit(1) | |
if args.visualize_goal: | |
try: | |
goal = load_goal(args.visualize_goal) | |
except Exception as e: | |
print("Failed to load goal: %s" % e, file=sys.stderr) | |
sys.exit(1) | |
goal_drawer = GoalDrawer( | |
log_file_path.parent, goal, args.overlay_alpha | |
) | |
log_reader = tricamera.LogReader(args.filename) | |
# determine rate based on time stamps | |
start_time = log_reader.data[0].cameras[0].timestamp | |
end_time = log_reader.data[-1].cameras[0].timestamp | |
interval = (end_time - start_time) / len(log_reader.data) | |
fps = 1 / interval | |
# convert to ms | |
interval = int(interval * 1000) | |
if args.save_video: | |
if not args.camera: | |
print("--camera is required for saving video.") | |
sys.exit(1) | |
camera_index = CAMERA_NAMES.index(args.camera) | |
first_img = utils.convert_image(log_reader.data[0].cameras[0].image) | |
fourcc = cv2.VideoWriter_fourcc(*"XVID") | |
video_writer = cv2.VideoWriter( | |
args.save_video, fourcc, fps, first_img.shape[:2] | |
) | |
print( | |
"Loaded {} frames at an average interval of {} ms ({:.1f} fps)".format( | |
len(log_reader.data), interval, fps | |
) | |
) | |
for observation in log_reader.data: | |
images = [ | |
utils.convert_image(camera.image) for camera in observation.cameras | |
] | |
if args.visualize_goal: | |
goal_drawer.draw(images) | |
if args.save_video: | |
video_writer.write(images[camera_index]) | |
else: | |
for i, name in enumerate(CAMERA_NAMES): | |
cv2.imshow(name, images[i]) | |
# stop if either "q" or ESC is pressed | |
if cv2.waitKey(interval) in [ord("q"), 27]: # 27 = ESC | |
break | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment