Source code for isaaclab.scene.interactive_scene

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from isaaclab_physx.assets import DeformableObject, SurfaceGripper

import torch
import warp as wp

from pxr import Sdf

import isaaclab.sim as sim_utils
from isaaclab import cloner
from isaaclab.assets import (
    Articulation,
    ArticulationCfg,
    AssetBaseCfg,
    RigidObject,
    RigidObjectCfg,
    RigidObjectCollection,
    RigidObjectCollectionCfg,
)
from isaaclab.physics.scene_data_requirements import resolve_scene_data_requirements
from isaaclab.sensors import ContactSensorCfg, FrameTransformerCfg, SensorBase, SensorBaseCfg
from isaaclab.sim import SimulationContext
from isaaclab.sim.utils.stage import get_current_stage, get_current_stage_id
from isaaclab.sim.views import XformPrimView
from isaaclab.terrains import TerrainImporter, TerrainImporterCfg

# Note: This is a temporary import for the VisuoTactileSensorCfg class.
# It will be removed once the VisuoTactileSensor class is added to the core Isaac Lab framework.
from isaaclab_contrib.sensors.tacsl_sensor import VisuoTactileSensorCfg

from .interactive_scene_cfg import InteractiveSceneCfg

# import logger
logger = logging.getLogger(__name__)


[docs] class InteractiveScene: """A scene that contains entities added to the simulation. The interactive scene parses the :class:`InteractiveSceneCfg` class to create the scene. Based on the specified number of environments, it clones the entities and groups them into different categories (e.g., articulations, sensors, etc.). Cloning can be performed in two ways: * For tasks where all environments contain the same assets, a more performant cloning paradigm can be used to allow for faster environment creation. This is specified by the ``replicate_physics`` flag. .. code-block:: python scene = InteractiveScene(cfg=InteractiveSceneCfg(replicate_physics=True)) * For tasks that require having separate assets in the environments, ``replicate_physics`` would have to be set to False, which will add some costs to the overall startup time. .. code-block:: python scene = InteractiveScene(cfg=InteractiveSceneCfg(replicate_physics=False)) Each entity is registered to scene based on its name in the configuration class. For example, if the user specifies a robot in the configuration class as follows: .. code-block:: python from isaaclab.scene import InteractiveSceneCfg from isaaclab.utils import configclass from isaaclab_assets.robots.anymal import ANYMAL_C_CFG @configclass class MySceneCfg(InteractiveSceneCfg): # ANYmal-C robot spawned in each environment robot = ANYMAL_C_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot") Then the robot can be accessed from the scene as follows: .. code-block:: python from isaaclab.scene import InteractiveScene # create 128 environments scene = InteractiveScene(cfg=MySceneCfg(num_envs=128)) # access the robot from the scene robot = scene["robot"] # access the robot based on its type robot = scene.articulations["robot"] If the :class:`InteractiveSceneCfg` class does not include asset entities, the cloning process can still be triggered if assets were added to the stage outside of the :class:`InteractiveScene` class: .. code-block:: python scene = InteractiveScene(cfg=InteractiveSceneCfg(num_envs=128, replicate_physics=True)) scene.clone_environments() .. note:: It is important to note that the scene only performs common operations on the entities. For example, resetting the internal buffers, writing the buffers to the simulation and updating the buffers from the simulation. The scene does not perform any task specific to the entity. For example, it does not apply actions to the robot or compute observations from the robot. These tasks are handled by different modules called "managers" in the framework. Please refer to the :mod:`isaaclab.managers` sub-package for more details. """
[docs] def __init__(self, cfg: InteractiveSceneCfg): """Initializes the scene. Args: cfg: The configuration class for the scene. """ # check that the config is valid cfg.validate() # store inputs self.cfg = cfg # initialize scene elements self._terrain = None self._articulations = dict() self._deformable_objects = dict() self._rigid_objects = dict() self._rigid_object_collections = dict() self._sensors = dict() self._surface_grippers = dict() self._extras = dict() # get stage handle self.sim = SimulationContext.instance() self.stage = get_current_stage() self.stage_id = get_current_stage_id() self.sim.clear_scene_data_visualizer_prebuilt_artifact() self.physics_backend = self.sim.physics_manager.__name__.lower() visualizer_clone_fn = None requested_viz_types = set(self.sim.resolve_visualizer_types()) if "physx" in self.physics_backend: from isaaclab_physx.cloner import physx_replicate physics_clone_fn = physx_replicate elif "newton" in self.physics_backend: from isaaclab_newton.cloner import newton_physics_replicate physics_clone_fn = newton_physics_replicate else: raise ValueError(f"Unsupported physics backend: {self.physics_backend}") # physics scene path self._physics_scene_path = None # prepare cloner for environment replication self.env_prim_paths = [f"{self.env_ns}/env_{i}" for i in range(self.cfg.num_envs)] self.cloner_cfg = cloner.TemplateCloneCfg( clone_regex=self.env_regex_ns, clone_in_fabric=self.cfg.clone_in_fabric, device=self.device, physics_clone_fn=physics_clone_fn, visualizer_clone_fn=None, ) # create source prim self.stage.DefinePrim(self.env_prim_paths[0], "Xform") self.stage.DefinePrim(self.cloner_cfg.template_root, "Xform") self.env_fmt = self.env_regex_ns.replace(".*", "{}") # allocate env indices self._ALL_INDICES = torch.arange(self.cfg.num_envs, dtype=torch.long, device=self.device) self._default_env_origins, _ = cloner.grid_transforms(self.num_envs, self.cfg.env_spacing, device=self.device) # copy empty prim of env_0 to env_1, env_2, ..., env_{num_envs-1} with correct location. cloner.usd_replicate( self.stage, [self.env_fmt.format(0)], [self.env_fmt], self._ALL_INDICES, positions=self._default_env_origins ) self._global_prim_paths = list() has_scene_cfg_entities = self._is_scene_setup_from_cfg() if has_scene_cfg_entities: self._add_entities_from_cfg() requirements = resolve_scene_data_requirements( visualizer_types=requested_viz_types, renderer_types=self._sensor_renderer_types(), ) self.sim.update_scene_data_requirements(requirements) visualizer_clone_fn = cloner.resolve_visualizer_clone_fn( physics_backend=self.physics_backend, requirements=requirements, stage=self.stage, set_visualizer_artifact=self.sim.set_scene_data_visualizer_prebuilt_artifact, ) if visualizer_clone_fn is not None: logger.debug( "Enabling visualizer artifact prebuild for clone path " "(backend=%s, requires_newton_model=%s, requires_usd_stage=%s).", self.physics_backend, requirements.requires_newton_model, requirements.requires_usd_stage, ) self.cloner_cfg.visualizer_clone_fn = visualizer_clone_fn if has_scene_cfg_entities: self.clone_environments(copy_from_source=(not self.cfg.replicate_physics)) # Collision filtering is PhysX-specific (PhysxSchema.PhysxSceneAPI) if self.cfg.filter_collisions and "physx" in self.physics_backend: self.filter_collisions(self._global_prim_paths)
[docs] def clone_environments(self, copy_from_source: bool = False): """Creates clones of the environment ``/World/envs/env_0``. Args: copy_from_source: (bool): If set to False, clones inherit from /World/envs/env_0 and mirror its changes. If True, clones are independent copies of the source prim and won't reflect its changes (start-up time may increase). Defaults to False. """ # PhysX-only: set env id bit count for replicated physics. Newton handles env separation in its own API. if self.cfg.replicate_physics and "physx" in self.physics_backend: prim = self.stage.GetPrimAtPath("/physicsScene") prim.CreateAttribute("physxScene:envIdInBoundsBitCount", Sdf.ValueTypeNames.Int).Set(4) if self._is_scene_setup_from_cfg(): self.cloner_cfg.clone_physics = not copy_from_source cloner.clone_from_template(self.stage, num_clones=self.num_envs, template_clone_cfg=self.cloner_cfg) else: mapping = torch.ones((1, self.num_envs), device=self.device, dtype=torch.bool) replicate_args = ( [self.env_fmt.format(0)], [self.env_fmt], self._ALL_INDICES, mapping, self._default_env_origins, ) if not copy_from_source: # skip physx cloning, this means physx will walk and parse the stage one by one faithfully self.cloner_cfg.physics_clone_fn(self.stage, *replicate_args, device=self.cloner_cfg.device) if self.cloner_cfg.visualizer_clone_fn is not None: self.cloner_cfg.visualizer_clone_fn(self.stage, *replicate_args, device=self.cloner_cfg.device) cloner.usd_replicate(self.stage, *replicate_args)
def _sensor_renderer_types(self) -> list[str]: """Return renderer type names used by scene sensors.""" renderer_types: list[str] = [] for sensor in self._sensors.values(): sensor_cfg = getattr(sensor, "cfg", None) renderer_cfg = getattr(sensor_cfg, "renderer_cfg", None) if renderer_cfg is None: continue renderer_type = getattr(renderer_cfg, "renderer_type", "default") renderer_types.append(renderer_type) return renderer_types
[docs] def filter_collisions(self, global_prim_paths: list[str] | None = None): """Filter environments collisions. Disables collisions between the environments in ``/World/envs/env_.*`` and enables collisions with the prims in global prim paths (e.g. ground plane). Args: global_prim_paths: A list of global prim paths to enable collisions with. Defaults to None, in which case no global prim paths are considered. """ # validate paths in global prim paths if global_prim_paths is None: global_prim_paths = [] else: # remove duplicates in paths global_prim_paths = list(set(global_prim_paths)) # if "/World/collisions" already exists in the stage, we don't filter again if self.stage.GetPrimAtPath("/World/collisions"): return # set global prim paths list if not previously defined if len(self._global_prim_paths) < 1: self._global_prim_paths += global_prim_paths # filter collisions within each environment instance cloner.filter_collisions( self.stage, self.physics_scene_path, "/World/collisions", self.env_prim_paths, global_paths=self._global_prim_paths, )
def __str__(self) -> str: """Returns a string representation of the scene.""" msg = f"<class {self.__class__.__name__}>\n" msg += f"\tNumber of environments: {self.cfg.num_envs}\n" msg += f"\tEnvironment spacing : {self.cfg.env_spacing}\n" msg += f"\tSource prim name : {self.env_prim_paths[0]}\n" msg += f"\tGlobal prim paths : {self._global_prim_paths}\n" msg += f"\tReplicate physics : {self.cfg.replicate_physics}" return msg """ Properties. """ @property def physics_scene_path(self) -> str: """The path to the USD Physics Scene.""" if self._physics_scene_path is None: for prim in self.stage.Traverse(): if "PhysxSceneAPI" in prim.GetAppliedSchemas(): self._physics_scene_path = prim.GetPrimPath().pathString logger.info(f"Physics scene prim path: {self._physics_scene_path}") break if self._physics_scene_path is None: raise RuntimeError("No physics scene found! Please make sure one exists.") return self._physics_scene_path @property def physics_dt(self) -> float: """The physics timestep of the scene.""" return sim_utils.SimulationContext.instance().get_physics_dt() # pyright: ignore [reportOptionalMemberAccess] @property def device(self) -> str: """The device on which the scene is created.""" return sim_utils.SimulationContext.instance().device # pyright: ignore [reportOptionalMemberAccess] @property def env_ns(self) -> str: """The namespace ``/World/envs`` in which all environments created. The environments are present w.r.t. this namespace under "env_{N}" prim, where N is a natural number. """ return "/World/envs" @property def env_regex_ns(self) -> str: """The namespace ``/World/envs/env_.*`` in which all environments created.""" return f"{self.env_ns}/env_.*" @property def num_envs(self) -> int: """The number of environments handled by the scene.""" return self.cfg.num_envs @property def env_origins(self) -> torch.Tensor: """The origins of the environments in the scene. Shape is (num_envs, 3).""" if self._terrain is not None: return self._terrain.env_origins else: return self._default_env_origins @property def terrain(self) -> TerrainImporter | None: """The terrain in the scene. If None, then the scene has no terrain. Note: We treat terrain separate from :attr:`extras` since terrains define environment origins and are handled differently from other miscellaneous entities. """ return self._terrain @property def articulations(self) -> dict[str, Articulation]: """A dictionary of articulations in the scene.""" return self._articulations @property def deformable_objects(self) -> dict[str, DeformableObject]: """A dictionary of deformable objects in the scene.""" return self._deformable_objects @property def rigid_objects(self) -> dict[str, RigidObject]: """A dictionary of rigid objects in the scene.""" return self._rigid_objects @property def rigid_object_collections(self) -> dict[str, RigidObjectCollection]: """A dictionary of rigid object collections in the scene.""" return self._rigid_object_collections @property def sensors(self) -> dict[str, SensorBase]: """A dictionary of the sensors in the scene, such as cameras and contact reporters.""" return self._sensors @property def surface_grippers(self) -> dict[str, SurfaceGripper]: """A dictionary of the surface grippers in the scene.""" return self._surface_grippers @property def extras(self) -> dict[str, XformPrimView]: """A dictionary of miscellaneous simulation objects that neither inherit from assets nor sensors. The keys are the names of the miscellaneous objects, and the values are the :class:`~isaaclab.sim.views.XformPrimView` instances of the corresponding prims. As an example, lights or other props in the scene that do not have any attributes or properties that you want to alter at runtime can be added to this dictionary. Note: These are not reset or updated by the scene. They are mainly other prims that are not necessarily handled by the interactive scene, but are useful to be accessed by the user. """ return self._extras @property def state(self) -> dict[str, dict[str, dict[str, torch.Tensor]]]: """A dictionary of the state of the scene entities in the simulation world frame. Please refer to :meth:`get_state` for the format. """ return self.get_state(is_relative=False) """ Operations. """
[docs] def reset(self, env_ids: Sequence[int] | None = None): """Resets the scene entities. Args: env_ids: The indices of the environments to reset. Defaults to None (all instances). """ # -- assets for articulation in self._articulations.values(): articulation.reset(env_ids) for deformable_object in self._deformable_objects.values(): deformable_object.reset(env_ids) for rigid_object in self._rigid_objects.values(): rigid_object.reset(env_ids) for surface_gripper in self._surface_grippers.values(): surface_gripper.reset(env_ids) for rigid_object_collection in self._rigid_object_collections.values(): rigid_object_collection.reset(env_ids) # -- sensors for sensor in self._sensors.values(): sensor.reset(env_ids)
[docs] def write_data_to_sim(self): """Writes the data of the scene entities to the simulation.""" # -- assets for articulation in self._articulations.values(): articulation.write_data_to_sim() for deformable_object in self._deformable_objects.values(): deformable_object.write_data_to_sim() for rigid_object in self._rigid_objects.values(): rigid_object.write_data_to_sim() for surface_gripper in self._surface_grippers.values(): surface_gripper.write_data_to_sim() for rigid_object_collection in self._rigid_object_collections.values(): rigid_object_collection.write_data_to_sim()
[docs] def update(self, dt: float) -> None: """Update the scene entities. Args: dt: The amount of time passed from last :meth:`update` call. """ # -- assets for articulation in self._articulations.values(): articulation.update(dt) for deformable_object in self._deformable_objects.values(): deformable_object.update(dt) for rigid_object in self._rigid_objects.values(): rigid_object.update(dt) for rigid_object_collection in self._rigid_object_collections.values(): rigid_object_collection.update(dt) for surface_gripper in self._surface_grippers.values(): surface_gripper.update(dt) # -- sensors for sensor in self._sensors.values(): sensor.update(dt, force_recompute=not self.cfg.lazy_sensor_update)
""" Operations: Scene State. """
[docs] def reset_to( self, state: dict[str, dict[str, dict[str, torch.Tensor]]], env_ids: Sequence[int] | None = None, is_relative: bool = False, ): """Resets the entities in the scene to the provided state. Args: state: The state to reset the scene entities to. Please refer to :meth:`get_state` for the format. env_ids: The indices of the environments to reset. Defaults to None, in which case all environment instances are reset. is_relative: If set to True, the state is considered relative to the environment origins. Defaults to False. """ # resolve env_ids if env_ids is None: env_ids = self._ALL_INDICES # articulations for asset_name, articulation in self._articulations.items(): asset_state = state["articulation"][asset_name] # root state root_pose = asset_state["root_pose"].clone().to(self.device) if is_relative: root_pose[:, :3] += self.env_origins[env_ids] root_velocity = asset_state["root_velocity"].clone().to(self.device) articulation.write_root_pose_to_sim_index(root_pose=root_pose, env_ids=env_ids) articulation.write_root_velocity_to_sim_index(root_velocity=root_velocity, env_ids=env_ids) # joint state joint_position = asset_state["joint_position"].clone().to(self.device) joint_velocity = asset_state["joint_velocity"].clone().to(self.device) articulation.write_joint_position_to_sim_index(position=joint_position, env_ids=env_ids) articulation.write_joint_velocity_to_sim_index(velocity=joint_velocity, env_ids=env_ids) # FIXME: This is not generic as it assumes PD control over the joints. # This assumption does not hold for effort controlled joints. articulation.set_joint_position_target_index(target=joint_position, env_ids=env_ids) articulation.set_joint_velocity_target_index(target=joint_velocity, env_ids=env_ids) # deformable objects for asset_name, deformable_object in self._deformable_objects.items(): asset_state = state["deformable_object"][asset_name] nodal_position = asset_state["nodal_position"].clone().to(self.device) if is_relative: nodal_position[:, :3] += self.env_origins[env_ids] nodal_velocity = asset_state["nodal_velocity"].clone().to(self.device) deformable_object.write_nodal_pos_to_sim(nodal_position, env_ids=env_ids) deformable_object.write_nodal_velocity_to_sim(nodal_velocity, env_ids=env_ids) # rigid objects for asset_name, rigid_object in self._rigid_objects.items(): asset_state = state["rigid_object"][asset_name] root_pose = asset_state["root_pose"].clone().to(self.device) if is_relative: root_pose[:, :3] += self.env_origins[env_ids] root_velocity = asset_state["root_velocity"].clone().to(self.device) rigid_object.write_root_pose_to_sim_index(root_pose=root_pose, env_ids=env_ids) rigid_object.write_root_velocity_to_sim_index(root_velocity=root_velocity, env_ids=env_ids) # surface grippers for asset_name, surface_gripper in self._surface_grippers.items(): asset_state = state["gripper"][asset_name] surface_gripper.set_grippers_command(asset_state) # write data to simulation to make sure initial state is set # this propagates the joint targets to the simulation self.write_data_to_sim()
[docs] def get_state(self, is_relative: bool = False) -> dict[str, dict[str, dict[str, torch.Tensor]]]: """Returns the state of the scene entities. Based on the type of the entity, the state comprises of different components. * For an articulation, the state comprises of the root pose, root velocity, and joint position and velocity. * For a deformable object, the state comprises of the nodal position and velocity. * For a rigid object, the state comprises of the root pose and root velocity. The returned state is a dictionary with the following format: .. code-block:: python { "articulation": { "entity_1_name": { "root_pose": torch.Tensor, "root_velocity": torch.Tensor, "joint_position": torch.Tensor, "joint_velocity": torch.Tensor, }, "entity_2_name": { "root_pose": torch.Tensor, "root_velocity": torch.Tensor, "joint_position": torch.Tensor, "joint_velocity": torch.Tensor, }, }, "deformable_object": { "entity_3_name": { "nodal_position": torch.Tensor, "nodal_velocity": torch.Tensor, } }, "rigid_object": { "entity_4_name": { "root_pose": torch.Tensor, "root_velocity": torch.Tensor, } }, } where ``entity_N_name`` is the name of the entity registered in the scene. Args: is_relative: If set to True, the state is considered relative to the environment origins. Defaults to False. Returns: A dictionary of the state of the scene entities. """ state = dict() # articulations state["articulation"] = dict() for asset_name, articulation in self._articulations.items(): asset_state = dict() asset_state["root_pose"] = wp.to_torch(articulation.data.root_pose_w).clone() if is_relative: asset_state["root_pose"][:, :3] -= self.env_origins asset_state["root_velocity"] = wp.to_torch(articulation.data.root_vel_w).clone() asset_state["joint_position"] = wp.to_torch(articulation.data.joint_pos).clone() asset_state["joint_velocity"] = wp.to_torch(articulation.data.joint_vel).clone() state["articulation"][asset_name] = asset_state # deformable objects state["deformable_object"] = dict() for asset_name, deformable_object in self._deformable_objects.items(): asset_state = dict() asset_state["nodal_position"] = wp.to_torch(deformable_object.data.nodal_pos_w).clone() if is_relative: asset_state["nodal_position"][:, :3] -= self.env_origins asset_state["nodal_velocity"] = wp.to_torch(deformable_object.data.nodal_vel_w).clone() state["deformable_object"][asset_name] = asset_state # rigid objects state["rigid_object"] = dict() for asset_name, rigid_object in self._rigid_objects.items(): asset_state = dict() asset_state["root_pose"] = wp.to_torch(rigid_object.data.root_pose_w).clone() if is_relative: asset_state["root_pose"][:, :3] -= self.env_origins asset_state["root_velocity"] = wp.to_torch(rigid_object.data.root_vel_w).clone() state["rigid_object"][asset_name] = asset_state # surface grippers state["gripper"] = dict() for asset_name, gripper in self._surface_grippers.items(): state["gripper"][asset_name] = wp.to_torch(gripper.state).clone() return state
""" Operations: Iteration. """
[docs] def keys(self) -> list[str]: """Returns the keys of the scene entities. Returns: The keys of the scene entities. """ all_keys = ["terrain"] for asset_family in [ self._articulations, self._deformable_objects, self._rigid_objects, self._rigid_object_collections, self._sensors, self._surface_grippers, self._extras, ]: all_keys += list(asset_family.keys()) return all_keys
def __getitem__(self, key: str) -> Any: """Returns the scene entity with the given key. Args: key: The key of the scene entity. Returns: The scene entity. """ # check if it is a terrain if key == "terrain": return self._terrain all_keys = ["terrain"] # check if it is in other dictionaries for asset_family in [ self._articulations, self._deformable_objects, self._rigid_objects, self._rigid_object_collections, self._sensors, self._surface_grippers, self._extras, ]: out = asset_family.get(key) # if found, return if out is not None: return out all_keys += list(asset_family.keys()) # if not found, raise error raise KeyError(f"Scene entity with key '{key}' not found. Available Entities: '{all_keys}'") """ Internal methods. """ def _is_scene_setup_from_cfg(self) -> bool: """Check if scene entities are setup from the config or not. Returns: True if scene entities are setup from the config, False otherwise. """ return any( not (asset_name in InteractiveSceneCfg.__dataclass_fields__ or asset_cfg is None) for asset_name, asset_cfg in self.cfg.__dict__.items() ) def _add_entities_from_cfg(self): # noqa: C901 """Add scene entities from the config.""" from isaaclab_physx.assets import DeformableObjectCfg, SurfaceGripperCfg # noqa: PLC0415 # store paths that are in global collision filter self._global_prim_paths = list() # Process non-sensor entities before sensors so that asset prims exist in the template # when sensors (e.g. cameras attached to robot links) need to spawn under them. all_items = [ (k, v) for k, v in self.cfg.__dict__.items() if k not in InteractiveSceneCfg.__dataclass_fields__ and v is not None ] ordered_items = [(k, v) for k, v in all_items if not isinstance(v, SensorBaseCfg)] + [ (k, v) for k, v in all_items if isinstance(v, SensorBaseCfg) ] for asset_name, asset_cfg in ordered_items: # Resolve old-style preset wrappers: configclass with a ``presets`` dict and a ``'default'`` key. # These are multi-backend selector objects (e.g. VelocityEnvContactSensorCfg) that hold several # alternative asset configs in a dict and are not themselves asset configs. if hasattr(asset_cfg, "presets") and isinstance(asset_cfg.presets, dict) and "default" in asset_cfg.presets: asset_cfg = asset_cfg.presets["default"] setattr(self.cfg, asset_name, asset_cfg) # resolve prim_path with env regex if hasattr(asset_cfg, "prim_path"): asset_cfg.prim_path = asset_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns) # set spawn_path on spawner if cloning is needed if hasattr(asset_cfg, "spawn") and asset_cfg.spawn is not None: if hasattr(asset_cfg, "prim_path") and self.env_ns in asset_cfg.prim_path: template_base = asset_cfg.prim_path.replace(self.env_regex_ns, self.cloner_cfg.template_root) proto_id = self.cloner_cfg.template_prototype_identifier if isinstance(asset_cfg, SensorBaseCfg): # Sensor may be nested under a proto_asset_N prim (e.g. a camera on a robot # link). Search for the actual template location so spawning succeeds even # though the parent asset lives at template_root/<Asset>/proto_asset_0/... asset_cfg.spawn.spawn_path = self._resolve_sensor_template_spawn_path(template_base, proto_id) else: asset_cfg.spawn.spawn_path = f"{template_base}/{proto_id}_.*" else: # No cloning - spawn directly at prim_path asset_cfg.spawn.spawn_path = asset_cfg.prim_path # create asset if isinstance(asset_cfg, TerrainImporterCfg): # terrains are special entities since they define environment origins asset_cfg.num_envs = self.cfg.num_envs asset_cfg.env_spacing = self.cfg.env_spacing self._terrain = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, ArticulationCfg): self._articulations[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, DeformableObjectCfg): self._deformable_objects[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, RigidObjectCfg): self._rigid_objects[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, RigidObjectCollectionCfg): for rigid_object_cfg in asset_cfg.rigid_objects.values(): rigid_object_cfg.prim_path = rigid_object_cfg.prim_path.format(ENV_REGEX_NS=self.env_regex_ns) # set spawn_path on spawner if cloning is needed if hasattr(rigid_object_cfg, "spawn") and rigid_object_cfg.spawn is not None: if self.env_ns in rigid_object_cfg.prim_path: spawn_tmpl = rigid_object_cfg.prim_path.replace( self.env_regex_ns, self.cloner_cfg.template_root ) proto_id = self.cloner_cfg.template_prototype_identifier rigid_object_cfg.spawn.spawn_path = f"{spawn_tmpl}/{proto_id}_.*" else: rigid_object_cfg.spawn.spawn_path = rigid_object_cfg.prim_path self._rigid_object_collections[asset_name] = asset_cfg.class_type(asset_cfg) for rigid_object_cfg in asset_cfg.rigid_objects.values(): if hasattr(rigid_object_cfg, "collision_group") and rigid_object_cfg.collision_group == -1: asset_paths = sim_utils.find_matching_prim_paths(rigid_object_cfg.prim_path) self._global_prim_paths += asset_paths elif isinstance(asset_cfg, SurfaceGripperCfg): # add surface grippers to scene self._surface_grippers[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, SensorBaseCfg): # Update target frame path(s)' regex name space for FrameTransformer if isinstance(asset_cfg, FrameTransformerCfg): updated_target_frames = [] for target_frame in asset_cfg.target_frames: target_frame.prim_path = target_frame.prim_path.format(ENV_REGEX_NS=self.env_regex_ns) updated_target_frames.append(target_frame) asset_cfg.target_frames = updated_target_frames elif isinstance(asset_cfg, ContactSensorCfg): asset_cfg.filter_prim_paths_expr = [ p.format(ENV_REGEX_NS=self.env_regex_ns) for p in asset_cfg.filter_prim_paths_expr ] elif isinstance(asset_cfg, VisuoTactileSensorCfg): if hasattr(asset_cfg, "camera_cfg") and asset_cfg.camera_cfg is not None: asset_cfg.camera_cfg.prim_path = asset_cfg.camera_cfg.prim_path.format( ENV_REGEX_NS=self.env_regex_ns ) if ( hasattr(asset_cfg, "contact_object_prim_path_expr") and asset_cfg.contact_object_prim_path_expr is not None ): asset_cfg.contact_object_prim_path_expr = asset_cfg.contact_object_prim_path_expr.format( ENV_REGEX_NS=self.env_regex_ns ) self._sensors[asset_name] = asset_cfg.class_type(asset_cfg) elif isinstance(asset_cfg, AssetBaseCfg): # manually spawn asset if asset_cfg.spawn is not None: asset_cfg.spawn.func( asset_cfg.spawn.spawn_path, asset_cfg.spawn, translation=asset_cfg.init_state.pos, orientation=asset_cfg.init_state.rot, ) # store xform prim view corresponding to this asset # all prims in the scene are Xform prims (i.e. have a transform component) self._extras[asset_name] = XformPrimView(asset_cfg.prim_path, device=self.device, stage=self.stage) else: raise ValueError(f"Unknown asset config type for {asset_name}: {asset_cfg}") # store global collision paths if hasattr(asset_cfg, "collision_group") and asset_cfg.collision_group == -1: asset_paths = sim_utils.find_matching_prim_paths(asset_cfg.prim_path) self._global_prim_paths += asset_paths def _resolve_sensor_template_spawn_path(self, template_base: str, proto_id: str) -> str: """Resolve the actual template spawn path for a sensor nested under a proto_asset prim. Sensors parented to robot links live inside ``proto_asset_0`` rather than directly under the template root. For example, a wrist camera at ``/World/template/Robot/panda_hand/wrist_cam`` is actually spawned at ``/World/template/Robot/proto_asset_0/panda_hand/wrist_cam``. This method inserts a ``proto_id_.*`` wildcard one level below the template root and searches for the concrete parent prim so the camera spawner can find it. Args: template_base: Template path derived by replacing the env regex with the template root. Example: ``/World/template/Robot/panda_hand/wrist_cam``. proto_id: Prototype identifier prefix (e.g. ``proto_asset``). Returns: Concrete spawn path (e.g. ``/World/template/Robot/proto_asset_0/panda_hand/wrist_cam``) if the parent is found, otherwise ``template_base/proto_id_.*`` as a fallback. """ template_root = self.cloner_cfg.template_root # rel = e.g. "Robot/panda_hand/wrist_cam" rel = template_base[len(template_root) + 1 :] # asset = "Robot", remainder = "panda_hand/wrist_cam" asset, _, remainder = rel.partition("/") if not remainder: return f"{template_base}/{proto_id}_.*" # parent = "panda_hand", leaf = "wrist_cam" parent, _, leaf = remainder.rpartition("/") search = ( f"{template_root}/{asset}/{proto_id}_.*/{parent}" if parent else f"{template_root}/{asset}/{proto_id}_.*" ) found = sim_utils.find_matching_prim_paths(search) return f"{found[0]}/{leaf}" if found else f"{template_base}/{proto_id}_.*"