# Gist: stuartcrobinson/7999cef92f5415ab3b050a274d9e86db
# All the observations and helper code gutted from isaacgymenvs/tasks/ant.py.
import os

import numpy as np
import torch

from isaacgym import gymapi, gymtorch
from isaacgym.gymtorch import *
from isaacgymenvs.utils.torch_jit_utils import *

from .base.vec_task import VecTask
class Ant(VecTask):
    """Stripped-down Ant locomotion task.

    Observations and most helper code are gutted, leaving a minimal
    environment intended for reward-shaping experiments: the observation is
    a single dummy value and the reward is computed by ``compute_ant_reward``.
    """

    def __init__(self, cfg, sim_device, graphics_device_id, headless):
        """Build the vectorized Ant environment.

        Args:
            cfg: task configuration dict; ``cfg["env"]`` is mutated here to
                pin the observation/action sizes before VecTask reads them.
            sim_device: simulation device string (e.g. ``"cuda:0"``).
            graphics_device_id: GPU id used for rendering.
            headless: if True, no viewer is created.
        """
        self.cfg = cfg
        self.max_episode_length = 100  # required (in vec_task.py)
        self.power_scale = 1.0

        # Fixed sizes: one dummy observation, eight actuated joints.
        self.cfg["env"]["numObservations"] = 1
        self.cfg["env"]["numActions"] = 8

        super().__init__(config=self.cfg, sim_device=sim_device,
                         graphics_device_id=graphics_device_id, headless=headless)

        if self.viewer is not None:  # fixed: was `!= None`
            cam_pos = gymapi.Vec3(50.0, 25.0, 2.4)
            cam_target = gymapi.Vec3(45.0, 25.0, 0.0)
            self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)

        # Get gym GPU state tensors.
        actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        sensor_tensor = self.gym.acquire_force_sensor_tensor(self.sim)

        # One force sensor is created per "foot" body in _create_envs;
        # each sensor reports 6 values (force + torque).
        sensors_per_env = 4
        self.vec_sensor_tensor = gymtorch.wrap_tensor(sensor_tensor).view(
            self.num_envs, sensors_per_env * 6)

        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)

        self.root_states = gymtorch.wrap_tensor(actor_root_state)
        self.initial_root_states = self.root_states.clone()
        self.initial_root_states[:, 7:13] = 0  # set lin_vel and ang_vel to 0

        # Create wrapper tensors for different slices of the DOF state.
        # (fixed: the (num_envs, num_dof, 2) view was computed twice before)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        dof_state_view = self.dof_state.view(self.num_envs, self.num_dof, 2)
        self.dof_pos = dof_state_view[..., 0]
        self.dof_vel = dof_state_view[..., 1]

        # Default pose: zero for every DOF, clamped into [lower, upper]
        # whenever zero lies outside the joint's limit range.
        self.initial_dof_pos = torch.zeros_like(self.dof_pos, device=self.device, dtype=torch.float)
        zero_tensor = torch.tensor([0.0], device=self.device)
        self.initial_dof_pos = torch.where(
            self.dof_limits_lower > zero_tensor, self.dof_limits_lower,
            torch.where(self.dof_limits_upper < zero_tensor,
                        self.dof_limits_upper,
                        self.initial_dof_pos))

    def create_sim(self):
        """Create the physics sim, the ground plane and all env instances."""
        self.up_axis_idx = 2  # index of up axis: Y=1, Z=2
        self.sim = super().create_sim(self.device_id, self.graphics_device_id,
                                      self.physics_engine, self.sim_params)
        self._create_ground_plane()
        print(f'num envs {self.num_envs} env spacing {self.cfg["env"]["envSpacing"]}')
        self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))

    def _create_ground_plane(self):
        """Add a static Z-up ground plane with unit friction."""
        plane_params = gymapi.PlaneParams()
        plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
        plane_params.static_friction = 1.0
        plane_params.dynamic_friction = 1.0
        self.gym.add_ground(self.sim, plane_params)

    def _create_envs(self, num_envs, spacing, num_per_row):
        """Load the ant asset, attach foot force sensors, spawn one actor per env."""
        lower = gymapi.Vec3(-spacing, -spacing, 0.0)
        upper = gymapi.Vec3(spacing, spacing, spacing)

        asset_root = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../../assets')
        asset_file = "mjcf/nv_ant.xml"
        if "asset" in self.cfg["env"]:
            asset_file = self.cfg["env"]["asset"].get("assetFileName", asset_file)

        asset_path = os.path.join(asset_root, asset_file)
        asset_root = os.path.dirname(asset_path)
        asset_file = os.path.basename(asset_path)

        asset_options = gymapi.AssetOptions()
        # Note - DOF mode is set in the MJCF file and loaded by Isaac Gym.
        asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
        asset_options.angular_damping = 0.0
        ant_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
        self.num_dof = self.gym.get_asset_dof_count(ant_asset)
        self.num_bodies = self.gym.get_asset_rigid_body_count(ant_asset)

        # Note - for this asset we are loading the actuator info from the MJCF.
        actuator_props = self.gym.get_asset_actuator_properties(ant_asset)
        motor_efforts = [prop.motor_effort for prop in actuator_props]
        self.joint_gears = to_torch(motor_efforts, device=self.device)

        start_pose = gymapi.Transform()
        start_pose.p = gymapi.Vec3(*get_axis_params(0.44, self.up_axis_idx))
        self.start_rotation = torch.tensor(
            [start_pose.r.x, start_pose.r.y, start_pose.r.z, start_pose.r.w],
            device=self.device)

        self.torso_index = 0
        # (fixed: num_bodies was recomputed here; it is already set above)
        body_names = [self.gym.get_asset_rigid_body_name(ant_asset, i)
                      for i in range(self.num_bodies)]
        extremity_names = [s for s in body_names if "foot" in s]
        self.extremities_index = torch.zeros(len(extremity_names), dtype=torch.long,
                                             device=self.device)

        # Create force sensors attached to the "feet".
        extremity_indices = [self.gym.find_asset_rigid_body_index(ant_asset, name)
                             for name in extremity_names]
        sensor_pose = gymapi.Transform()
        for body_idx in extremity_indices:
            self.gym.create_asset_force_sensor(ant_asset, body_idx, sensor_pose)

        self.ant_handles = []
        self.envs = []
        self.dof_limits_lower = []
        self.dof_limits_upper = []
        for i in range(self.num_envs):
            # Create env instance.
            env_ptr = self.gym.create_env(self.sim, lower, upper, num_per_row)
            ant_handle = self.gym.create_actor(env_ptr, ant_asset, start_pose, "ant", i, 1, 0)
            for j in range(self.num_bodies):
                self.gym.set_rigid_body_color(env_ptr, ant_handle, j, gymapi.MESH_VISUAL,
                                              gymapi.Vec3(0.97, 0.38, 0.06))
            self.envs.append(env_ptr)
            self.ant_handles.append(ant_handle)

        # DOF limits are identical across envs; read them from the last actor.
        dof_prop = self.gym.get_actor_dof_properties(env_ptr, ant_handle)
        for j in range(self.num_dof):
            # Some assets list limits in reverse order; store them sorted.
            if dof_prop['lower'][j] > dof_prop['upper'][j]:
                self.dof_limits_lower.append(dof_prop['upper'][j])
                self.dof_limits_upper.append(dof_prop['lower'][j])
            else:
                self.dof_limits_lower.append(dof_prop['lower'][j])
                self.dof_limits_upper.append(dof_prop['upper'][j])

        self.dof_limits_lower = to_torch(self.dof_limits_lower, device=self.device)
        self.dof_limits_upper = to_torch(self.dof_limits_upper, device=self.device)

        for i in range(len(extremity_names)):
            self.extremities_index[i] = self.gym.find_actor_rigid_body_handle(
                self.envs[0], self.ant_handles[0], extremity_names[i])

    def compute_reward(self):
        """Fill rew_buf / reset_buf via the jitted reward function."""
        self.rew_buf[:], self.reset_buf[:] = compute_ant_reward(
            self.root_states,
            self.dof_pos,
            self.dof_vel,
            self.vec_sensor_tensor,
            self.reset_buf,
            self.progress_buf,
            self.max_episode_length
        )

    def compute_observations(self):
        """Refresh sim state tensors; real observations are gutted (all ones)."""
        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)
        self.gym.refresh_force_sensor_tensor(self.sim)
        self.obs_buf = torch.ones((self.num_envs, self.cfg["env"]["numObservations"]),
                                  device=self.device, dtype=torch.float)

    def reset_idx(self, env_ids):
        """Reset the given envs to the default pose with small random jitter."""
        positions = torch_rand_float(-0.2, 0.2, (len(env_ids), self.num_dof), device=self.device)
        velocities = torch_rand_float(-0.1, 0.1, (len(env_ids), self.num_dof), device=self.device)

        self.dof_pos[env_ids] = tensor_clamp(self.initial_dof_pos[env_ids] + positions,
                                             self.dof_limits_lower, self.dof_limits_upper)
        self.dof_vel[env_ids] = velocities

        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_actor_root_state_tensor_indexed(
            self.sim,
            gymtorch.unwrap_tensor(self.initial_root_states),
            gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
        self.gym.set_dof_state_tensor_indexed(
            self.sim,
            gymtorch.unwrap_tensor(self.dof_state),
            gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))

        self.progress_buf[env_ids] = 0
        self.reset_buf[env_ids] = 0

    def pre_physics_step(self, actions):
        """Convert policy actions into joint torques and apply them."""
        self.actions = actions.clone().to(self.device)
        forces = self.actions * self.joint_gears * self.power_scale
        force_tensor = gymtorch.unwrap_tensor(forces)
        self.gym.set_dof_actuation_force_tensor(self.sim, force_tensor)

    def post_physics_step(self):
        """Advance episode counters, reset finished envs, compute obs/reward."""
        self.progress_buf += 1

        env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
        if len(env_ids) > 0:
            self.reset_idx(env_ids)

        self.compute_observations()
        self.compute_reward()
        self.adjust_camera()

    def adjust_camera(self):
        """Follow env 0's root body with the viewer camera (small env counts only)."""
        if self.viewer is not None and self.num_envs < 50:  # fixed: was `!= None`
            i = 0
            x = self.root_states[:, 0][i].item()
            y = self.root_states[:, 1][i].item()
            z = self.root_states[:, 2][i].item()
            cam_pos = gymapi.Vec3(x - 6, y - 6, 3)
            cam_target = gymapi.Vec3(x, y, z)
            self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
#####################################################################
###=========================jit functions=========================###
#####################################################################
# https://www.arxiv-vanity.com/papers/2108.10470/
@torch.jit.script
def compute_ant_reward(
    root_states,
    dof_pos,
    dof_vel,
    force_sensors,
    reset_buf,
    progress_buf,
    max_episode_length
):
    # type: (Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, int) -> Tuple[Tensor, Tensor]
    """Per-env reward = root z position ("sits up tall"); reset on timeout only.

    ``dof_pos``, ``dof_vel`` and ``force_sensors`` are unused by the current
    reward but kept for interface stability with compute_reward().

    Alternative shaping experiments (previously present as commented-out code):
      * moves slowly for a while: torch.where(dof_vel.abs().sum() > 0.5, ones, ones * 0)
      * stops moving ASAP:        torch.where(dof_vel.abs().sum() < 0.5, ones, ones * 0)
      * squats down low:          -1 * root_states[:, 2]

    Returns:
        (reward, reset): reward is root_states[:, 2] per env; reset is 1 for
        envs whose progress counter reached max_episode_length, otherwise the
        incoming reset_buf value is kept.
    """
    # (fixed: removed dead per-step computation of abs(dof_vel) and its sum,
    # which were only referenced by commented-out experimental rewards)

    # "sits up tall": column 2 of root_states is the root z coordinate.
    reward = root_states[:, 2]

    # Timeout is the only termination condition.
    ones = torch.ones_like(reset_buf)
    reset = torch.where(progress_buf >= max_episode_length, ones, reset_buf)
    return reward, reset