#!/usr/bin/env python
# coding: utf-8
# In[1]:
import os
import json
import numpy as np
import cv2
import argparse
from tqdm import tqdm
parser = argparse.ArgumentParser(description='Preprocess poses into instant-ngp format.')
parser.add_argument("--scene", help="path to the scene dataset", required=True)
parser.add_argument("--skip_training_frames", help="select 1 frame every N frames for training, the skipped frames become the test frames", type=int, default=4)
parser.add_argument("--skip_test_frames", help="select 1 frame every N test frames", type=int, default=16)
parser.add_argument("--method", help="The method used to decide block assignment.", type=str, default="linear", choices=['linear', 'kmeans'])
parser.add_argument("--n_training_frames", help="number of frames for training", type=int, default=None)
parser.add_argument("--n_blocks", help="number of blocks", type=int, default=8)
args = parser.parse_args()
# ## Automatic rescale & offset
#
# One thing that makes instant-ngp hard to use is determining the `scale` and `offset` manually.
# We use this script to automatically scale and translate an existing dataset.
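#
# Concretely (see the code below): we translate every pose so that the
# estimated center of attention sits at the origin, then rescale so that the
# average camera distance from the origin is ~4 units, i.e. the "nerf sized"
# convention that instant-ngp's colmap2nerf.py also uses.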
# In[2]:
def closest_point_2_lines(oa, da, ob, db):
    """ Returns point closest to both rays of form o+t*d, and a weight factor
    that goes to 0 if the lines are parallel.
    """
    da = da / np.linalg.norm(da)
    db = db / np.linalg.norm(db)
    c = np.cross(da, db)
    denom = np.linalg.norm(c)**2
    t = ob - oa
    ta = np.linalg.det([t, db, c]) / (denom + 1e-10)
    tb = np.linalg.det([t, da, c]) / (denom + 1e-10)
    if ta > 0:
        ta = 0
    if tb > 0:
        tb = 0
    return (oa+ta*da+ob+tb*db) * 0.5, denom
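# A minimal sanity check of the helper above (illustrative only, not part of
# the pipeline): two orthogonal rays, one along +z through (0, 0, 1) and one
# along +x through (1, 0, 0), are closest to each other at the origin, and
# the returned weight equals |cross(da, db)|^2 = 1.
_p, _w = closest_point_2_lines(
    np.array([0.0, 0.0, 1.0]), np.array([0.0, 0.0, 1.0]),
    np.array([1.0, 0.0, 0.0]), np.array([1.0, 0.0, 0.0]),
)
assert np.allclose(_p, [0.0, 0.0, 0.0], atol=1e-6) and np.isclose(_w, 1.0)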
# In[3]:
SRC_PATH = os.path.join(args.scene, f'./raw/transforms.json')
TRAINING_PATH = os.path.join(args.scene, f'./training_transforms.json')
TEST_PATH = os.path.join(args.scene, f'./test/test_transforms.json')
os.makedirs(os.path.join(args.scene, './test'), exist_ok=True)
with open(SRC_PATH) as f:
    out = json.load(f)
training_frames = []
test_frames = []
for i, f in enumerate(out["frames"]):
    if i % args.skip_training_frames == 0:
        training_frames.append(
            {
                'file_path': f["file_path"],
                'transform_matrix': np.array(f["transform_matrix"]),
            }
        )
    else:
        test_frames.append(
            {
                'file_path': os.path.join('../images', f["file_path"].split('/')[-1]),
                'transform_matrix': np.array(f["transform_matrix"]),
            }
        )
    if args.n_training_frames is not None and len(training_frames) == args.n_training_frames:
        break
# Block assignment
if args.method == 'linear':
    n_frames_per_block = len(training_frames) // args.n_blocks + 1
    # Assign block_id to each frame.
    for idx, frame in enumerate(training_frames):
        frame['block_id'] = idx // n_frames_per_block
    for idx, frame in enumerate(test_frames):
        frame['block_id'] = (idx // (args.skip_training_frames-1)) // n_frames_per_block
elif args.method == 'kmeans':
    from sklearn.cluster import KMeans
    # Get translations.
    ts = []
    for frame in training_frames:
        ts.append(frame['transform_matrix'][:3, -1])
    # Fit KMeans.
    kmeans = KMeans(n_clusters=args.n_blocks, random_state=0).fit(ts)
    # Assign block_id predicted by kmeans to each frame.
    for idx, frame in enumerate(training_frames):
        frame['block_id'] = int(kmeans.labels_[idx])
    for idx, frame in enumerate(test_frames):
        pos = frame['transform_matrix'][:3, -1]
        frame['block_id'] = int(kmeans.predict([pos])[0])
else:
    raise NotImplementedError(f"Unsupported method {args.method}.")
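# Illustrative example of the linear assignment (toy numbers, not from the
# dataset): with 10 training frames and --n_blocks 3, n_frames_per_block is
# 10 // 3 + 1 = 4, so training frames 0-3 fall in block 0, frames 4-7 in
# block 1 and frames 8-9 in block 2; 'kmeans' instead clusters frames by
# their camera positions.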
# In[4]:
# Find a central point they are all looking at.
# If we have more than 1000 frames,
# subsample 1/4 of them to do the task otherwise it's too slow.
print("computing center of attention...")
n_frames = len(training_frames)
if n_frames > 1000:
    print(f"too many frames ({n_frames}), subsampling {n_frames // 4} frames to compute the center of attention ...")
    subsampled_frames = training_frames[::4]
else:
    subsampled_frames = training_frames
totw = 0.0
totp = np.array([0.0, 0.0, 0.0])
for f in tqdm(subsampled_frames):
    mf = f["transform_matrix"][0:3,:]
    for g in subsampled_frames:
        mg = g["transform_matrix"][0:3,:]
        p, w = closest_point_2_lines(mf[:,3], mf[:,2], mg[:,3], mg[:,2])
        if w > 0.01:
            totp += p*w
            totw += w
totp /= totw
print(totp) # the cameras are looking at totp
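# Note (illustrative): the double loop above is O(n^2) in the number of
# frames, so 1000 frames already means ~1e6 calls to closest_point_2_lines;
# that is why the 1/4 subsampling above kicks in for large captures.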
# Normalize both training and test frames and store them.
for split in ['training', 'test']:
    if split == 'training':
        out["frames"] = training_frames
        output_path = TRAINING_PATH
    elif split == 'test':
        out['frames'] = test_frames[::args.skip_test_frames]
        # Sort frames by their block_id
        out['frames'] = sorted(out['frames'], key=lambda x: x['block_id'])
        output_path = TEST_PATH
    out["n_blocks"] = args.n_blocks
    for f in out["frames"]:
        f["transform_matrix"][0:3,3] -= totp
    avglen = 0.
    for f in out["frames"]:
        avglen += np.linalg.norm(f["transform_matrix"][0:3,3])
    nframes = len(out["frames"])
    avglen /= nframes
    print("avg camera distance from origin", avglen)
    for f in out["frames"]:
        f["transform_matrix"][0:3,3] *= 4.0 / avglen # scale to "nerf sized"
    for f in out["frames"]:
        f["transform_matrix"] = f["transform_matrix"].tolist()
    print(nframes, "frames")
    print(f"writing {split} data to {output_path}.")
    with open(output_path, "w") as outfile:
        json.dump(out, outfile, indent=2)
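# Example invocation (the script filename is hypothetical; --scene must
# contain raw/transforms.json):
#   python preprocess_poses.py --scene data/my_scene --method kmeans --n_blocks 8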