Skip to content

Instantly share code, notes, and snippets.

@luator
Created August 4, 2021 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luator/14f84308258a988a74f11ab581d73aec to your computer and use it in GitHub Desktop.
Save luator/14f84308258a988a74f11ab581d73aec to your computer and use it in GitHub Desktop.
View/convert TriFinger camera logs and visualise the dice goal pattern
#!/usr/bin/env python3
"""
Play back TriCameraObjectObservations from a log file.
"""
import argparse
import json
import pathlib
import sys
import numpy as np
import cv2
from trifinger_cameras import tricamera, utils
from trifinger_simulation.tasks import rearrange_dice
from trifinger_simulation.camera import load_camera_parameters
CAMERA_NAMES = ["camera60", "camera180", "camera300"]
class GoalDrawer:
"""Draw the goal pattern into given camera images."""
def __init__(
self,
calib_file_dir: pathlib.Path,
goal: rearrange_dice.Goal,
alpha=0.7,
):
# load camera parameters
camera_params = load_camera_parameters(
calib_file_dir, "camera{id}.yml"
)
# generate goal masks for the given goal and cameras
goal_masks = rearrange_dice.generate_goal_mask(camera_params, goal)
self.masks = [self._prepare_mask(m, alpha) for m in goal_masks]
# prepare the green image for overlaying
mask_shape = self.masks[0].shape
self.full_green = np.zeros((mask_shape[0], mask_shape[1], 3))
self.full_green[:, :, 1] = 255
@staticmethod
def _prepare_mask(mask, alpha):
alpha_mask = np.array(mask * alpha, dtype=float) / 255.0
alpha_mask = np.dstack([alpha_mask] * 3)
return alpha_mask
def draw(self, images):
"""Draws the goal masks into the given images.
The images are modified in-place!
"""
for image, mask in zip(images, self.masks):
masked = self.full_green * mask + image * (1.0 - mask)
image[:] = masked[:]
def load_goal(goal_file: pathlib.Path):
if not goal_file.exists():
raise FileExistsError("{} does not exist.".format(goal_file))
with open(goal_file, "r") as fh:
goal_dict = json.load(fh)
return goal_dict["goal"]
def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"filename",
type=str,
help="Path to the log file.",
)
parser.add_argument(
"--visualize-goal",
"-g",
type=pathlib.Path,
metavar="GOAL_FILE",
help="Visualize goal from the specified JSON file.",
)
parser.add_argument(
"--overlay-alpha",
type=float,
default="0.6",
metavar="ALPHA",
help="""Transparency of the goal mask. Valid range is 0.0 to 1.0.
Only used if --visualize-goal/-g is set.
""",
)
parser.add_argument(
"--save-video",
type=str,
metavar="VIDEO_FILE",
help="""Save the images of the camera selected by --camera to a AVI
video file. Expects as argument the output path.
""",
)
parser.add_argument(
"--camera",
"-c",
choices=CAMERA_NAMES,
help="Name of the camera. Used by --save-video.",
)
args = parser.parse_args()
log_file_path = pathlib.Path(args.filename)
if not log_file_path.exists():
print("{} does not exist.".format(log_file_path))
sys.exit(1)
if args.visualize_goal:
try:
goal = load_goal(args.visualize_goal)
except Exception as e:
print("Failed to load goal: %s" % e, file=sys.stderr)
sys.exit(1)
goal_drawer = GoalDrawer(
log_file_path.parent, goal, args.overlay_alpha
)
log_reader = tricamera.LogReader(args.filename)
# determine rate based on time stamps
start_time = log_reader.data[0].cameras[0].timestamp
end_time = log_reader.data[-1].cameras[0].timestamp
interval = (end_time - start_time) / len(log_reader.data)
fps = 1 / interval
# convert to ms
interval = int(interval * 1000)
if args.save_video:
if not args.camera:
print("--camera is required for saving video.")
sys.exit(1)
camera_index = CAMERA_NAMES.index(args.camera)
first_img = utils.convert_image(log_reader.data[0].cameras[0].image)
fourcc = cv2.VideoWriter_fourcc(*"XVID")
video_writer = cv2.VideoWriter(
args.save_video, fourcc, fps, first_img.shape[:2]
)
print(
"Loaded {} frames at an average interval of {} ms ({:.1f} fps)".format(
len(log_reader.data), interval, fps
)
)
for observation in log_reader.data:
images = [
utils.convert_image(camera.image) for camera in observation.cameras
]
if args.visualize_goal:
goal_drawer.draw(images)
if args.save_video:
video_writer.write(images[camera_index])
else:
for i, name in enumerate(CAMERA_NAMES):
cv2.imshow(name, images[i])
# stop if either "q" or ESC is pressed
if cv2.waitKey(interval) in [ord("q"), 27]: # 27 = ESC
break
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment