Last active
January 6, 2025 02:33
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import glob | |
import numpy as np | |
import math | |
import torch | |
import warp as wp | |
import warp.sim | |
import warp.sim.render | |
import warp.optim | |
import matplotlib.pyplot as plt | |
import matplotlib.image as img | |
from PIL import Image | |
from diffray import Diffray | |
os.environ['PYGLET_HEADLESS'] = 'true' | |
@wp.kernel | |
def calculate_and_assign_material_params( | |
log_E: wp.array(dtype=wp.float32), | |
nu: wp.array(dtype=wp.float32), | |
tet_materials: wp.array2d(dtype=wp.float32), | |
param_min_E: float, | |
param_max_E: float, | |
param_min_nu: float, | |
param_max_nu: float, | |
): | |
tid = wp.tid() | |
safe_log_E = wp.clamp(log_E[0], wp.log10(param_min_E), wp.log10(param_max_E)) | |
safe_nu = wp.clamp(nu[0], param_min_nu, param_max_nu) # Avoid numerical instability | |
k_mu = wp.pow(10.0, safe_log_E) / (2.0 * (1.0 + safe_nu)) # ⚠️ Can be unstable near nu = 0.5 | |
k_lambda = wp.pow(10.0, safe_log_E) * safe_nu / ((1.0 + safe_nu) * (1.0 - 2.0 * safe_nu)) | |
tet_materials[tid, 0] = k_mu | |
tet_materials[tid, 1] = k_lambda | |
@wp.kernel | |
def loss_kernel( | |
particle_q: wp.array(dtype=wp.vec3), | |
target_q: wp.array(dtype=wp.vec3), | |
loss: wp.array(dtype=float) | |
): | |
tid = wp.tid() | |
wp.atomic_add(loss, 0, wp.length_sq(particle_q[tid] - target_q[tid])) | |
@wp.kernel | |
def image_loss_kernel( | |
predicted_frame: wp.array(dtype=wp.vec3), #wp.array3d(dtype=wp.float32), | |
target_frame: wp.array(dtype=wp.vec3), | |
loss: wp.array(dtype=float), | |
num_elements: float | |
): | |
tid = wp.tid() | |
pred_val = predicted_frame[tid] | |
target_val = target_frame[tid] | |
diff = pred_val - target_val | |
wp.atomic_add(loss, 0, (diff[0]*diff[0] + diff[1]*diff[1] + diff[2]*diff[2]) / num_elements) | |
@wp.kernel | |
def multiply_kernel( | |
loss: wp.array(dtype=float), | |
scale: float, | |
): | |
loss[0] = loss[0] * scale | |
@wp.kernel | |
def step_kernel( | |
x: wp.array(dtype=wp.float32), | |
grad: wp.array(dtype=wp.float32), | |
learning_rate: float | |
): | |
new_x = x[0] - learning_rate * grad[0] | |
x[0] = new_x | |
@wp.kernel | |
def constraint( | |
x: wp.array(dtype=wp.float32), | |
bound_min: float, | |
bound_max: float | |
): | |
tid = wp.tid() | |
x[tid] = wp.clamp(x[tid], bound_min, bound_max) | |
# r = bound_max - bound_min | |
# y_scale = r / 2.0 | |
# x_scale = 2.0 / r | |
# res = y_scale * wp.tanh(x_scale * x[tid]) + (bound_min + y_scale) | |
# x[tid] = res | |
@wp.kernel | |
def eval_external_particle_forces( | |
input_forces: wp.array(dtype=wp.vec3), | |
particle_f: wp.array(dtype=wp.vec3)): | |
tid = wp.tid() | |
wp.atomic_add(particle_f, tid, input_forces[tid]) | |
def load_frames_to_warp_array(target_output_path, channels=3): | |
frame_files = sorted(glob.glob(f"{target_output_path}/frames/*.png")) | |
if not frame_files: | |
raise ValueError(f"No frames found in {target_output_path}/frames/") | |
sample_img = Image.open(frame_files[0]) | |
h, w = np.array(sample_img).shape[:2] | |
frames = [] | |
for i, frame_file in enumerate(frame_files): | |
if channels == 3: | |
img = Image.open(frame_file).convert('RGB') | |
frames.append(wp.array(np.array(img).reshape(-1, 3) / 255.0, dtype=wp.vec3, requires_grad=False)) | |
else: | |
img = Image.open(frame_file).convert('L') | |
frames.append(wp.array(np.array(img).reshape(-1, 1) / 255.0, dtype=wp.float32, requires_grad=False)) | |
return frames | |
class Simulation: | |
def __init__(self, | |
base_path="output", | |
target_output_path=None, | |
target_states=None, | |
isDepth=False, | |
num_frames=300, | |
train_E=True, | |
train_nu=False, | |
E=1e5, | |
nu=0.1, | |
verbose=False, | |
train_iters=1 | |
): | |
self.train = train_E | |
self.base_path = base_path | |
self.device = wp.get_device() | |
self.verbose = verbose | |
self.sim_substeps = 128 | |
self.num_frames = num_frames | |
fps = 60 | |
sim_duration = self.num_frames / fps | |
self.frame_dt = 1.0 / fps | |
self.sim_dt = self.frame_dt / self.sim_substeps | |
self.sim_time = 0.0 | |
self.render_time = 0.0 | |
self.scale = 100.0 | |
self.force_magnitude = 100.0 | |
self.iter = 0 | |
self.train_rate = 1e-1 | |
self.initial_E = E | |
self.initial_nu = nu | |
k_mu = E / (2 * (1 + nu)) | |
k_lambda = (E * nu) / ((1 + nu) * (1 - 2 * nu)) | |
# Differentiable parameters | |
self.log_E = wp.array([wp.log10(self.initial_E)], dtype=wp.float32, requires_grad=self.train) | |
self.nu = wp.array([self.initial_nu], dtype=wp.float32, requires_grad=self.train) | |
self.param_bounds = [ | |
(1e2, 1e7), # E bounds (Pa) | |
(0.01, 0.49), # nu bounds (Poisson ratio) | |
(1e-6, 10.0) # k_damp bounds | |
] | |
builder = wp.sim.ModelBuilder() | |
builder.default_particle_radius = 0.01 | |
cell_dim = 4 | |
cell_size = (1.0 / cell_dim) | |
center = cell_size * cell_dim * 0.5 | |
density = 1.0 | |
builder.add_soft_grid( | |
pos=wp.vec3(-center, 0.0, -center), | |
rot=wp.quat_identity(), | |
vel=wp.vec3(0.0, 0.0, 0.0), | |
dim_x=cell_dim, | |
dim_y=cell_dim, | |
dim_z=cell_dim, | |
cell_x=cell_size, | |
cell_y=cell_size, | |
cell_z=cell_size, | |
density=density, | |
fix_left=True, | |
# doesnt matter what these are because we will assign them later | |
k_mu=k_mu, | |
k_lambda=k_lambda, | |
k_damp=0.0, | |
) | |
self.model = builder.finalize(requires_grad=self.train) | |
self.model.tet_materials.requires_grad = self.train | |
self.model.gravity[1] = 0.0 | |
self.model.ground = False | |
self.control = self.model.control() | |
self.integrator = wp.sim.SemiImplicitIntegrator() | |
self.states = [self.model.state(requires_grad=self.train) for _ in range(self.num_frames * self.sim_substeps + 1)] | |
self.loss = wp.zeros(1, dtype=float, requires_grad=self.train) | |
self.train_E = train_E | |
self.train_nu = train_nu | |
if self.train_E: | |
# self.optimizer = wp.optim.Adam([self.log_E.flatten()], lr=self.train_rate) | |
self.optimizer = torch.optim.Adam([{"params": wp.to_torch(self.log_E.flatten()), "lr": self.train_rate}]) | |
self.lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR( | |
optimizer=self.optimizer, T_max=train_iters | |
) | |
elif self.train_nu: | |
self.optimizer = wp.optim.Adam([self.nu.flatten()], lr=self.train_rate) | |
else: | |
self.optimizer = wp.optim.Adam([self.log_E.flatten(), self.nu.flatten()], lr=self.train_rate) | |
# self.optimizer = torch.optim.Adam([{"params": wp.to_torch(self.log_E), "lr": 1e-1}, {"params": wp.to_torch(self.nu), "lr": 1e-2}]) | |
self.output_path, self.frames_path = self._init_output_folders() | |
self.channels = 1 if isDepth else 3 | |
self._init_renderer() | |
self.target_frames = None | |
self.target_states = [self.model.state(requires_grad=self.train) for _ in range(self.num_frames * self.sim_substeps + 1)] | |
if target_output_path is not None: | |
self.target_frames = load_frames_to_warp_array(target_output_path, channels=self.channels) | |
if target_states is not None: | |
self.target_states = target_states | |
self.particle_extforce_idx = self.find_top_surface_point() | |
self.particle_activations = np.zeros((self.model.particle_count, 3)) | |
self.particle_activations[self.particle_extforce_idx] = np.array([0, -self.force_magnitude, 0]) | |
self.particle_activations = wp.from_numpy(self.particle_activations, dtype=wp.vec3) | |
# TODO: fix error in graph capture. CUDA 700 illegal memory | |
self.use_cuda_graph = False # self.train and wp.get_device().is_cuda | |
if self.use_cuda_graph: | |
with wp.ScopedCapture() as capture: | |
self.tape = wp.Tape() | |
with self.tape: | |
self.forward() | |
self.tape.backward(self.loss) | |
self.graph = capture.graph | |
self.loss_history = [] | |
self.E_history = [] | |
self.nu_history = [] | |
def _init_renderer(self): | |
self.width = 1820 | |
self.height = 1024 | |
self.renderer = Diffray( | |
model=self.model, | |
scaling=1.0, | |
camera_pos=(2.0, 2.0, 2.0), | |
camera_front=(-1.0, -1.0, -1.0), | |
camera_up=(0.0, 1.0, 0.0), | |
camera_fov=60.0, | |
width=self.width, | |
height=self.height, | |
num_frames=self.num_frames, | |
train=self.train | |
) | |
self.num_elements = float(self.width * self.height * self.channels * self.num_frames) | |
self.pred_frames = [wp.zeros(self.height * self.width, dtype=wp.vec3, requires_grad=self.train) for _ in range(self.num_frames)] | |
def _init_output_folders(self): | |
"""Create organized output directory structure.""" | |
experiment_dir = f"E{self.initial_E}" | |
output_path = os.path.join(self.base_path, experiment_dir) | |
os.makedirs(output_path, exist_ok=True) | |
frames_path = os.path.join(output_path, "frames") | |
os.makedirs(frames_path, exist_ok=True) | |
return output_path, frames_path | |
def find_top_surface_point(self): | |
particle_q = self.model.particle_q.numpy() | |
tri_indices = self.model.tri_indices.numpy() | |
surface_vertex_indices = np.unique(tri_indices.flatten()) | |
surface_points = particle_q[surface_vertex_indices] | |
max_y = np.max(surface_points[:, 1]) # Assuming y-axis is the height | |
top_surface_mask = surface_points[:, 1] == max_y | |
top_surface_points = surface_points[top_surface_mask] | |
top_surface_center = np.mean(top_surface_points, axis=0) | |
distances = np.linalg.norm(top_surface_points - top_surface_center, axis=1) | |
closest_point_idx = np.argmin(distances) | |
closest_particle_idx = surface_vertex_indices[top_surface_mask][closest_point_idx] | |
max_x = np.max(surface_points[:, 0]) | |
right_surface_mask = surface_points[:, 0] == max_x | |
right_surface_points = surface_points[right_surface_mask] | |
top_center_distance = np.linalg.norm(right_surface_points - top_surface_center, axis=1) | |
closest_point_idx = np.argmin(top_center_distance) | |
closest_particle_idx = surface_vertex_indices[right_surface_mask][closest_point_idx] | |
return closest_particle_idx | |
def forward(self): | |
"""Forward pass of the simulation""" | |
wp.launch( | |
calculate_and_assign_material_params, | |
dim=self.model.tet_materials.shape[0], | |
inputs=[self.log_E, self.nu, self.model.tet_materials, self.param_bounds[0][0], self.param_bounds[0][1], self.param_bounds[1][0], self.param_bounds[1][1]] | |
) | |
self.render_time = 0.0 | |
for frame in range(self.num_frames): | |
with wp.ScopedTimer("simulate"): | |
for i in range(self.sim_substeps): | |
self.states[frame * self.sim_substeps + i].clear_forces() | |
wp.launch( | |
kernel=eval_external_particle_forces, | |
dim=self.model.particle_count, | |
inputs=[self.particle_activations], | |
outputs=[self.states[frame * self.sim_substeps + i].particle_f] | |
) | |
self.integrator.simulate( | |
self.model, | |
self.states[frame * self.sim_substeps + i], | |
self.states[frame * self.sim_substeps + i + 1], | |
self.sim_dt | |
) | |
self.sim_time += self.sim_dt | |
with wp.ScopedTimer("render"): | |
self.renderer.ray_cast(self.states[(frame + 1) * self.sim_substeps].particle_q, self.pred_frames[frame], frame) | |
if self.train: | |
with wp.ScopedTimer("loss", active=self.train): | |
for frame in range(self.num_frames): | |
wp.launch( | |
image_loss_kernel, | |
dim=(self.width * self.height), | |
inputs=[ | |
self.pred_frames[frame], | |
self.target_frames[frame], | |
self.loss, | |
self.num_elements | |
] | |
) | |
def step(self): | |
self._init_renderer() | |
with wp.ScopedTimer("step"): | |
if self.use_cuda_graph: | |
wp.capture_launch(self.graph) | |
else: | |
self.tape = wp.Tape() | |
with self.tape: | |
self.forward() | |
wp.launch(multiply_kernel, dim=1, inputs=[self.loss, self.scale]) | |
self.tape.backward(self.loss) | |
if self.verbose: | |
print(f"Iteration {self.iter}: Loss: {self.loss}") | |
print(f"Current pos: {self.states[-1].particle_q.numpy()[self.particle_extforce_idx]}") | |
print(f"Before Current E: {self.log_E.numpy()[0]}") | |
if self.train_E and self.train_nu: | |
x_E = self.log_E.grad.flatten() | |
x_nu = self.nu.grad.flatten() | |
self.optimizer.step([x_E, x_nu]) | |
elif self.train_E: | |
# x_E = self.log_E.grad.flatten() | |
# self.optimizer.step([x_E]) | |
self.optimizer.step() | |
elif self.train_nu: | |
x_nu = self.nu.grad.flatten() | |
self.optimizer.step([x_nu]) | |
self.lr_scheduler.step() | |
if self.verbose: | |
print(f"After Current E: {self.log_E.numpy()[0]}") | |
# print(f"After Current nu: {self.nu.numpy()[0]}") | |
print(f"E grad: {self.log_E.grad.numpy()[0]}") | |
# print(f"nu grad: {self.nu.grad.numpy()[0]}") | |
self.sim_time = 0.0 | |
self.states[0] = self.model.state(requires_grad=self.train) | |
self.tape.zero() | |
# Record values for plotting | |
self.loss_history.append(float(self.loss.numpy()[0])) | |
self.E_history.append(float(10**self.log_E.numpy()[0])) | |
self.nu_history.append(float(self.nu.numpy()[0])) | |
self.loss.zero_() | |
self.iter += 1 | |
def render(self): | |
if self.renderer is None: | |
return | |
with wp.ScopedTimer("render"): | |
for frame in range(self.num_frames): | |
self.save_frame(frame) | |
self.render_time += self.frame_dt | |
def save_frame(self, frame_idx): | |
"""Only call this function outside of gradient tape if needed for visualization""" | |
output_path = os.path.join(self.frames_path, f"frame_{frame_idx:04d}.png") | |
image = self.pred_frames[frame_idx].numpy().reshape((self.height, self.width, 3)) | |
img.imsave(output_path, image) | |
def plot_optimization_progress(self, target_E, target_nu): | |
"""Plot the optimization progress.""" | |
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(10, 12)) | |
ax1.semilogy(self.loss_history) | |
ax1.set_xlabel('Iteration') | |
ax1.set_ylabel('Loss') | |
ax1.set_title('Loss vs Iteration') | |
ax1.grid(True) | |
ax2.plot(self.E_history, label='Current E') | |
ax2.axhline(y=target_E, color='r', linestyle='--', label='Target E') | |
ax2.set_yscale('log') | |
ax2.set_xlabel('Iteration') | |
ax2.set_ylabel('Young\'s Modulus (Pa)') | |
ax2.legend() | |
ax3.plot(self.nu_history, 'g', label='Current ν') | |
ax3.axhline(y=target_nu, color='r', linestyle='--', label='Target ν') | |
ax3.set_xlabel('Iteration') | |
ax3.set_ylabel('Poisson\'s Ratio') | |
ax3.legend() | |
plt.tight_layout() | |
output_file = os.path.join(self.output_path, "optimization_progress.png") | |
plt.savefig(output_file) | |
plt.close() | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) | |
parser.add_argument("--device", type=str, default=None, help="Override the default Warp device.") | |
parser.add_argument("--num_frames", type=int, default=20, help="Total number of frames.") | |
parser.add_argument("--train_iters", type=int, default=1, help="Total number of training iterations.") | |
parser.add_argument("--verbose", action="store_true", help="Print out additional status messages during execution.") | |
parser.add_argument("--train_E", action="store_true", default=True, help="Train Young's Modulus.") | |
parser.add_argument("--train_nu", action="store_true", default=False, help="Train Poisson's Ratio.") | |
parser.add_argument("--E", type=float, default=1e5, help="Initial Young's Modulus.") | |
parser.add_argument("--nu", type=float, default=0.1, help="Initial Poisson's Ratio.") | |
parser.add_argument("--target_E", type=float, default=1e3, help="Target Young's Modulus.") | |
parser.add_argument("--target_nu", type=float, default=0.1, help="Target Poisson's Ratio.") | |
parser.add_argument("--isDepth", action="store_true", default=False, help="Whether to render color or depth.") | |
parser.add_argument( | |
"--base_path", | |
type=lambda x: None if x == "None" else str(x), | |
default="output", | |
help="Name of the output USD file.", | |
) | |
args = parser.parse_known_args()[0] | |
with wp.ScopedDevice(args.device): | |
target_base_path = "target_output" | |
target_sim = Simulation(base_path=target_base_path, num_frames=args.num_frames, | |
isDepth=args.isDepth, train_E=False, | |
train_nu=False, E=args.target_E, | |
nu=args.target_nu, verbose=args.verbose) | |
target_sim.forward() | |
target_sim.render() | |
target_output_path = "target_output/E1000.0" # target_base_path" | |
sim = Simulation(base_path=args.base_path, # target_states=target_sim.states | |
target_output_path=target_output_path, num_frames=args.num_frames, | |
isDepth=args.isDepth, train_E=args.train_E, | |
train_nu=args.train_nu, E=args.E, nu=args.nu, | |
verbose=args.verbose, train_iters=args.train_iters) | |
for i in range(args.train_iters): | |
sim.step() | |
sim.render() | |
sim.plot_optimization_progress(args.target_E, args.target_nu) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment