import os
import glob
import numpy as np
import math
import torch
import warp as wp
import warp.sim
import warp.sim.render
import warp.optim
import matplotlib.pyplot as plt
import matplotlib.image as img
from PIL import Image
from diffray import Diffray
os.environ['PYGLET_HEADLESS'] = 'true'
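# Convert Young's modulus E (optimized in log10-space for better conditioning) and
# Poisson's ratio nu into the Lame parameters used by the FEM tetrahedra:
#   k_mu     = E / (2 * (1 + nu))
#   k_lambda = E * nu / ((1 + nu) * (1 - 2 * nu))
# Both inputs are clamped to their bounds to avoid the singularity at nu = 0.5.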
@wp.kernel
def calculate_and_assign_material_params(
log_E: wp.array(dtype=wp.float32),
nu: wp.array(dtype=wp.float32),
tet_materials: wp.array2d(dtype=wp.float32),
param_min_E: float,
param_max_E: float,
param_min_nu: float,
param_max_nu: float,
):
tid = wp.tid()
safe_log_E = wp.clamp(log_E[0], wp.log10(param_min_E), wp.log10(param_max_E))
safe_nu = wp.clamp(nu[0], param_min_nu, param_max_nu) # Avoid numerical instability
k_mu = wp.pow(10.0, safe_log_E) / (2.0 * (1.0 + safe_nu)) # ⚠️ Can be unstable near nu = 0.5
k_lambda = wp.pow(10.0, safe_log_E) * safe_nu / ((1.0 + safe_nu) * (1.0 - 2.0 * safe_nu))
tet_materials[tid, 0] = k_mu
tet_materials[tid, 1] = k_lambda
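# Particle-space loss: sum of squared distances between simulated and target particle
# positions. Defined for state-space supervision but not launched in the image-based
# pipeline below.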
@wp.kernel
def loss_kernel(
particle_q: wp.array(dtype=wp.vec3),
target_q: wp.array(dtype=wp.vec3),
loss: wp.array(dtype=float)
):
tid = wp.tid()
wp.atomic_add(loss, 0, wp.length_sq(particle_q[tid] - target_q[tid]))
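# Image-space loss: per-pixel squared RGB difference between the rendered and target
# frames, accumulated into a single scalar and divided by num_elements so the result
# is a mean over all frames.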
@wp.kernel
def image_loss_kernel(
predicted_frame: wp.array(dtype=wp.vec3), #wp.array3d(dtype=wp.float32),
target_frame: wp.array(dtype=wp.vec3),
loss: wp.array(dtype=float),
num_elements: float
):
tid = wp.tid()
pred_val = predicted_frame[tid]
target_val = target_frame[tid]
diff = pred_val - target_val
wp.atomic_add(loss, 0, (diff[0]*diff[0] + diff[1]*diff[1] + diff[2]*diff[2]) / num_elements)
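# Scales the scalar loss in place; launched inside the tape in step() so the loss
# (and therefore the gradients) are amplified by self.scale.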
@wp.kernel
def multiply_kernel(
loss: wp.array(dtype=float),
scale: float,
):
loss[0] = loss[0] * scale
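# Plain gradient-descent update on a single scalar parameter. Not launched in this
# script; the Adam optimizers configured in Simulation.__init__ are used instead.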
@wp.kernel
def step_kernel(
x: wp.array(dtype=wp.float32),
grad: wp.array(dtype=wp.float32),
learning_rate: float
):
new_x = x[0] - learning_rate * grad[0]
x[0] = new_x
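# Hard clamp of a parameter array to [bound_min, bound_max]; a tanh-based soft
# constraint is left commented out below.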
@wp.kernel
def constraint(
x: wp.array(dtype=wp.float32),
bound_min: float,
bound_max: float
):
tid = wp.tid()
x[tid] = wp.clamp(x[tid], bound_min, bound_max)
# r = bound_max - bound_min
# y_scale = r / 2.0
# x_scale = 2.0 / r
# res = y_scale * wp.tanh(x_scale * x[tid]) + (bound_min + y_scale)
# x[tid] = res
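# Accumulates the constant external force into each particle's force buffer;
# launched once per substep before integration.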
@wp.kernel
def eval_external_particle_forces(
input_forces: wp.array(dtype=wp.vec3),
particle_f: wp.array(dtype=wp.vec3)):
tid = wp.tid()
wp.atomic_add(particle_f, tid, input_forces[tid])
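# Load the rendered target frames from disk into per-frame Warp arrays:
# RGB frames as vec3 pixels in [0, 1], or single-channel float32 for depth.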
def load_frames_to_warp_array(target_output_path, channels=3):
frame_files = sorted(glob.glob(f"{target_output_path}/frames/*.png"))
if not frame_files:
raise ValueError(f"No frames found in {target_output_path}/frames/")
sample_img = Image.open(frame_files[0])
h, w = np.array(sample_img).shape[:2]
frames = []
for i, frame_file in enumerate(frame_files):
if channels == 3:
img = Image.open(frame_file).convert('RGB')
frames.append(wp.array(np.array(img).reshape(-1, 3) / 255.0, dtype=wp.vec3, requires_grad=False))
else:
img = Image.open(frame_file).convert('L')
            frames.append(wp.array(np.array(img).reshape(-1) / 255.0, dtype=wp.float32, requires_grad=False))
return frames
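# Differentiable soft-body experiment: a soft grid fixed on one side is loaded by a
# constant external force, every frame is rendered with Diffray, and Young's modulus
# (and optionally Poisson's ratio) are optimized to match a target image sequence.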
class Simulation:
def __init__(self,
base_path="output",
target_output_path=None,
target_states=None,
isDepth=False,
num_frames=300,
train_E=True,
train_nu=False,
E=1e5,
nu=0.1,
verbose=False,
train_iters=1
):
        self.train = train_E or train_nu
self.base_path = base_path
self.device = wp.get_device()
self.verbose = verbose
self.sim_substeps = 128
self.num_frames = num_frames
fps = 60
sim_duration = self.num_frames / fps
self.frame_dt = 1.0 / fps
self.sim_dt = self.frame_dt / self.sim_substeps
self.sim_time = 0.0
self.render_time = 0.0
self.scale = 100.0
self.force_magnitude = 100.0
self.iter = 0
self.train_rate = 1e-1
self.initial_E = E
self.initial_nu = nu
k_mu = E / (2 * (1 + nu))
k_lambda = (E * nu) / ((1 + nu) * (1 - 2 * nu))
# Differentiable parameters
self.log_E = wp.array([wp.log10(self.initial_E)], dtype=wp.float32, requires_grad=self.train)
self.nu = wp.array([self.initial_nu], dtype=wp.float32, requires_grad=self.train)
self.param_bounds = [
(1e2, 1e7), # E bounds (Pa)
(0.01, 0.49), # nu bounds (Poisson ratio)
(1e-6, 10.0) # k_damp bounds
]
builder = wp.sim.ModelBuilder()
builder.default_particle_radius = 0.01
cell_dim = 4
cell_size = (1.0 / cell_dim)
center = cell_size * cell_dim * 0.5
density = 1.0
builder.add_soft_grid(
pos=wp.vec3(-center, 0.0, -center),
rot=wp.quat_identity(),
vel=wp.vec3(0.0, 0.0, 0.0),
dim_x=cell_dim,
dim_y=cell_dim,
dim_z=cell_dim,
cell_x=cell_size,
cell_y=cell_size,
cell_z=cell_size,
density=density,
fix_left=True,
            # Initial stiffness values don't matter; they are reassigned every forward() pass.
k_mu=k_mu,
k_lambda=k_lambda,
k_damp=0.0,
)
self.model = builder.finalize(requires_grad=self.train)
self.model.tet_materials.requires_grad = self.train
self.model.gravity[1] = 0.0
self.model.ground = False
self.control = self.model.control()
self.integrator = wp.sim.SemiImplicitIntegrator()
self.states = [self.model.state(requires_grad=self.train) for _ in range(self.num_frames * self.sim_substeps + 1)]
self.loss = wp.zeros(1, dtype=float, requires_grad=self.train)
self.train_E = train_E
self.train_nu = train_nu
        self.lr_scheduler = None
        if self.train_E and self.train_nu:
            self.optimizer = wp.optim.Adam([self.log_E.flatten(), self.nu.flatten()], lr=self.train_rate)
        elif self.train_E:
            # self.optimizer = wp.optim.Adam([self.log_E.flatten()], lr=self.train_rate)
            self.optimizer = torch.optim.Adam([{"params": wp.to_torch(self.log_E.flatten()), "lr": self.train_rate}])
            self.lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
                optimizer=self.optimizer, T_max=train_iters
            )
        elif self.train_nu:
            self.optimizer = wp.optim.Adam([self.nu.flatten()], lr=self.train_rate)
        else:
            # Neither parameter is trained (e.g. the target simulation); no optimizer is needed.
            self.optimizer = None
self.output_path, self.frames_path = self._init_output_folders()
self.channels = 1 if isDepth else 3
self._init_renderer()
self.target_frames = None
self.target_states = [self.model.state(requires_grad=self.train) for _ in range(self.num_frames * self.sim_substeps + 1)]
if target_output_path is not None:
self.target_frames = load_frames_to_warp_array(target_output_path, channels=self.channels)
if target_states is not None:
self.target_states = target_states
self.particle_extforce_idx = self.find_top_surface_point()
self.particle_activations = np.zeros((self.model.particle_count, 3))
self.particle_activations[self.particle_extforce_idx] = np.array([0, -self.force_magnitude, 0])
self.particle_activations = wp.from_numpy(self.particle_activations, dtype=wp.vec3)
# TODO: fix error in graph capture. CUDA 700 illegal memory
self.use_cuda_graph = False # self.train and wp.get_device().is_cuda
if self.use_cuda_graph:
with wp.ScopedCapture() as capture:
self.tape = wp.Tape()
with self.tape:
self.forward()
self.tape.backward(self.loss)
self.graph = capture.graph
self.loss_history = []
self.E_history = []
self.nu_history = []
def _init_renderer(self):
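        """Create the Diffray ray-casting renderer and the per-frame prediction buffers."""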
self.width = 1820
self.height = 1024
self.renderer = Diffray(
model=self.model,
scaling=1.0,
camera_pos=(2.0, 2.0, 2.0),
camera_front=(-1.0, -1.0, -1.0),
camera_up=(0.0, 1.0, 0.0),
camera_fov=60.0,
width=self.width,
height=self.height,
num_frames=self.num_frames,
train=self.train
)
self.num_elements = float(self.width * self.height * self.channels * self.num_frames)
self.pred_frames = [wp.zeros(self.height * self.width, dtype=wp.vec3, requires_grad=self.train) for _ in range(self.num_frames)]
def _init_output_folders(self):
"""Create organized output directory structure."""
experiment_dir = f"E{self.initial_E}"
output_path = os.path.join(self.base_path, experiment_dir)
os.makedirs(output_path, exist_ok=True)
frames_path = os.path.join(output_path, "frames")
os.makedirs(frames_path, exist_ok=True)
return output_path, frames_path
def find_top_surface_point(self):
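        """Return the index of the surface particle on the max-x face that is closest to
        the center of the top (max-y) face; the external force is applied at this particle.
        (The max-x face selection supersedes the earlier top-face pick.)"""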
particle_q = self.model.particle_q.numpy()
tri_indices = self.model.tri_indices.numpy()
surface_vertex_indices = np.unique(tri_indices.flatten())
surface_points = particle_q[surface_vertex_indices]
max_y = np.max(surface_points[:, 1]) # Assuming y-axis is the height
top_surface_mask = surface_points[:, 1] == max_y
top_surface_points = surface_points[top_surface_mask]
top_surface_center = np.mean(top_surface_points, axis=0)
distances = np.linalg.norm(top_surface_points - top_surface_center, axis=1)
closest_point_idx = np.argmin(distances)
closest_particle_idx = surface_vertex_indices[top_surface_mask][closest_point_idx]
max_x = np.max(surface_points[:, 0])
right_surface_mask = surface_points[:, 0] == max_x
right_surface_points = surface_points[right_surface_mask]
top_center_distance = np.linalg.norm(right_surface_points - top_surface_center, axis=1)
closest_point_idx = np.argmin(top_center_distance)
closest_particle_idx = surface_vertex_indices[right_surface_mask][closest_point_idx]
return closest_particle_idx
def forward(self):
"""Forward pass of the simulation"""
wp.launch(
calculate_and_assign_material_params,
dim=self.model.tet_materials.shape[0],
inputs=[self.log_E, self.nu, self.model.tet_materials, self.param_bounds[0][0], self.param_bounds[0][1], self.param_bounds[1][0], self.param_bounds[1][1]]
)
self.render_time = 0.0
for frame in range(self.num_frames):
with wp.ScopedTimer("simulate"):
for i in range(self.sim_substeps):
self.states[frame * self.sim_substeps + i].clear_forces()
wp.launch(
kernel=eval_external_particle_forces,
dim=self.model.particle_count,
inputs=[self.particle_activations],
outputs=[self.states[frame * self.sim_substeps + i].particle_f]
)
self.integrator.simulate(
self.model,
self.states[frame * self.sim_substeps + i],
self.states[frame * self.sim_substeps + i + 1],
self.sim_dt
)
self.sim_time += self.sim_dt
with wp.ScopedTimer("render"):
self.renderer.ray_cast(self.states[(frame + 1) * self.sim_substeps].particle_q, self.pred_frames[frame], frame)
if self.train:
with wp.ScopedTimer("loss", active=self.train):
for frame in range(self.num_frames):
wp.launch(
image_loss_kernel,
dim=(self.width * self.height),
inputs=[
self.pred_frames[frame],
self.target_frames[frame],
self.loss,
self.num_elements
]
)
def step(self):
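        """Run one optimization iteration: rebuild the renderer and prediction buffers,
        run the forward pass under a tape, backpropagate the scaled image loss, and
        update the material parameters with the optimizer."""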
self._init_renderer()
with wp.ScopedTimer("step"):
if self.use_cuda_graph:
wp.capture_launch(self.graph)
else:
self.tape = wp.Tape()
with self.tape:
self.forward()
wp.launch(multiply_kernel, dim=1, inputs=[self.loss, self.scale])
self.tape.backward(self.loss)
if self.verbose:
print(f"Iteration {self.iter}: Loss: {self.loss}")
print(f"Current pos: {self.states[-1].particle_q.numpy()[self.particle_extforce_idx]}")
print(f"Before Current E: {self.log_E.numpy()[0]}")
if self.train_E and self.train_nu:
x_E = self.log_E.grad.flatten()
x_nu = self.nu.grad.flatten()
self.optimizer.step([x_E, x_nu])
elif self.train_E:
# x_E = self.log_E.grad.flatten()
# self.optimizer.step([x_E])
self.optimizer.step()
elif self.train_nu:
x_nu = self.nu.grad.flatten()
self.optimizer.step([x_nu])
            if self.lr_scheduler is not None:
                self.lr_scheduler.step()
if self.verbose:
print(f"After Current E: {self.log_E.numpy()[0]}")
# print(f"After Current nu: {self.nu.numpy()[0]}")
print(f"E grad: {self.log_E.grad.numpy()[0]}")
# print(f"nu grad: {self.nu.grad.numpy()[0]}")
self.sim_time = 0.0
self.states[0] = self.model.state(requires_grad=self.train)
self.tape.zero()
# Record values for plotting
self.loss_history.append(float(self.loss.numpy()[0]))
self.E_history.append(float(10**self.log_E.numpy()[0]))
self.nu_history.append(float(self.nu.numpy()[0]))
self.loss.zero_()
self.iter += 1
def render(self):
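        """Write every predicted frame to disk as a PNG via save_frame()."""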
if self.renderer is None:
return
with wp.ScopedTimer("render"):
for frame in range(self.num_frames):
self.save_frame(frame)
self.render_time += self.frame_dt
def save_frame(self, frame_idx):
"""Only call this function outside of gradient tape if needed for visualization"""
output_path = os.path.join(self.frames_path, f"frame_{frame_idx:04d}.png")
image = self.pred_frames[frame_idx].numpy().reshape((self.height, self.width, 3))
img.imsave(output_path, image)
def plot_optimization_progress(self, target_E, target_nu):
"""Plot the optimization progress."""
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(10, 12))
ax1.semilogy(self.loss_history)
ax1.set_xlabel('Iteration')
ax1.set_ylabel('Loss')
ax1.set_title('Loss vs Iteration')
ax1.grid(True)
ax2.plot(self.E_history, label='Current E')
ax2.axhline(y=target_E, color='r', linestyle='--', label='Target E')
ax2.set_yscale('log')
ax2.set_xlabel('Iteration')
ax2.set_ylabel('Young\'s Modulus (Pa)')
ax2.legend()
ax3.plot(self.nu_history, 'g', label='Current ν')
ax3.axhline(y=target_nu, color='r', linestyle='--', label='Target ν')
ax3.set_xlabel('Iteration')
ax3.set_ylabel('Poisson\'s Ratio')
ax3.legend()
plt.tight_layout()
output_file = os.path.join(self.output_path, "optimization_progress.png")
plt.savefig(output_file)
plt.close()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--device", type=str, default=None, help="Override the default Warp device.")
parser.add_argument("--num_frames", type=int, default=20, help="Total number of frames.")
parser.add_argument("--train_iters", type=int, default=1, help="Total number of training iterations.")
parser.add_argument("--verbose", action="store_true", help="Print out additional status messages during execution.")
parser.add_argument("--train_E", action="store_true", default=True, help="Train Young's Modulus.")
parser.add_argument("--train_nu", action="store_true", default=False, help="Train Poisson's Ratio.")
parser.add_argument("--E", type=float, default=1e5, help="Initial Young's Modulus.")
parser.add_argument("--nu", type=float, default=0.1, help="Initial Poisson's Ratio.")
parser.add_argument("--target_E", type=float, default=1e3, help="Target Young's Modulus.")
parser.add_argument("--target_nu", type=float, default=0.1, help="Target Poisson's Ratio.")
parser.add_argument("--isDepth", action="store_true", default=False, help="Whether to render color or depth.")
parser.add_argument(
"--base_path",
type=lambda x: None if x == "None" else str(x),
default="output",
help="Name of the output USD file.",
)
args = parser.parse_known_args()[0]
with wp.ScopedDevice(args.device):
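        # Generate the target image sequence with the ground-truth material parameters,
        # then fit a second simulation's parameters to those rendered frames.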
target_base_path = "target_output"
target_sim = Simulation(base_path=target_base_path, num_frames=args.num_frames,
isDepth=args.isDepth, train_E=False,
train_nu=False, E=args.target_E,
nu=args.target_nu, verbose=args.verbose)
target_sim.forward()
target_sim.render()
target_output_path = "target_output/E1000.0" # target_base_path"
sim = Simulation(base_path=args.base_path, # target_states=target_sim.states
target_output_path=target_output_path, num_frames=args.num_frames,
isDepth=args.isDepth, train_E=args.train_E,
train_nu=args.train_nu, E=args.E, nu=args.nu,
verbose=args.verbose, train_iters=args.train_iters)
for i in range(args.train_iters):
sim.step()
sim.render()
sim.plot_optimization_progress(args.target_E, args.target_nu)