-
-
Save Tadinu/d9469e20df0040c97adca8aca1b766af to your computer and use it in GitHub Desktop.
OmniKit Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Demonstration of using OmniKit to generate a scene, collect groundtruth and visualize | |
the results. This advanced sample also simulates physics and uses a custom glass material | |
""" | |
import os | |
import random | |
import numpy as np | |
from pxr import UsdGeom, Semantics | |
import omni.usd | |
from omni.isaac.synthetic_utils import visualization as vis | |
from omni.isaac.synthetic_utils import camera | |
from omni.isaac.synthetic_utils import OmniKitHelper | |
from omni.isaac.synthetic_utils import SyntheticDataHelper | |
from omni.isaac.synthetic_utils import utils as ut | |
from omni.physx.scripts import utils | |
from pxr import Usd, Gf, Sdf, UsdShade, PhysicsSchema, PhysxSchema, PhysicsSchemaTools | |
from PIL import Image, ImageDraw | |
TRANSLATION_RANGE = 300.0 | |
SCALE = 50.0 | |
# specify a custom config | |
CUSTOM_CONFIG = { | |
"width": 1024, | |
"height": 1024, | |
"renderer": "PathTracing", | |
"samples_per_pixel_per_frame": 128, | |
"max_bounces": 10, | |
"max_specular_transmission_bounces": 6, | |
"max_volume_bounces": 4, | |
"subdiv_refinement_level": 2, | |
"headless": True, | |
"experience": f'{os.environ["EXP_PATH"]}/isaac-sim-python.json', | |
} | |
class CustomKitHelper: | |
stage_event = None | |
sd_helper = None | |
kit = None | |
@classmethod | |
def setup_stage(cls): | |
# Get the current stage | |
# (SNOTE) Stage would be updated AFTER Play | |
current_context = omni.usd.get_context() | |
cls.stage_event = current_context.get_stage_event_stream().create_subscription_to_pop(cls.on_stage_event) | |
stage = current_context.get_stage() | |
assert(stage) | |
print('STAGE', stage) | |
print('- Frames per second:', stage.GetFramesPerSecond()) | |
print('- Interpolation type:', stage.GetInterpolationType()) | |
@classmethod | |
def fetch_physics_scene(cls): | |
stage = cls.kit.get_stage() | |
for prim in stage.Traverse(): | |
if prim.IsA(PhysicsSchema.PhysicsScene): | |
return True | |
print('There is no physics scene in the stage. Please consider adding one if required!') | |
return False | |
@classmethod | |
def create_physics_scene(cls): | |
stage = cls.kit.get_stage() | |
# Add physics scene | |
scene = PhysicsSchema.PhysicsScene.Define(stage, Sdf.Path("/World/physicsScene")) | |
# Set gravity vector | |
scene.CreateGravityAttr().Set(Gf.Vec3f(0, -981.0, 0)) | |
# Set physics scene to use cpu physics | |
PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/World/physicsScene")) | |
physxSceneAPI = PhysxSchema.PhysxSceneAPI.Get(stage, "/World/physicsScene") | |
physxSceneAPI.CreatePhysxSceneEnableCCDAttr(True) | |
physxSceneAPI.CreatePhysxSceneEnableStabilizationAttr(True) | |
physxSceneAPI.CreatePhysxSceneEnableGPUDynamicsAttr(False) | |
physxSceneAPI.CreatePhysxSceneBroadphaseTypeAttr("MBP") | |
physxSceneAPI.CreatePhysxSceneSolverTypeAttr("TGS") | |
@classmethod | |
def wait_for_sim_loading(cls): | |
# Wait until Omni Kit is finished with the rendering, when all materials are loaded | |
while cls.kit.is_loading(): | |
cls.kit.update(1 / 60.0) | |
@classmethod | |
def load_map(cls, in_map_usd_path): | |
print(f'Loading map from stage path [{in_map_usd_path}]...') | |
if not Usd.Stage.IsSupportedFile(in_map_usd_path): | |
raise ValueError("Only USD files can be loaded with this method") | |
assert(os.path.isabs(in_map_usd_path)) | |
usd_context = omni.usd.get_context() | |
usd_context.disable_save_to_recent_files() | |
usd_context.open_stage(in_map_usd_path, cls.on_map_loaded) | |
usd_context.enable_save_to_recent_files() | |
@classmethod | |
def on_map_loaded(cls, result, error): | |
if not result: | |
print('Map has failed being loaded!') | |
return | |
print('Map has been loaded!') | |
# A new map has been loaded -> Re-setup the stage | |
cls.setup_stage() | |
# Fetch/Create Physics Scene | |
if not cls.fetch_physics_scene(): | |
cls.create_physics_scene() | |
# Wait for all materials to be loaded | |
cls.wait_for_sim_loading() | |
# Fast forward sim | |
cls.fast_forward_sim() | |
@classmethod | |
def on_stage_event(cls, in_event): | |
if int(omni.usd.StageEventType.OPENED) == in_event.type: | |
print('STAGE OPENED') | |
# Then do the task | |
cls.do_task() | |
elif int(omni.usd.StageEventType.CLOSED) == in_event.type: | |
print('STAGE CLOSED') | |
elif int(omni.usd.StageEventType.OPEN_FAILED) == in_event.type: | |
print('Failed opening stage!') | |
elif int(omni.usd.StageEventType.ASSETS_LOADED) == in_event.type: | |
print("Stage's assets have been all loaded!") | |
elif int(omni.usd.StageEventType.ASSETS_LOAD_ABORTED) == in_event.type: | |
print("Stage's assets loading has been aborted!") | |
@classmethod | |
def set_active_camera(cls, in_camera_prim_path): | |
# By Editor | |
cls.kit.editor.set_active_camera(in_camera_prim_path) | |
# By Viewport window | |
vpi = omni.kit.viewport.get_viewport_interface() | |
vpi_window = vpi.get_viewport_window() | |
assert(vpi_window) | |
vpi_window.set_active_camera(in_camera_prim_path) | |
@classmethod | |
def spawn_camera(cls, in_camera_name): | |
stage = cls.kit.get_stage() | |
camera = stage.DefinePrim(f'/{in_camera_name}', 'Camera') | |
camera_path = camera.GetPath().pathString | |
print('Newly spawned camera:', camera_path) | |
cls.set_active_camera(camera_path) | |
import time | |
time.sleep(1.0) | |
print(cls.kit.editor.get_active_camera()) | |
return camera | |
@classmethod | |
def fast_forward_sim(cls, in_frame_num = 120): | |
# force RayTracedLighting mode for better performance while simulating physics | |
cls.kit.set_setting("/rtx/rendermode", "RayTracedLighting") | |
# start simulation | |
cls.kit.play() | |
# Step simulation so that objects fall to rest | |
# wait until all materials are loaded | |
frame = 0 | |
print("simulating physics...") | |
while frame < in_frame_num or cls.kit.is_loading(): | |
cls.kit.update(1 / 60.0) | |
frame = frame + 1 | |
print("done") | |
cls.kit.set_setting("/rtx/rendermode", CUSTOM_CONFIG["renderer"]) | |
@classmethod | |
def capture_scene(cls): | |
print('Scene capturing...') | |
# Get groundtruth using glass material | |
ground_truth = cls.sd_helper.get_groundtruth( | |
[ | |
"rgb", | |
"camera", | |
"depth", | |
"boundingBox2DTight", | |
"boundingBox2DLoose", | |
"instanceSegmentation", | |
"semanticSegmentation", | |
"boundingBox3D", | |
] | |
) | |
print('Scene capturing done!') | |
# everything is captured, stop simulating | |
cls.kit.stop() | |
# Save ground truth data locally as png | |
image_data = ut.to_numpy(ground_truth["rgb"]) | |
rgb_img = Image.fromarray(image_data, "RGBA") | |
rgb_img.save('rgb.png') | |
return ground_truth | |
@classmethod | |
def spawn_blocks(cls): | |
stage = cls.kit.get_stage() | |
# Create 10 randomly positioned and coloured spheres and cube | |
# We will assign each a semantic label based on their shape (sphere/cube/cone) | |
prims = [] | |
for i in range(10): | |
prim_type = random.choice(["Cube", "Sphere", "Cylinder", "Cone"]) | |
prim = stage.DefinePrim(f"/World/{prim_type}{i}", prim_type) | |
translation = np.random.rand(3) * TRANSLATION_RANGE | |
UsdGeom.XformCommonAPI(prim).SetTranslate(translation.tolist()) | |
UsdGeom.XformCommonAPI(prim).SetScale((SCALE, SCALE, SCALE)) | |
# prim.GetAttribute("primvars:displayColor").Set([np.random.rand(3).tolist()]) | |
# Add semantic label based on prim type | |
sem = Semantics.SemanticsAPI.Apply(prim, "Semantics") | |
sem.CreateSemanticTypeAttr() | |
sem.CreateSemanticDataAttr() | |
sem.GetSemanticTypeAttr().Set("class") | |
sem.GetSemanticDataAttr().Set(prim_type) | |
# Add physics to prims | |
utils.setRigidBody(prim, "convexHull", False) | |
# Set Mass to 1 kg | |
mass_api = PhysicsSchema.MassAPI.Apply(prim) | |
mass_api.CreateMassAttr(1) | |
# add prim reference to list | |
prims.append(prim) | |
# Apply glass material | |
for prim, i in zip(prims, range(len(prims))): | |
# Create Glass material | |
mtl_created_list = [] | |
if (i%2 == 0): | |
omni.kit.commands.execute( | |
"CreateAndBindMdlMaterialFromLibrary", | |
mdl_name="OmniGlass.mdl", | |
mtl_name="OmniGlass", | |
mtl_created_list=mtl_created_list, | |
) | |
else: | |
# Create a new material using OmniPBR.mdl | |
omni.kit.commands.execute( | |
"CreateAndBindMdlMaterialFromLibrary", | |
mdl_name="OmniGlass_Opacity.mdl", | |
mtl_name="OmniGlass_Opacity", | |
mtl_created_list=mtl_created_list, | |
) | |
mtl_prim = stage.GetPrimAtPath(mtl_created_list[0]) | |
if (i%2 == 0): | |
# Set material inputs, these can be determined by looking at the .mdl file | |
# or by selecting the Shader attached to the Material in the stage window and looking at the details panel | |
color = Gf.Vec3f(random.random(), random.random(), random.random()) | |
omni.kit.usd.create_material_input(mtl_prim, "glass_color", color, Sdf.ValueTypeNames.Color3f) | |
omni.kit.usd.create_material_input(mtl_prim, "glass_ior", 1.25, Sdf.ValueTypeNames.Float) | |
# This value is the volumetric light absorption scale, reduce to zero to make glass clearer | |
omni.kit.usd.create_material_input(mtl_prim, "depth", 0.0005, Sdf.ValueTypeNames.Float) | |
# Enable for thin glass objects if needed | |
omni.kit.usd.create_material_input(mtl_prim, "thin_walled", False, Sdf.ValueTypeNames.Bool) | |
# Bind the material to the prim | |
prim_mat_shade = UsdShade.Material(mtl_prim) | |
UsdShade.MaterialBindingAPI(prim).Bind(prim_mat_shade, UsdShade.Tokens.strongerThanDescendants) | |
@classmethod | |
def do_task(cls): | |
cls.sd_helper = SyntheticDataHelper() | |
# Spawn a new camera and activate it | |
cls.spawn_camera('0_Camera') | |
# Spawn block objects | |
cls.spawn_blocks() | |
# Fast forward sim so that objects fall to rest | |
cls.fast_forward_sim() | |
# Capture the scene with the newly spawned camera | |
cls.capture_scene() | |
def main(): | |
CustomKitHelper.kit = OmniKitHelper(CUSTOM_CONFIG) | |
CustomKitHelper.load_map('ISAAC_ASSETS/Environments/Simple_Warehouse/warehouse.usd') | |
# Fast forward Sim, which runs Kit update for the map loading call back to be triggered | |
CustomKitHelper.wait_for_sim_loading() | |
CustomKitHelper.fast_forward_sim(200) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment