Skip to content

Instantly share code, notes, and snippets.

@Tadinu
Last active January 6, 2021 05:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tadinu/d9469e20df0040c97adca8aca1b766af to your computer and use it in GitHub Desktop.
Save Tadinu/d9469e20df0040c97adca8aca1b766af to your computer and use it in GitHub Desktop.
OmniKit Test
"""Demonstration of using OmniKit to generate a scene, collect groundtruth and visualize
the results. This advanced sample also simulates physics and uses a custom glass material.
"""
import os
import random
import numpy as np
from pxr import UsdGeom, Semantics
import omni.usd
from omni.isaac.synthetic_utils import visualization as vis
from omni.isaac.synthetic_utils import camera
from omni.isaac.synthetic_utils import OmniKitHelper
from omni.isaac.synthetic_utils import SyntheticDataHelper
from omni.isaac.synthetic_utils import utils as ut
from omni.physx.scripts import utils
from pxr import Usd, Gf, Sdf, UsdShade, PhysicsSchema, PhysxSchema, PhysicsSchemaTools
from PIL import Image, ImageDraw
# Multiplier applied to np.random.rand(3) in spawn_blocks(), i.e. each spawned
# prim gets a translation in [0, TRANSLATION_RANGE) on every axis.
TRANSLATION_RANGE = 300.0

# Uniform scale applied on all three axes to every prim spawned by spawn_blocks().
SCALE = 50.0

