@marc-tonsen
Last active February 14, 2022 08:35
This script undistorts the scene video of a Pupil Invisible recording in raw binary format. The camera intrinsics used for the undistortion are downloaded from Pupil Cloud, so an internet connection is required.
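A minimal usage sketch, assuming the script below is saved as undistort.py (the gist does not fix a filename) and that the recording was downloaded from Pupil Cloud in raw binary format:

    python undistort.py /path/to/recording

Passing a recording folder undistorts every scene video it contains; passing a single scene video file undistorts only that file. Undistorted copies are written next to the originals with an "undistorted-" prefix.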
import av
import cv2
import json
import struct
import pathlib
import argparse
import requests
import itertools
import numpy as np
from tqdm import tqdm


def fetch_intrinsics(serial_number):
    """
    Using the serial number of the scene camera, fetch the camera matrix and
    distortion coefficients.

    You can find the serial number of the scene camera on the back of the scene
    camera module, or in the info.json file of a recording. Note that this is a
    different serial number from the 'glasses serial number'.
    """
    if serial_number == "default":
        serial_number = "00000"

    base_url = (
        "https://pupil-invisible-hardware-calibrations" ".s3.eu-central-1.amazonaws.com"
    )
    version = "v1"

    response = requests.get(base_url + "/" + version + "/" + serial_number)
    response.raise_for_status()
    binary_data = response.content

    # The payload is a fixed-size little-endian blob: a 5-byte serial number,
    # a 3x3 camera matrix, 8 distortion coefficients, and a further nine
    # doubles that are not used here.
    data = struct.unpack("<5s 9d 8d 9d", binary_data)
    result = {
        "serial_number": data[0].decode("utf-8"),
        "camera_matrix": np.array(data[1:10], dtype=np.float64).reshape(3, 3),
        "distortion_coefficients": np.array(data[10:18], dtype=np.float64).reshape(
            1, 8
        ),
    }

    if result["serial_number"] != serial_number:
        raise ValueError(
            "The serial number returned from the cloud does not "
            "match the queried serial number!"
        )

    return result["camera_matrix"], result["distortion_coefficients"]


def undistort_video(
    original_video_path, undistorted_video_path, camera_matrix, dist_coefs
):
    original_container = av.open(str(original_video_path))
    original_video_stream = original_container.streams.video[0]

    undistorted_container = av.open(str(undistorted_video_path), "w")

    # Prefer NVIDIA's hardware encoder if it is available; fall back to
    # software H.264 otherwise.
    try:
        undistorted_video = undistorted_container.add_stream("h264_nvenc")
    except Exception as e:
        print("nvenc not available", e)
        undistorted_video = undistorted_container.add_stream("h264")

    undistorted_video.options["bf"] = "0"
    undistorted_video.options["movflags"] = "faststart"
    undistorted_video.gop_size = original_video_stream.gop_size
    undistorted_video.codec_context.height = original_video_stream.height
    undistorted_video.codec_context.width = original_video_stream.width
    undistorted_video.codec_context.time_base = original_video_stream.time_base
    undistorted_video.codec_context.bit_rate = original_video_stream.bit_rate

    # Copy the audio track (if any) into the output container.
    if original_container.streams.audio:
        audio_stream = original_container.streams.audio[0]
        output_audio_stream = undistorted_container.add_stream("aac")
        output_audio_stream.codec_context.layout = audio_stream.layout.name
        output_audio_stream.codec_context.time_base = audio_stream.time_base
        output_audio_stream.codec_context.extradata = audio_stream.extradata
        output_audio_stream.codec_context.bit_rate = audio_stream.bit_rate
        output_audio_stream.codec_context.sample_rate = audio_stream.sample_rate

    progress = tqdm(unit=" frames")
    with undistorted_container:
        for packet in original_container.demux():
            frames = packet.decode()
            if packet.stream.type == "audio":
                for frame in frames:
                    frame.pts = None
                    packets = output_audio_stream.encode(frame)
                    undistorted_container.mux(packets)
            elif packet.stream.type == "video":
                for frame in frames:
                    # Decode the frame, undistort it with the fetched
                    # intrinsics, and re-encode it with the original timestamp.
                    img = frame.to_ndarray(format="bgr24")
                    undistorted_img = cv2.undistort(img, camera_matrix, dist_coefs)
                    new_frame = frame.from_ndarray(undistorted_img, format="bgr24")
                    new_frame.pts = frame.pts
                    packets = undistorted_video.encode(new_frame)
                    progress.update()
                    undistorted_container.mux(packets)


def undistort_recording(recording_or_video_path: pathlib.Path):
    if (recording_or_video_path / "info.json").exists():
        # A recording folder was passed: undistort every scene video in it.
        rec_path = recording_or_video_path
        video_paths = list(
            itertools.chain(
                rec_path.glob("PI world v1 ps*.mp4"), rec_path.rglob("scene.mp4")
            )
        )
    else:
        # A single scene video file was passed.
        rec_path = recording_or_video_path.parent
        video_paths = [recording_or_video_path]

    info_file = rec_path / "info.json"
    camera_serial = json.load(info_file.open("rb"))["scene_camera_serial_number"]
    camera_matrix, dist_coefs = fetch_intrinsics(camera_serial)

    for video_path in video_paths:
        undistorted_video_path = video_path.with_stem(f"undistorted-{video_path.stem}")
        print(f"undistorting {video_path} to {undistorted_video_path}")
        undistort_video(video_path, undistorted_video_path, camera_matrix, dist_coefs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="undistort video")
    parser.add_argument(
        "recording_or_video_path",
        nargs="?",
        default=".",
        type=pathlib.Path,
        help="Path to a recording folder or to a single scene video file",
    )
    args = parser.parse_args()
    undistort_recording(args.recording_or_video_path)
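

The helpers above can also be used directly from Python, for example to undistort a single exported frame or an individual pixel coordinate with the same intrinsics. A minimal sketch, assuming fetch_intrinsics from the script above is in scope and that the recording folder, frame file, and pixel coordinate are placeholders:

import json
import pathlib

import cv2
import numpy as np

rec_path = pathlib.Path("/path/to/recording")  # placeholder recording folder
info = json.load((rec_path / "info.json").open("rb"))
camera_matrix, dist_coefs = fetch_intrinsics(info["scene_camera_serial_number"])

# Undistort a single scene frame that was exported as an image (placeholder file name).
img = cv2.imread(str(rec_path / "frame.png"))
undistorted_img = cv2.undistort(img, camera_matrix, dist_coefs)

# Undistort a single pixel coordinate; passing P=camera_matrix keeps the
# result in pixel coordinates of the undistorted image.
point = np.array([[[800.0, 600.0]]], dtype=np.float64)  # placeholder pixel
undistorted_point = cv2.undistortPoints(point, camera_matrix, dist_coefs, P=camera_matrix)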