Source code for isaaclab.sim.views.xform_prim_view

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
from collections.abc import Sequence

import numpy as np
import torch
import warp as wp

import carb
from pxr import Gf, Sdf, Usd, UsdGeom, Vt

import isaaclab.sim as sim_utils
import isaaclab.utils.math as math_utils
from isaaclab.utils.warp import fabric as fabric_utils

logger = logging.getLogger(__name__)


class XformPrimView:
    """Optimized batched interface for reading and writing transforms of multiple USD prims.

    This class provides efficient batch operations for getting and setting poses (position and orientation)
    of multiple prims at once using torch tensors. It is designed for scenarios where you need to manipulate
    many prims simultaneously, such as in multi-agent simulations or large-scale procedural generation.

    The class supports both world-space and local-space pose operations:

    - **World poses**: Positions and orientations in the global world frame
    - **Local poses**: Positions and orientations relative to each prim's parent

    When Fabric is enabled, the class leverages NVIDIA's Fabric API for GPU-accelerated batch operations:

    - Uses ``omni:fabric:worldMatrix`` and ``omni:fabric:localMatrix`` attributes for all Boundable prims
    - Performs batch matrix decomposition/composition using Warp kernels on GPU
    - Achieves performance comparable to Isaac Sim's XFormPrim implementation
    - Works for both physics-enabled and non-physics prims (cameras, meshes, etc.).
      Note: renderers typically consume USD-authored camera transforms.

    .. warning::
        **Fabric requires CUDA**: Fabric is only supported on CUDA devices. Warp's CPU backend for
        fabric-array writes has known issues, so when Fabric is requested together with a CPU device
        (``device="cpu"``), the view logs a warning at initialization, disables Fabric, and falls back
        to standard USD operations.

    .. note::
        **Fabric Support:**

        When Fabric is enabled, this view ensures prims have the required Fabric hierarchy attributes
        (``omni:fabric:localMatrix`` and ``omni:fabric:worldMatrix``). On first Fabric read, USD-authored
        transforms initialize Fabric state. Fabric writes can optionally be mirrored back to USD via
        :attr:`sync_usd_on_fabric_write`.

        For more information, see the `Fabric Hierarchy documentation`_.

        .. _Fabric Hierarchy documentation: https://docs.omniverse.nvidia.com/kit/docs/usdrt/latest/docs/fabric_hierarchy.html

    .. note::
        **Performance Considerations:**

        * Tensor operations are performed on the specified device (CPU/CUDA)
        * USD write operations use ``Sdf.ChangeBlock`` for batched updates
        * Fabric operations use GPU-accelerated Warp kernels for maximum performance
        * For maximum performance, minimize get/set operations within tight loops

    .. note::
        **Transform Requirements:**

        All prims in the view must be Xformable and have standardized transform operations:
        ``[translate, orient, scale]``. Non-standard prims will raise a ValueError during
        initialization if :attr:`validate_xform_ops` is True. Please use the function
        :func:`isaaclab.sim.utils.standardize_xform_ops` to prepare prims before using this view.

    .. warning::
        This class operates at the USD default time code. Any animation or time-sampled data
        will not be affected by write operations. For animated transforms, you need to handle
        time-sampled keyframes separately.
    """
def __init__(
    self,
    prim_path: str,
    device: str = "cpu",
    validate_xform_ops: bool = True,
    sync_usd_on_fabric_write: bool = False,
    stage: Usd.Stage | None = None,
):
    """Initialize the view with matching prims.

    This method searches the USD stage for all prims matching the provided path pattern,
    validates that they are Xformable with standard transform operations, and stores
    references for efficient batch operations.

    We generally recommend to validate the xform operations, as it ensures that the prims are in a
    consistent state and have the standard transform operations (translate, orient, scale in that
    order). However, if you are sure that the prims are in a consistent state, you can set this to
    False to improve performance. This can save around 45-50% of the time taken to initialize the view.

    Note:
        If the ``/physics/fabricEnabled`` setting is on but ``device`` is ``"cpu"``, a warning is
        logged and the view falls back to standard USD operations instead of using Fabric.

    Args:
        prim_path: USD prim path pattern to match prims. Supports wildcards (``*``) and
            regex patterns (e.g., ``"/World/Env_.*/Robot"``). See
            :func:`isaaclab.sim.utils.find_matching_prims` for pattern syntax.
        device: Device to place the tensors on. Can be ``"cpu"`` or CUDA devices like
            ``"cuda:0"``. Defaults to ``"cpu"``.
        validate_xform_ops: Whether to validate that the prims have standard xform operations.
            Defaults to True.
        sync_usd_on_fabric_write: Whether to mirror Fabric transform writes back to USD.
            When True, transform updates are synchronized to USD so that USD data readers (e.g.,
            rendering cameras) can observe these changes. Defaults to False for better performance.
        stage: USD stage to search for prims. Defaults to None, in which case the current active stage
            from the simulation context is used.

    Raises:
        ValueError: If any matched prim is not Xformable or doesn't have standardized
            transform operations (translate, orient, scale in that order).
    """
    # Store configuration
    self._prim_path = prim_path
    self._device = device

    # Find and validate matching prims
    stage = sim_utils.get_current_stage() if stage is None else stage
    self._prims: list[Usd.Prim] = sim_utils.find_matching_prims(prim_path, stage=stage)

    # Validate all prims have standard xform operations
    if validate_xform_ops:
        for prim in self._prims:
            if not sim_utils.validate_standard_xform_ops(prim):
                raise ValueError(
                    f"Prim at path '{prim.GetPath().pathString}' is not a xformable prim with standard transform"
                    f" operations [translate, orient, scale]. Received type: '{prim.GetTypeName()}'."
                    " Use sim_utils.standardize_xform_ops() to prepare the prim."
                )

    # Determine if Fabric is requested. The settings lookup may return None when the key is
    # unset, so normalize to a real boolean.
    self._use_fabric = bool(carb.settings.get_settings().get("/physics/fabricEnabled"))

    # Check for unsupported Fabric + CPU combination
    if self._use_fabric and self._device == "cpu":
        logger.warning(
            "Fabric mode with Warp fabric-array operations is not supported on CPU devices. "
            "While Fabric itself can run on both CPU and GPU, our batch Warp kernels for "
            "fabric-array operations require CUDA and are not reliable on the CPU backend. "
            "To ensure stability, Fabric is being disabled and execution will fall back "
            "to standard USD operations on the CPU. This may impact performance."
        )
        self._use_fabric = False

    # Report the backend that was actually selected (only after the CPU fallback is resolved),
    # so the debug log never claims Fabric when USD is used.
    if self._use_fabric:
        logger.debug(f"Using Fabric for the XFormPrimView over '{self._prim_path}' on device '{self._device}'.")
    else:
        logger.debug(f"Using USD for the XFormPrimView over '{self._prim_path}' on device '{self._device}'.")

    # Create indices buffer
    # Since we iterate over the indices, we need to use range instead of torch tensor
    self._ALL_INDICES = list(range(len(self._prims)))

    # Some prims (e.g., Cameras) require USD-authored transforms for rendering.
    # When enabled, mirror Fabric pose writes to USD for those prims.
    self._sync_usd_on_fabric_write = sync_usd_on_fabric_write

    # Fabric batch infrastructure (initialized lazily on first use)
    self._fabric_initialized = False
    self._fabric_usd_sync_done = False
    self._fabric_selection = None
    self._fabric_to_view: wp.array | None = None
    self._view_to_fabric: wp.array | None = None
    self._default_view_indices: wp.array | None = None
    self._fabric_hierarchy = None

    # Create a valid USD attribute name: namespace:name
    # Use "isaaclab" namespace to identify our custom attributes
    self._view_index_attr = f"isaaclab:view_index:{abs(hash(self))}"
"""
Properties.
"""

@property
def count(self) -> int:
    """Total number of prims tracked by this view."""
    return len(self._prims)

@property
def device(self) -> str:
    """Device on which returned tensors are allocated (cpu or cuda)."""
    return self._device

@property
def prims(self) -> list[Usd.Prim]:
    """The USD prims managed by this view, in view order."""
    return self._prims

@property
def prim_paths(self) -> list[str]:
    """Path strings of all prims managed by this view.

    The strings are built from the prims on first access and the resulting list is
    cached on the instance; later accesses return that same list.

    Note:
        Prefer :attr:`prims` when the USD prim objects themselves are needed, since it
        avoids the string conversion. This property is mainly useful for logging,
        debugging, or when string paths are explicitly required.
    """
    # The cache attribute is deliberately not created in the constructor: building the
    # strings is expensive and most users never need them.
    try:
        return self._prim_paths
    except AttributeError:
        self._prim_paths = [prim.GetPath().pathString for prim in self._prims]
        return self._prim_paths

"""
Operations - Setters.
"""
def set_world_poses(
    self,
    positions: torch.Tensor | None = None,
    orientations: torch.Tensor | None = None,
    indices: Sequence[int] | None = None,
):
    """Set world-space poses for prims in the view.

    Writes the position and/or orientation of each selected prim in world space.

    - With Fabric enabled, the write goes to Fabric's ``omni:fabric:worldMatrix`` attribute
      using GPU-accelerated batch operations.
    - With Fabric disabled, the pose is converted to local space and written to USD's
      ``xformOp:translate`` and ``xformOp:orient`` attributes.

    Args:
        positions: World-space positions as a tensor of shape (M, 3) where M is the number of prims
            to set (either all prims if indices is None, or the number of indices provided).
            Defaults to None, in which case positions are not modified.
        orientations: World-space orientations as quaternions (w, x, y, z) with shape (M, 4).
            Defaults to None, in which case orientations are not modified.
        indices: Indices of prims to set poses for. Defaults to None, in which case poses are set
            for all prims in the view.

    Raises:
        ValueError: If positions shape is not (M, 3) or orientations shape is not (M, 4).
        ValueError: If the number of poses doesn't match the number of indices provided.
    """
    # Select the backend-specific writer, then forward the arguments untouched.
    write_poses = self._set_world_poses_fabric if self._use_fabric else self._set_world_poses_usd
    write_poses(positions, orientations, indices)
def set_local_poses(
    self,
    translations: torch.Tensor | None = None,
    orientations: torch.Tensor | None = None,
    indices: Sequence[int] | None = None,
):
    """Set local-space poses for prims in the view.

    Writes the translation and/or orientation of each selected prim relative to its parent prim.
    The values are written to USD's ``xformOp:translate`` and ``xformOp:orient`` attributes.

    Note:
        Even in Fabric mode, local pose operations use USD. This behavior is based on Isaac Sim's
        design where Fabric is only used for world pose operations.

        Rationale:
            - Local pose writes need correct parent-child hierarchy relationships
            - USD maintains these relationships correctly and efficiently
            - Fabric is optimized for world pose operations, not local hierarchies

    Args:
        translations: Local-space translations as a tensor of shape (M, 3) where M is the number of
            prims to set (either all prims if indices is None, or the number of indices provided).
            Defaults to None, in which case translations are not modified.
        orientations: Local-space orientations as quaternions (w, x, y, z) with shape (M, 4).
            Defaults to None, in which case orientations are not modified.
        indices: Indices of prims to set poses for. Defaults to None, in which case poses are set
            for all prims in the view.

    Raises:
        ValueError: If translations shape is not (M, 3) or orientations shape is not (M, 4).
        ValueError: If the number of poses doesn't match the number of indices provided.
    """
    # Select the backend-specific writer, then forward the arguments untouched.
    write_poses = self._set_local_poses_fabric if self._use_fabric else self._set_local_poses_usd
    write_poses(translations, orientations, indices)
def set_scales(self, scales: torch.Tensor, indices: Sequence[int] | None = None):
    """Set scales for prims in the view.

    - With Fabric enabled, scales are updated in the Fabric matrices using GPU-accelerated
      batch operations.
    - With Fabric disabled, scales are written to USD's ``xformOp:scale`` attributes.

    Args:
        scales: Scales as a tensor of shape (M, 3) where M is the number of prims
            to set (either all prims if indices is None, or the number of indices provided).
        indices: Indices of prims to set scales for. Defaults to None, in which case scales are set
            for all prims in the view.

    Raises:
        ValueError: If scales shape is not (M, 3).
    """
    # Select the backend-specific writer, then forward the arguments untouched.
    write_scales = self._set_scales_fabric if self._use_fabric else self._set_scales_usd
    write_scales(scales, indices)
def set_visibility(self, visibility: torch.Tensor, indices: Sequence[int] | None = None):
    """Set visibility for prims in the view.

    Each selected prim is wrapped as a ``UsdGeom.Imageable`` and made visible or invisible
    according to the corresponding entry of ``visibility``.

    Args:
        visibility: Visibility as a boolean tensor of shape (M,) where M is the
            number of prims to set (either all prims if indices is None, or the number of indices provided).
        indices: Indices of prims to set visibility for. Defaults to None, in which case visibility is set
            for all prims in the view.

    Raises:
        ValueError: If visibility shape is not (M,).
    """
    # Resolve indices. Only compare against slice(None) when indices really is a slice:
    # array-like indices define ``==`` element-wise, and truth-testing that result is ambiguous.
    if indices is None or (isinstance(indices, slice) and indices == slice(None)):
        indices_list = self._ALL_INDICES
    else:
        indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices)

    # Validate inputs
    if visibility.shape != (len(indices_list),):
        raise ValueError(f"Expected visibility shape ({len(indices_list)},), got {visibility.shape}.")

    # Move all flags to host memory in one transfer instead of reading the tensor
    # element-by-element inside the loop.
    visible_flags = visibility.tolist()

    # Set visibility for each prim
    with Sdf.ChangeBlock():
        for prim_idx, is_visible in zip(indices_list, visible_flags):
            # Convert prim to imageable
            imageable = UsdGeom.Imageable(self._prims[prim_idx])
            # Set visibility
            if is_visible:
                imageable.MakeVisible()
            else:
                imageable.MakeInvisible()
""" Operations - Getters. """
def get_world_poses(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]:
    """Get world-space poses for prims in the view.

    Retrieves the position and orientation of each selected prim in world space, i.e. with the
    full transform hierarchy from the prim up to the world root applied.

    - With Fabric enabled, Fabric batch operations with Warp kernels are used.
    - With Fabric disabled, USD's XformCache is used.

    Note:
        Scale and skew are ignored. The returned poses contain only translation and rotation.

    Args:
        indices: Indices of prims to get poses for. Defaults to None, in which case poses are retrieved
            for all prims in the view.

    Returns:
        A tuple of (positions, orientations) where:

        - positions: Torch tensor of shape (M, 3) containing world-space positions (x, y, z),
          where M is the number of prims queried.
        - orientations: Torch tensor of shape (M, 4) containing world-space quaternions (w, x, y, z)
    """
    # Select the backend-specific reader and return its result unchanged.
    read_poses = self._get_world_poses_fabric if self._use_fabric else self._get_world_poses_usd
    return read_poses(indices)
def get_local_poses(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]:
    """Get local-space poses for prims in the view.

    Retrieves the translation and orientation of each selected prim relative to its parent prim.
    The values are read from USD's ``xformOp:translate`` and ``xformOp:orient`` attributes.

    Note:
        Even in Fabric mode, local pose operations use USD. This behavior is based on Isaac Sim's
        design where Fabric is only used for world pose operations.

        Rationale:
            - Local pose reads need correct parent-child hierarchy relationships
            - USD maintains these relationships correctly and efficiently
            - Fabric is optimized for world pose operations, not local hierarchies

    Note:
        Scale is ignored. The returned poses contain only translation and rotation.

    Args:
        indices: Indices of prims to get poses for. Defaults to None, in which case poses are retrieved
            for all prims in the view.

    Returns:
        A tuple of (translations, orientations) where:

        - translations: Torch tensor of shape (M, 3) containing local-space translations (x, y, z),
          where M is the number of prims queried.
        - orientations: Torch tensor of shape (M, 4) containing local-space quaternions (w, x, y, z)
    """
    # Select the backend-specific reader and return its result unchanged.
    read_poses = self._get_local_poses_fabric if self._use_fabric else self._get_local_poses_usd
    return read_poses(indices)
def get_scales(self, indices: Sequence[int] | None = None) -> torch.Tensor:
    """Get scales for prims in the view.

    - With Fabric enabled, scales are extracted from the Fabric matrices using batch operations
      with Warp kernels.
    - With Fabric disabled, scales are read from USD's ``xformOp:scale`` attributes.

    Args:
        indices: Indices of prims to get scales for. Defaults to None, in which case scales are retrieved
            for all prims in the view.

    Returns:
        A tensor of shape (M, 3) containing the scales of each prim, where M is the number of prims queried.
    """
    # Select the backend-specific reader and return its result unchanged.
    read_scales = self._get_scales_fabric if self._use_fabric else self._get_scales_usd
    return read_scales(indices)
def get_visibility(self, indices: Sequence[int] | None = None) -> torch.Tensor:
    """Get visibility for prims in the view.

    A prim is reported as visible when ``UsdGeom.Imageable.ComputeVisibility()`` returns anything
    other than the ``invisible`` token.

    Args:
        indices: Indices of prims to get visibility for. Defaults to None, in which case visibility is
            retrieved for all prims in the view.

    Returns:
        A tensor of shape (M,) containing the visibility of each prim, where M is the number of prims
        queried. The tensor is of type bool and is allocated on the view's device.
    """
    # Resolve indices. Only compare against slice(None) when indices really is a slice:
    # array-like indices define ``==`` element-wise, and truth-testing that result is ambiguous.
    if indices is None or (isinstance(indices, slice) and indices == slice(None)):
        indices_list = self._ALL_INDICES
    else:
        # Convert to list if it is a tensor array
        indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices)

    # Collect the flags on the host first and create the tensor once, rather than writing
    # element-by-element into a (possibly device-resident) tensor.
    invisible = UsdGeom.Tokens.invisible if indices_list else None
    flags = [
        UsdGeom.Imageable(self._prims[prim_idx]).ComputeVisibility() != invisible for prim_idx in indices_list
    ]
    return torch.tensor(flags, dtype=torch.bool, device=self._device)
""" Internal Functions - USD. """ def _set_world_poses_usd( self, positions: torch.Tensor | None = None, orientations: torch.Tensor | None = None, indices: Sequence[int] | None = None, ): """Set world poses to USD.""" # Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: # Convert to list if it is a tensor array indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Validate inputs if positions is not None: if positions.shape != (len(indices_list), 3): raise ValueError( f"Expected positions shape ({len(indices_list)}, 3), got {positions.shape}. " "Number of positions must match the number of prims in the view." ) positions_array = Vt.Vec3dArray.FromNumpy(positions.cpu().numpy()) else: positions_array = None if orientations is not None: if orientations.shape != (len(indices_list), 4): raise ValueError( f"Expected orientations shape ({len(indices_list)}, 4), got {orientations.shape}. " "Number of orientations must match the number of prims in the view." ) # Vt expects quaternions in xyzw order orientations_array = Vt.QuatdArray.FromNumpy(math_utils.convert_quat(orientations, to="xyzw").cpu().numpy()) else: orientations_array = None # Create xform cache instance xform_cache = UsdGeom.XformCache(Usd.TimeCode.Default()) # Set poses for each prim # We use Sdf.ChangeBlock to minimize notification overhead. with Sdf.ChangeBlock(): for idx, prim_idx in enumerate(indices_list): # Get prim prim = self._prims[prim_idx] # Get parent prim for local space conversion parent_prim = prim.GetParent() # Determine what to set world_pos = positions_array[idx] if positions_array is not None else None world_quat = orientations_array[idx] if orientations_array is not None else None # Convert world pose to local if we have a valid parent # Note: We don't use :func:`isaaclab.sim.utils.transforms.convert_world_pose_to_local` # here since it isn't optimized for batch operations. 
if parent_prim.IsValid() and parent_prim.GetPath() != Sdf.Path.absoluteRootPath: # Get current world pose if we're only setting one component if positions_array is None or orientations_array is None: # get prim xform prim_tf = xform_cache.GetLocalToWorldTransform(prim) # sanitize quaternion # this is needed, otherwise the quaternion might be non-normalized prim_tf.Orthonormalize() # populate desired world transform if world_pos is not None: prim_tf.SetTranslateOnly(world_pos) if world_quat is not None: prim_tf.SetRotateOnly(world_quat) else: # Both position and orientation are provided, create new transform prim_tf = Gf.Matrix4d() prim_tf.SetTranslateOnly(world_pos) prim_tf.SetRotateOnly(world_quat) # Convert to local space parent_world_tf = xform_cache.GetLocalToWorldTransform(parent_prim) local_tf = prim_tf * parent_world_tf.GetInverse() local_pos = local_tf.ExtractTranslation() local_quat = local_tf.ExtractRotationQuat() else: # No parent or parent is root, world == local local_pos = world_pos local_quat = world_quat # Get or create the standard transform operations if local_pos is not None: prim.GetAttribute("xformOp:translate").Set(local_pos) if local_quat is not None: prim.GetAttribute("xformOp:orient").Set(local_quat) def _set_local_poses_usd( self, translations: torch.Tensor | None = None, orientations: torch.Tensor | None = None, indices: Sequence[int] | None = None, ): """Set local poses to USD.""" # Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Validate inputs if translations is not None: if translations.shape != (len(indices_list), 3): raise ValueError(f"Expected translations shape ({len(indices_list)}, 3), got {translations.shape}.") translations_array = Vt.Vec3dArray.FromNumpy(translations.cpu().numpy()) else: translations_array = None if orientations is not None: if orientations.shape != (len(indices_list), 
4): raise ValueError(f"Expected orientations shape ({len(indices_list)}, 4), got {orientations.shape}.") orientations_array = Vt.QuatdArray.FromNumpy(math_utils.convert_quat(orientations, to="xyzw").cpu().numpy()) else: orientations_array = None # Set local poses with Sdf.ChangeBlock(): for idx, prim_idx in enumerate(indices_list): prim = self._prims[prim_idx] if translations_array is not None: prim.GetAttribute("xformOp:translate").Set(translations_array[idx]) if orientations_array is not None: prim.GetAttribute("xformOp:orient").Set(orientations_array[idx]) def _set_scales_usd(self, scales: torch.Tensor, indices: Sequence[int] | None = None): """Set scales to USD.""" # Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Validate inputs if scales.shape != (len(indices_list), 3): raise ValueError(f"Expected scales shape ({len(indices_list)}, 3), got {scales.shape}.") scales_array = Vt.Vec3dArray.FromNumpy(scales.cpu().numpy()) # Set scales for each prim with Sdf.ChangeBlock(): for idx, prim_idx in enumerate(indices_list): prim = self._prims[prim_idx] prim.GetAttribute("xformOp:scale").Set(scales_array[idx]) def _get_world_poses_usd(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]: """Get world poses from USD.""" # Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: # Convert to list if it is a tensor array indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Create buffers positions = Vt.Vec3dArray(len(indices_list)) orientations = Vt.QuatdArray(len(indices_list)) # Create xform cache instance xform_cache = UsdGeom.XformCache(Usd.TimeCode.Default()) # Note: We don't use :func:`isaaclab.sim.utils.transforms.resolve_prim_pose` # here since it isn't optimized for batch operations. 
for idx, prim_idx in enumerate(indices_list): # Get prim prim = self._prims[prim_idx] # get prim xform prim_tf = xform_cache.GetLocalToWorldTransform(prim) # sanitize quaternion # this is needed, otherwise the quaternion might be non-normalized prim_tf.Orthonormalize() # extract position and orientation positions[idx] = prim_tf.ExtractTranslation() orientations[idx] = prim_tf.ExtractRotationQuat() # move to torch tensors positions = torch.tensor(np.array(positions), dtype=torch.float32, device=self._device) orientations = torch.tensor(np.array(orientations), dtype=torch.float32, device=self._device) # underlying data is in xyzw order, convert to wxyz order orientations = math_utils.convert_quat(orientations, to="wxyz") return positions, orientations # type: ignore def _get_local_poses_usd(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]: """Get local poses from USD.""" # Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Create buffers translations = Vt.Vec3dArray(len(indices_list)) orientations = Vt.QuatdArray(len(indices_list)) # Create a fresh XformCache to avoid stale cached values xform_cache = UsdGeom.XformCache(Usd.TimeCode.Default()) for idx, prim_idx in enumerate(indices_list): prim = self._prims[prim_idx] prim_tf = xform_cache.GetLocalTransformation(prim)[0] prim_tf.Orthonormalize() translations[idx] = prim_tf.ExtractTranslation() orientations[idx] = prim_tf.ExtractRotationQuat() translations = torch.tensor(np.array(translations), dtype=torch.float32, device=self._device) orientations = torch.tensor(np.array(orientations), dtype=torch.float32, device=self._device) orientations = math_utils.convert_quat(orientations, to="wxyz") return translations, orientations # type: ignore def _get_scales_usd(self, indices: Sequence[int] | None = None) -> torch.Tensor: """Get scales from USD.""" # 
Resolve indices if indices is None or indices == slice(None): indices_list = self._ALL_INDICES else: indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) # Create buffers scales = Vt.Vec3dArray(len(indices_list)) for idx, prim_idx in enumerate(indices_list): prim = self._prims[prim_idx] scales[idx] = prim.GetAttribute("xformOp:scale").Get() # Convert to tensor return torch.tensor(np.array(scales), dtype=torch.float32, device=self._device) """ Internal Functions - Fabric. """ def _set_world_poses_fabric( self, positions: torch.Tensor | None = None, orientations: torch.Tensor | None = None, indices: Sequence[int] | None = None, ): """Set world poses using Fabric GPU batch operations. Writes directly to Fabric's ``omni:fabric:worldMatrix`` attribute using Warp kernels. Changes are propagated through Fabric's hierarchy system but remain GPU-resident. For workflows mixing Fabric world pose writes with USD local pose queries, note that local poses read from USD's xformOp:* attributes, which may not immediately reflect Fabric changes. For best performance and consistency, use Fabric methods exclusively (get_world_poses/set_world_poses with Fabric enabled). 
""" # Lazy initialization if not self._fabric_initialized: self._initialize_fabric() # Resolve indices (treat slice(None) as None for consistency with USD path) indices_wp = self._resolve_indices_wp(indices) count = indices_wp.shape[0] # Convert torch to warp (if provided), use dummy arrays for None to avoid Warp kernel issues if positions is not None: positions_wp = wp.from_torch(positions) else: positions_wp = wp.zeros((0, 3), dtype=wp.float32).to(self._device) if orientations is not None: orientations_wp = wp.from_torch(orientations) else: orientations_wp = wp.zeros((0, 4), dtype=wp.float32).to(self._device) # Dummy array for scales (not modifying) scales_wp = wp.zeros((0, 3), dtype=wp.float32).to(self._device) # Use cached fabricarray for world matrices world_matrices = self._fabric_world_matrices # Batch compose matrices with a single kernel launch wp.launch( kernel=fabric_utils.compose_fabric_transformation_matrix_from_warp_arrays, dim=count, inputs=[ world_matrices, positions_wp, orientations_wp, scales_wp, # dummy array instead of None False, # broadcast_positions False, # broadcast_orientations False, # broadcast_scales indices_wp, self._view_to_fabric, ], device=self._device, ) # Synchronize to ensure kernel completes wp.synchronize() # Update world transforms within Fabric hierarchy self._fabric_hierarchy.update_world_xforms() # Fabric now has authoritative data; skip future USD syncs self._fabric_usd_sync_done = True # Mirror to USD for renderer-facing prims when enabled. if self._sync_usd_on_fabric_write: self._set_world_poses_usd(positions, orientations, indices) # Fabric writes are GPU-resident; local pose operations still use USD. def _set_local_poses_fabric( self, translations: torch.Tensor | None = None, orientations: torch.Tensor | None = None, indices: Sequence[int] | None = None, ): """Set local poses using USD (matches Isaac Sim's design). Note: Even in Fabric mode, local pose operations use USD. 
This is Isaac Sim's design: the ``usd=False`` parameter only affects world poses. Rationale: - Local pose writes need correct parent-child hierarchy relationships - USD maintains these relationships correctly and efficiently - Fabric is optimized for world pose operations, not local hierarchies """ self._set_local_poses_usd(translations, orientations, indices) def _set_scales_fabric(self, scales: torch.Tensor, indices: Sequence[int] | None = None): """Set scales using Fabric GPU batch operations.""" # Lazy initialization if not self._fabric_initialized: self._initialize_fabric() # Resolve indices (treat slice(None) as None for consistency with USD path) indices_wp = self._resolve_indices_wp(indices) count = indices_wp.shape[0] # Convert torch to warp scales_wp = wp.from_torch(scales) # Dummy arrays for positions and orientations (not modifying) positions_wp = wp.zeros((0, 3), dtype=wp.float32).to(self._device) orientations_wp = wp.zeros((0, 4), dtype=wp.float32).to(self._device) # Use cached fabricarray for world matrices world_matrices = self._fabric_world_matrices # Batch compose matrices on GPU with a single kernel launch wp.launch( kernel=fabric_utils.compose_fabric_transformation_matrix_from_warp_arrays, dim=count, inputs=[ world_matrices, positions_wp, # dummy array instead of None orientations_wp, # dummy array instead of None scales_wp, False, # broadcast_positions False, # broadcast_orientations False, # broadcast_scales indices_wp, self._view_to_fabric, ], device=self._device, ) # Synchronize to ensure kernel completes before syncing wp.synchronize() # Update world transforms to propagate changes self._fabric_hierarchy.update_world_xforms() # Fabric now has authoritative data; skip future USD syncs self._fabric_usd_sync_done = True # Mirror to USD for renderer-facing prims when enabled. 
if self._sync_usd_on_fabric_write: self._set_scales_usd(scales, indices) def _get_world_poses_fabric(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]: """Get world poses from Fabric using GPU batch operations.""" # Lazy initialization of Fabric infrastructure if not self._fabric_initialized: self._initialize_fabric() # Sync once from USD to ensure reads see the latest authored transforms if not self._fabric_usd_sync_done: self._sync_fabric_from_usd_once() # Resolve indices (treat slice(None) as None for consistency with USD path) indices_wp = self._resolve_indices_wp(indices) count = indices_wp.shape[0] # Use pre-allocated buffers for full reads, allocate only for partial reads use_cached_buffers = indices is None or indices == slice(None) if use_cached_buffers: # Full read: Use cached buffers (zero allocation overhead!) positions_wp = self._fabric_positions_buffer orientations_wp = self._fabric_orientations_buffer scales_wp = self._fabric_dummy_buffer else: # Partial read: Need to allocate buffers of appropriate size positions_wp = wp.zeros((count, 3), dtype=wp.float32).to(self._device) orientations_wp = wp.zeros((count, 4), dtype=wp.float32).to(self._device) scales_wp = self._fabric_dummy_buffer # Always use dummy for scales # Use cached fabricarray for world matrices # This eliminates the 0.06-0.30ms variability from creating fabricarray each call world_matrices = self._fabric_world_matrices # Launch GPU kernel to decompose matrices in parallel wp.launch( kernel=fabric_utils.decompose_fabric_transformation_matrix_to_warp_arrays, dim=count, inputs=[ world_matrices, positions_wp, orientations_wp, scales_wp, # dummy array instead of None indices_wp, self._view_to_fabric, ], device=self._device, ) # Return tensors: zero-copy for cached buffers, conversion for partial reads if use_cached_buffers: # Zero-copy! 
The Warp kernel wrote directly into the PyTorch tensors # We just need to synchronize to ensure the kernel is done wp.synchronize() return self._fabric_positions_torch, self._fabric_orientations_torch else: # Partial read: Need to convert from Warp to torch positions = wp.to_torch(positions_wp) orientations = wp.to_torch(orientations_wp) return positions, orientations def _get_local_poses_fabric(self, indices: Sequence[int] | None = None) -> tuple[torch.Tensor, torch.Tensor]: """Get local poses using USD (matches Isaac Sim's design). Note: Even in Fabric mode, local pose operations use USD's XformCache. This is Isaac Sim's design: the ``usd=False`` parameter only affects world poses. Rationale: - Local pose computation requires parent transforms which may not be in the view - USD's XformCache provides efficient hierarchy-aware local transform queries - Fabric is optimized for world pose operations, not local hierarchies """ return self._get_local_poses_usd(indices) def _get_scales_fabric(self, indices: Sequence[int] | None = None) -> torch.Tensor: """Get scales from Fabric using GPU batch operations.""" # Lazy initialization if not self._fabric_initialized: self._initialize_fabric() # Sync once from USD to ensure reads see the latest authored transforms if not self._fabric_usd_sync_done: self._sync_fabric_from_usd_once() # Resolve indices (treat slice(None) as None for consistency with USD path) indices_wp = self._resolve_indices_wp(indices) count = indices_wp.shape[0] # Use pre-allocated buffers for full reads, allocate only for partial reads use_cached_buffers = indices is None or indices == slice(None) if use_cached_buffers: # Full read: Use cached buffers (zero allocation overhead!) 
scales_wp = self._fabric_scales_buffer else: # Partial read: Need to allocate buffer of appropriate size scales_wp = wp.zeros((count, 3), dtype=wp.float32).to(self._device) # Always use dummy buffers for positions and orientations (not needed for scales) positions_wp = self._fabric_dummy_buffer orientations_wp = self._fabric_dummy_buffer # Use cached fabricarray for world matrices world_matrices = self._fabric_world_matrices # Launch GPU kernel to decompose matrices in parallel wp.launch( kernel=fabric_utils.decompose_fabric_transformation_matrix_to_warp_arrays, dim=count, inputs=[ world_matrices, positions_wp, # dummy array instead of None orientations_wp, # dummy array instead of None scales_wp, indices_wp, self._view_to_fabric, ], device=self._device, ) # Return tensor: zero-copy for cached buffers, conversion for partial reads if use_cached_buffers: # Zero-copy! The Warp kernel wrote directly into the PyTorch tensor wp.synchronize() return self._fabric_scales_torch else: # Partial read: Need to convert from Warp to torch return wp.to_torch(scales_wp) """ Internal Functions - Initialization. """ def _initialize_fabric(self) -> None: """Initialize Fabric batch infrastructure for GPU-accelerated pose queries. This method ensures all prims have the required Fabric hierarchy attributes (``omni:fabric:localMatrix`` and ``omni:fabric:worldMatrix``) and creates the necessary infrastructure for batch GPU operations using Warp. Based on the Fabric Hierarchy documentation, when Fabric Scene Delegate is enabled, all boundable prims should have these attributes. This method ensures they exist and are properly synchronized with USD. 
""" import usdrt from usdrt import Rt # Get USDRT (Fabric) stage stage_id = sim_utils.get_current_stage_id() fabric_stage = usdrt.Usd.Stage.Attach(stage_id) # Step 1: Ensure all prims have Fabric hierarchy attributes # According to the documentation, these attributes are created automatically # when Fabric Scene Delegate is enabled, but we ensure they exist for i in range(self.count): rt_prim = fabric_stage.GetPrimAtPath(self.prim_paths[i]) rt_xformable = Rt.Xformable(rt_prim) # Create Fabric hierarchy world matrix attribute if it doesn't exist has_attr = ( rt_xformable.HasFabricHierarchyWorldMatrixAttr() if hasattr(rt_xformable, "HasFabricHierarchyWorldMatrixAttr") else False ) if not has_attr: rt_xformable.CreateFabricHierarchyWorldMatrixAttr() # Best-effort USD->Fabric sync; authoritative initialization happens on first read. rt_xformable.SetWorldXformFromUsd() # Create view index attribute for batch operations rt_prim.CreateAttribute(self._view_index_attr, usdrt.Sdf.ValueTypeNames.UInt, custom=True) rt_prim.GetAttribute(self._view_index_attr).Set(i) # After syncing all prims, update the Fabric hierarchy to ensure world matrices are computed self._fabric_hierarchy = usdrt.hierarchy.IFabricHierarchy().get_fabric_hierarchy( fabric_stage.GetFabricId(), fabric_stage.GetStageIdAsStageId() ) self._fabric_hierarchy.update_world_xforms() # Step 2: Create index arrays for batch operations self._default_view_indices = wp.zeros((self.count,), dtype=wp.uint32).to(self._device) wp.launch( kernel=fabric_utils.arange_k, dim=self.count, inputs=[self._default_view_indices], device=self._device, ) wp.synchronize() # Ensure indices are ready # Step 3: Create Fabric selection with attribute filtering # SelectPrims expects device format like "cuda:0" not "cuda" # # KNOWN ISSUE: SelectPrims may return prims in a different order than self._prims # (which comes from USD's find_matching_prims). 
We create a bidirectional mapping # (_view_to_fabric and _fabric_to_view) to handle this ordering difference. # This works correctly for full-view operations but partial indexing still has issues. fabric_device = self._device if self._device == "cuda": logger.warning("Fabric device is not specified, defaulting to 'cuda:0'.") fabric_device = "cuda:0" self._fabric_selection = fabric_stage.SelectPrims( require_attrs=[ (usdrt.Sdf.ValueTypeNames.UInt, self._view_index_attr, usdrt.Usd.Access.Read), (usdrt.Sdf.ValueTypeNames.Matrix4d, "omni:fabric:worldMatrix", usdrt.Usd.Access.ReadWrite), ], device=fabric_device, ) # Step 4: Create bidirectional mapping between view and fabric indices self._view_to_fabric = wp.zeros((self.count,), dtype=wp.uint32).to(self._device) self._fabric_to_view = wp.fabricarray(self._fabric_selection, self._view_index_attr) wp.launch( kernel=fabric_utils.set_view_to_fabric_array, dim=self._fabric_to_view.shape[0], inputs=[self._fabric_to_view, self._view_to_fabric], device=self._device, ) # Synchronize to ensure mapping is ready before any operations wp.synchronize() # Pre-allocate reusable output buffers for read operations self._fabric_positions_torch = torch.zeros((self.count, 3), dtype=torch.float32, device=self._device) self._fabric_orientations_torch = torch.zeros((self.count, 4), dtype=torch.float32, device=self._device) self._fabric_scales_torch = torch.zeros((self.count, 3), dtype=torch.float32, device=self._device) # Create Warp views of the PyTorch tensors self._fabric_positions_buffer = wp.from_torch(self._fabric_positions_torch, dtype=wp.float32) self._fabric_orientations_buffer = wp.from_torch(self._fabric_orientations_torch, dtype=wp.float32) self._fabric_scales_buffer = wp.from_torch(self._fabric_scales_torch, dtype=wp.float32) # Dummy array for unused outputs (always empty) self._fabric_dummy_buffer = wp.zeros((0, 3), dtype=wp.float32).to(self._device) # Cache fabricarray for world matrices to avoid recreation overhead # Refs: 
https://docs.omniverse.nvidia.com/kit/docs/usdrt/latest/docs/usdrt_prim_selection.html # https://docs.omniverse.nvidia.com/kit/docs/usdrt/latest/docs/scenegraph_use.html self._fabric_world_matrices = wp.fabricarray(self._fabric_selection, "omni:fabric:worldMatrix") # Cache Fabric stage to avoid expensive get_current_stage() calls self._fabric_stage = fabric_stage self._fabric_initialized = True # Force a one-time USD->Fabric sync on first read to pick up any USD edits # made after the view was constructed. self._fabric_usd_sync_done = False def _sync_fabric_from_usd_once(self) -> None: """Sync Fabric world matrices from USD once, on the first read.""" # Ensure Fabric is initialized if not self._fabric_initialized: self._initialize_fabric() # Ensure authored USD transforms are flushed before reading into Fabric. sim_utils.update_stage() # Read authoritative transforms from USD and write once into Fabric. positions_usd, orientations_usd = self._get_world_poses_usd() scales_usd = self._get_scales_usd() prev_sync = self._sync_usd_on_fabric_write self._sync_usd_on_fabric_write = False self._set_world_poses_fabric(positions_usd, orientations_usd) self._set_scales_fabric(scales_usd) self._sync_usd_on_fabric_write = prev_sync self._fabric_usd_sync_done = True def _resolve_indices_wp(self, indices: Sequence[int] | None) -> wp.array: """Resolve view indices as a Warp array.""" if indices is None or indices == slice(None): if self._default_view_indices is None: raise RuntimeError("Fabric indices are not initialized.") return self._default_view_indices indices_list = indices.tolist() if isinstance(indices, torch.Tensor) else list(indices) return wp.array(indices_list, dtype=wp.uint32).to(self._device)