# Custom configuration passed to OmniKitHelper in main().
CUSTOM_CONFIG = {
    "width": 1024,
    "height": 1024,
    # Also used by fast_forward_sim() to restore "/rtx/rendermode" after the
    # physics warm-up has run in "RayTracedLighting" mode.
    "renderer": "PathTracing",
    "samples_per_pixel_per_frame": 128,
    "max_bounces": 10,
    "max_specular_transmission_bounces": 6,
    "max_volume_bounces": 4,
    "subdiv_refinement_level": 2,
    "headless": True,
    # The EXP_PATH environment variable must be set: the lookup below raises
    # KeyError at import time otherwise.
    "experience": f'{os.environ["EXP_PATH"]}/isaac-sim-python.json',
}
class CustomKitHelper:
    """Class-level helper driving an OmniKitHelper instance for this sample.

    All state is stored on the class and every method is a classmethod, so the
    helper is used without instantiation: assign ``CustomKitHelper.kit`` first,
    then call the methods.
    """

    # Subscription to the stage event stream (assigned in setup_stage)
    stage_event = None
    # SyntheticDataHelper used to fetch ground truth (assigned in do_task)
    sd_helper = None
    # OmniKitHelper instance; must be assigned by the caller before any method is used
    kit = None

    @classmethod
    def setup_stage(cls) -> None:
        """Subscribe to stage events and print basic info about the current stage.

        Raises:
            RuntimeError: If the USD context has no stage.
        """
        # (SNOTE) Stage would be updated AFTER Play
        usd_context = omni.usd.get_context()
        event_stream = usd_context.get_stage_event_stream()
        # Keep the subscription on the class so it stays referenced
        cls.stage_event = event_stream.create_subscription_to_pop(cls.on_stage_event)

        stage = usd_context.get_stage()
        # Explicit raise instead of `assert`, which is stripped under `python -O`
        if not stage:
            raise RuntimeError("The USD context has no stage")
        print('STAGE', stage)
        print('- Frames per second:', stage.GetFramesPerSecond())
        print('- Interpolation type:', stage.GetInterpolationType())

    @classmethod
    def fetch_physics_scene(cls) -> bool:
        """Report whether the kit's stage contains a PhysicsScene prim.

        Returns:
            True if any traversed prim is a ``PhysicsSchema.PhysicsScene``;
            otherwise prints a hint and returns False.
        """
        stage = cls.kit.get_stage()
        if any(prim.IsA(PhysicsSchema.PhysicsScene) for prim in stage.Traverse()):
            return True
        print('There is no physics scene in the stage. Please consider adding one if required!')
        return False

    @classmethod
    def create_physics_scene(cls) -> None:
        """Define a physics scene at /World/physicsScene and configure PhysX on it."""
        stage = cls.kit.get_stage()
        scene_path = "/World/physicsScene"

        # Add physics scene
        scene = PhysicsSchema.PhysicsScene.Define(stage, Sdf.Path(scene_path))
        # Set gravity vector (negative Y; presumably cm/s^2 - confirm stage units)
        scene.CreateGravityAttr().Set(Gf.Vec3f(0, -981.0, 0))

        # Set physics scene to use cpu physics (GPU dynamics is disabled below)
        PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath(scene_path))
        physx_scene_api = PhysxSchema.PhysxSceneAPI.Get(stage, scene_path)
        physx_scene_api.CreatePhysxSceneEnableCCDAttr(True)
        physx_scene_api.CreatePhysxSceneEnableStabilizationAttr(True)
        physx_scene_api.CreatePhysxSceneEnableGPUDynamicsAttr(False)
        physx_scene_api.CreatePhysxSceneBroadphaseTypeAttr("MBP")
        physx_scene_api.CreatePhysxSceneSolverTypeAttr("TGS")

    @classmethod
    def wait_for_sim_loading(cls) -> None:
        """Step the kit at 1/60 per update for as long as it reports loading."""
        # Wait until Omni Kit is finished with the rendering, when all materials are loaded
        while cls.kit.is_loading():
            cls.kit.update(1 / 60.0)

    @classmethod
    def load_map(cls, in_map_usd_path: str) -> None:
        """Ask the USD context to open a stage file; on_map_loaded is the callback.

        Args:
            in_map_usd_path: Absolute path of a USD file.

        Raises:
            ValueError: If the file type is not supported by ``Usd.Stage`` or
                the path is not absolute.
        """
        print(f'Loading map from stage path [{in_map_usd_path}]...')
        if not Usd.Stage.IsSupportedFile(in_map_usd_path):
            raise ValueError("Only USD files can be loaded with this method")
        # Explicit raise instead of `assert`, which is stripped under `python -O`
        if not os.path.isabs(in_map_usd_path):
            raise ValueError(f"Map path must be absolute: {in_map_usd_path}")

        usd_context = omni.usd.get_context()
        # Recent-files tracking is switched off only around the open_stage call
        usd_context.disable_save_to_recent_files()
        usd_context.open_stage(in_map_usd_path, cls.on_map_loaded)
        usd_context.enable_save_to_recent_files()

    @classmethod
    def on_map_loaded(cls, result, error) -> None:
        """Callback given to open_stage: prepare the stage and warm up the sim.

        Args:
            result: Truthy when the map was loaded.
            error: Unused here.
        """
        if not result:
            print('Map has failed being loaded!')
            return
        print('Map has been loaded!')

        # A new map has been loaded -> Re-setup the stage
        cls.setup_stage()

        # Fetch/Create Physics Scene
        if not cls.fetch_physics_scene():
            cls.create_physics_scene()

        # Wait for all materials to be loaded
        cls.wait_for_sim_loading()

        # Fast forward sim
        cls.fast_forward_sim()

    @classmethod
    def on_stage_event(cls, in_event) -> None:
        """Stage event handler: log the event and run do_task() on OPENED."""
        event_type = in_event.type
        if event_type == int(omni.usd.StageEventType.OPENED):
            print('STAGE OPENED')
            # Then do the task
            cls.do_task()
        elif event_type == int(omni.usd.StageEventType.CLOSED):
            print('STAGE CLOSED')
        elif event_type == int(omni.usd.StageEventType.OPEN_FAILED):
            print('Failed opening stage!')
        elif event_type == int(omni.usd.StageEventType.ASSETS_LOADED):
            print("Stage's assets have been all loaded!")
        elif event_type == int(omni.usd.StageEventType.ASSETS_LOAD_ABORTED):
            print("Stage's assets loading has been aborted!")

    @classmethod
    def set_active_camera(cls, in_camera_prim_path) -> None:
        """Make the given camera active in both the editor and the viewport window.

        Raises:
            RuntimeError: If the viewport interface returns no window.
        """
        # By Editor
        cls.kit.editor.set_active_camera(in_camera_prim_path)

        # By Viewport window
        viewport_window = omni.kit.viewport.get_viewport_interface().get_viewport_window()
        # Explicit raise instead of `assert`, which is stripped under `python -O`
        if not viewport_window:
            raise RuntimeError("No viewport window is available")
        viewport_window.set_active_camera(in_camera_prim_path)

    @classmethod
    def spawn_camera(cls, in_camera_name):
        """Define a Camera prim at ``/<in_camera_name>`` and make it active.

        Returns:
            The newly defined camera prim.
        """
        import time  # local import: only needed for the pause below

        stage = cls.kit.get_stage()
        # Named camera_prim so the imported `camera` module is not shadowed
        camera_prim = stage.DefinePrim(f'/{in_camera_name}', 'Camera')
        camera_path = camera_prim.GetPath().pathString
        print('Newly spawned camera:', camera_path)
        cls.set_active_camera(camera_path)

        # NOTE(review): blocking pause kept as-is; presumably meant to let the
        # camera switch settle before it is read back - confirm it is needed.
        time.sleep(1.0)
        print(cls.kit.editor.get_active_camera())
        return camera_prim

    @classmethod
    def fast_forward_sim(cls, in_frame_num: int = 120) -> None:
        """Play the simulation and step it so that objects fall to rest.

        Steps at 1/60 per update for at least ``in_frame_num`` updates and keeps
        stepping while the kit reports loading. The simulation is left playing.

        Args:
            in_frame_num: Minimum number of updates to run.
        """
        # force RayTracedLighting mode for better performance while simulating physics
        cls.kit.set_setting("/rtx/rendermode", "RayTracedLighting")

        # start simulation
        cls.kit.play()

        # Step simulation so that objects fall to rest
        # wait until all materials are loaded
        frame = 0
        print("simulating physics...")
        while frame < in_frame_num or cls.kit.is_loading():
            cls.kit.update(1 / 60.0)
            frame += 1
        print("done")

        # Restore the render mode configured for the final capture
        cls.kit.set_setting("/rtx/rendermode", CUSTOM_CONFIG["renderer"])

    @classmethod
    def capture_scene(cls):
        """Fetch ground truth, stop the simulation and save the RGB image.

        The RGB sensor output is written to ``rgb.png`` in the current working
        directory.

        Returns:
            The ground-truth mapping returned by ``sd_helper.get_groundtruth``.
        """
        sensor_names = [
            "rgb",
            "camera",
            "depth",
            "boundingBox2DTight",
            "boundingBox2DLoose",
            "instanceSegmentation",
            "semanticSegmentation",
            "boundingBox3D",
        ]

        print('Scene capturing...')
        # Get groundtruth using glass material
        ground_truth = cls.sd_helper.get_groundtruth(sensor_names)
        print('Scene capturing done!')

        # everything is captured, stop simulating
        cls.kit.stop()

        # Save ground truth data locally as png
        image_data = ut.to_numpy(ground_truth["rgb"])
        rgb_img = Image.fromarray(image_data, "RGBA")
        rgb_img.save('rgb.png')
        return ground_truth

    @classmethod
    def spawn_blocks(cls) -> None:
        """Spawn 10 random rigid-body prims and bind a glass material to each.

        Each prim is a Cube, Sphere, Cylinder or Cone with a random translation,
        a uniform scale of ``SCALE``, a semantic "class" label equal to its prim
        type and a mass of 1. Even-indexed prims get a randomly coloured
        OmniGlass material, odd-indexed ones an OmniGlass_Opacity material.
        """
        stage = cls.kit.get_stage()

        # Create 10 randomly positioned prims of a random shape.
        # Each gets a semantic label based on its shape (cube/sphere/cylinder/cone)
        prims = []
        for i in range(10):
            prim_type = random.choice(["Cube", "Sphere", "Cylinder", "Cone"])
            prim = stage.DefinePrim(f"/World/{prim_type}{i}", prim_type)

            translation = np.random.rand(3) * TRANSLATION_RANGE
            xform_api = UsdGeom.XformCommonAPI(prim)
            xform_api.SetTranslate(translation.tolist())
            xform_api.SetScale((SCALE, SCALE, SCALE))

            # Add semantic label based on prim type
            sem = Semantics.SemanticsAPI.Apply(prim, "Semantics")
            sem.CreateSemanticTypeAttr()
            sem.CreateSemanticDataAttr()
            sem.GetSemanticTypeAttr().Set("class")
            sem.GetSemanticDataAttr().Set(prim_type)

            # Add physics to prims
            utils.setRigidBody(prim, "convexHull", False)

            # Set Mass to 1 kg
            mass_api = PhysicsSchema.MassAPI.Apply(prim)
            mass_api.CreateMassAttr(1)

            # add prim reference to list
            prims.append(prim)

        # Apply glass material (done after all prims exist, as a second pass)
        for i, prim in enumerate(prims):
            use_plain_glass = i % 2 == 0
            mtl_name = "OmniGlass" if use_plain_glass else "OmniGlass_Opacity"

            # Create the material from the MDL library; the command appends the
            # created material path(s) to mtl_created_list
            mtl_created_list = []
            omni.kit.commands.execute(
                "CreateAndBindMdlMaterialFromLibrary",
                mdl_name=f"{mtl_name}.mdl",
                mtl_name=mtl_name,
                mtl_created_list=mtl_created_list,
            )
            mtl_prim = stage.GetPrimAtPath(mtl_created_list[0])

            if use_plain_glass:
                # Set material inputs, these can be determined by looking at the .mdl file
                # or by selecting the Shader attached to the Material in the stage window
                # and looking at the details panel
                color = Gf.Vec3f(random.random(), random.random(), random.random())
                omni.kit.usd.create_material_input(mtl_prim, "glass_color", color, Sdf.ValueTypeNames.Color3f)
                omni.kit.usd.create_material_input(mtl_prim, "glass_ior", 1.25, Sdf.ValueTypeNames.Float)
                # This value is the volumetric light absorption scale, reduce to zero to make glass clearer
                omni.kit.usd.create_material_input(mtl_prim, "depth", 0.0005, Sdf.ValueTypeNames.Float)
                # Enable for thin glass objects if needed
                omni.kit.usd.create_material_input(mtl_prim, "thin_walled", False, Sdf.ValueTypeNames.Bool)

            # Bind the material to the prim
            prim_mat_shade = UsdShade.Material(mtl_prim)
            UsdShade.MaterialBindingAPI(prim).Bind(prim_mat_shade, UsdShade.Tokens.strongerThanDescendants)

    @classmethod
    def do_task(cls) -> None:
        """Run the sample task: camera, blocks, physics warm-up, then capture."""
        cls.sd_helper = SyntheticDataHelper()

        # Spawn a new camera and activate it
        cls.spawn_camera('0_Camera')

        # Spawn block objects
        cls.spawn_blocks()

        # Fast forward sim so that objects fall to rest
        cls.fast_forward_sim()

        # Capture the scene with the newly spawned camera
        cls.capture_scene()
def main():
    """Create the kit, request the warehouse map and pump updates until done."""
    CustomKitHelper.kit = OmniKitHelper(CUSTOM_CONFIG)

    # NOTE(review): load_map() rejects non-absolute paths, yet this literal is
    # relative; 'ISAAC_ASSETS' looks like a placeholder for the real assets
    # root - confirm and substitute an absolute path.
    warehouse_usd = 'ISAAC_ASSETS/Environments/Simple_Warehouse/warehouse.usd'
    CustomKitHelper.load_map(warehouse_usd)

    # Fast forward Sim, which runs Kit update for the map loading call back to be triggered
    CustomKitHelper.wait_for_sim_loading()
    CustomKitHelper.fast_forward_sim(200)
# Run the sample only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment