Source code for isaaclab.sim.utils.stage

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Utilities for operating on the USD stage."""

from __future__ import annotations

import contextlib
import logging
import os
import threading
from collections.abc import Callable, Generator

from pxr import Sdf, Usd, UsdUtils

from isaaclab.utils.version import get_isaac_sim_version, has_kit

# import logger
logger = logging.getLogger(__name__)
_context = threading.local()  # thread-local storage to handle nested contexts and concurrent access

# Kit-dependent imports (only available when running with Kit/Isaac Sim)
if has_kit():
    import omni.kit.app


def _check_ancestral(prim: Usd.Prim) -> bool:
    """Check if a prim is brought into composition by its ancestor (an ancestral prim).

    This is a pure USD implementation of ``omni.usd.check_ancestral``.

    An ancestral prim is one that exists due to a reference, payload, or other composition arc
    on an ancestor prim. Such prims cannot be directly deleted because they are "opinions" from
    the referenced asset, not locally authored prims.

    Args:
        prim: The USD prim to check.

    Returns:
        True if the prim is an ancestral prim, False otherwise.
    """
    if not prim or not prim.IsValid():
        return False

    def _check_ancestral_node(node) -> bool:
        """Recursively check if any composition node is due to an ancestor."""
        if node.IsDueToAncestor():
            return True
        return any(_check_ancestral_node(child) for child in node.children)

    prim_index = prim.GetPrimIndex()
    if not prim_index:
        return False

    return _check_ancestral_node(prim_index.rootNode)


[docs] def resolve_paths( src_layer_identifier: str, dst_layer_identifier: str, store_relative_path: bool = True, ) -> None: """Resolve external asset paths in a destination layer relative to a source layer. When content is copied from one USD layer to another (e.g., via ``Sdf.CopySpec`` or ``layer.TransferContent``), relative asset paths that were valid from the source layer's location may become invalid from the destination layer's location. This function recalculates those paths. This uses USD's built-in ``UsdUtils.ModifyAssetPaths`` to update all external references (sublayers, references, payloads, asset paths) in the destination layer. Args: src_layer_identifier: The identifier (path) of the source layer. dst_layer_identifier: The identifier (path) of the destination layer. store_relative_path: Whether to store paths as relative. Defaults to True. Example: >>> from pxr import Sdf >>> import isaaclab.sim as sim_utils >>> >>> # After copying content to a new layer >>> source_layer = stage.GetRootLayer() >>> target_layer = Sdf.Layer.CreateNew("/path/to/output.usd") >>> target_layer.TransferContent(source_layer) >>> sim_utils.resolve_paths(source_layer.identifier, target_layer.identifier) >>> target_layer.Save() """ src_layer = Sdf.Layer.FindOrOpen(src_layer_identifier) dst_layer = Sdf.Layer.FindOrOpen(dst_layer_identifier) if not src_layer: logger.warning(f"Source layer not found: {src_layer_identifier}") return if not dst_layer: logger.warning(f"Destination layer not found: {dst_layer_identifier}") return dst_dir = os.path.dirname(dst_layer.realPath or dst_layer.identifier) def _modify_path(asset_path: str) -> str: if not asset_path: return asset_path resolved = src_layer.ComputeAbsolutePath(asset_path) if store_relative_path and resolved and dst_dir: try: return os.path.relpath(resolved, dst_dir) except ValueError: return resolved return resolved or asset_path UsdUtils.ModifyAssetPaths(dst_layer, _modify_path)
# ############################################################################## # Public API # ############################################################################## try: # _context is a singleton design in isaacsim and for that reason # until we fully replace all modules that references the singleton(such as XformPrim, Prim ....), we have to point # that singleton to this _context from isaacsim.core.utils import stage as sim_stage sim_stage._context = _context # type: ignore except ImportError: pass
[docs] def create_new_stage() -> Usd.Stage: """Create a new in-memory USD stage. Creates a new stage using pure USD (``Usd.Stage.CreateInMemory()``). If Kit is running and Kit extensions need to discover this stage (e.g. PhysX, ``isaacsim.core.prims.Articulation``), call :func:`attach_stage_to_usd_context` after scene setup. Returns: Usd.Stage: The created USD stage. Example: >>> import isaaclab.sim as sim_utils >>> >>> sim_utils.create_new_stage() Usd.Stage.Open(rootLayer=Sdf.Find('anon:0x7fba6c04f840:World7.usd'), sessionLayer=Sdf.Find('anon:0x7fba6c01c5c0:World7-session.usda'), pathResolverContext=<invalid repr>) """ stage: Usd.Stage = Usd.Stage.CreateInMemory() _context.stage = stage UsdUtils.StageCache.Get().Insert(stage) return stage
[docs] def is_current_stage_in_memory() -> bool: """Checks if the current stage is NOT attached to the USD context. This function compares the current stage (from :func:`get_current_stage`) with the stage attached to Kit's ``omni.usd`` context. If they are different, the current stage is considered "in memory" - meaning it's not the stage that the viewport/UI displays. This is useful for determining if we're working with a separate in-memory stage created via :func:`create_new_stage_in_memory` with ``SimulationCfg(create_stage_in_memory=True)``. In kitless mode (no USD context), this always returns True. Returns: True if the current stage is different from (not attached to) the context stage. Also returns True if there is no context stage at all. """ if not has_kit(): return True import omni.usd context = omni.usd.get_context() if context is None: return True context_stage = context.get_stage() if context_stage is None: return True return get_current_stage() is not context_stage
[docs] def open_stage(usd_path: str) -> Usd.Stage: """Open the given USD file. Opens a USD file using pure USD (``Usd.Stage.Open()``). If Kit is available and context attachment is needed for viewport/UI display, use :func:`attach_stage_to_usd_context` after opening the stage. Args: usd_path: The path to the USD file to open. Returns: The opened USD stage. Raises: ValueError: When input path is not a supported file type by USD. RuntimeError: When failed to open the stage. """ if not Usd.Stage.IsSupportedFile(usd_path): raise ValueError(f"The USD file at path '{usd_path}' is not supported.") stage = Usd.Stage.Open(usd_path) if stage is None: raise RuntimeError(f"Failed to open USD stage at path '{usd_path}'.") # Set as current stage so get_current_stage() can find it _context.stage = stage return stage
[docs] @contextlib.contextmanager def use_stage(stage: Usd.Stage) -> Generator[None, None, None]: """Context manager that sets a thread-local stage, if supported. This function binds the stage to the thread-local context for the duration of the context manager. During the context manager, any call to :func:`get_current_stage` will return the stage specified in the context manager. After the context manager is exited, the stage is restored to the default stage attached to the USD context. .. versionadded:: 2.3.0 This function is available in Isaac Sim 5.0 and later. For backwards compatibility, it falls back to a no-op context manager in Isaac Sim < 5.0. Args: stage: The stage to set in the context. Returns: A context manager that sets the stage in the context. Raises: AssertionError: If the stage is not a USD stage instance. Example: >>> from pxr import Usd >>> import isaaclab.sim as sim_utils >>> >>> stage_in_memory = Usd.Stage.CreateInMemory() >>> with sim_utils.use_stage(stage_in_memory): ... # operate on the specified stage ... pass >>> # operate on the default stage attached to the USD context """ if has_kit() and get_isaac_sim_version().major < 5: logger.warning("Isaac Sim < 5.0 does not support thread-local stage contexts. Skipping use_stage().") yield # no-op else: # check stage if not isinstance(stage, Usd.Stage): raise TypeError(f"Expected a USD stage instance, got: {type(stage)}") # store previous context value if it exists previous_stage = getattr(_context, "stage", None) # set new context value try: _context.stage = stage yield # remove context value or restore previous one if it exists finally: if previous_stage is None: delattr(_context, "stage") else: _context.stage = previous_stage
[docs] def update_stage() -> None: """Triggers a full application update cycle to process USD stage changes. This function calls ``omni.kit.app.get_app_interface().update()`` which triggers a complete application update including: * Physics simulation step (if ``/app/player/playSimulations`` is True) * Rendering (RTX path tracing, viewport updates) * UI updates (widgets, windows) * Timeline events and callbacks * Extension updates * USD/Fabric synchronization When to Use: * **After creating a new stage**: ``create_new_stage()`` → ``update_stage()`` * **After spawning prims**: ``cfg.func("/World/Robot", cfg)`` → ``update_stage()`` * **After USD authoring**: Creating materials, lights, meshes, etc. * **Before simulation starts**: During setup phase, before ``sim.reset()`` * **In test fixtures**: To ensure consistent state before each test When NOT to Use: * **During active simulation** (after ``sim.play()``): Can interfere with physics stepping and cause double-stepping or timing issues. * **During sensor updates**: Can reset RTX renderer state mid-cycle, causing incorrect sensor outputs (e.g., ``inf`` depth values). * **Inside physics/render callbacks**: Can cause recursion or timing issues. * **Inside ``sim.step()`` or ``sim.render()``**: These already perform app updates internally with proper safeguards. For rendering during simulation without physics stepping, use:: sim.set_setting("/app/player/playSimulations", False) omni.kit.app.get_app().update() sim.set_setting("/app/player/playSimulations", True) Example: >>> import isaaclab.sim as sim_utils >>> >>> # Setup phase - safe to use >>> sim_utils.create_new_stage() >>> robot_cfg.func("/World/Robot", robot_cfg) >>> sim_utils.update_stage() # Commit USD changes >>> >>> # Simulation phase - DO NOT use update_stage() >>> sim.reset() >>> sim.play() >>> for _ in range(100): ... sim.step() # Handles updates internally """ omni.kit.app.get_app_interface().update()
[docs] def save_stage(usd_path: str, save_and_reload_in_place: bool = True) -> bool: """Saves contents of the root layer of the current stage to the specified USD file. If the file already exists, it will be overwritten. Args: usd_path: The file path to save the current stage to save_and_reload_in_place: Whether to open the saved USD file in place. Defaults to True. Returns: True if operation is successful, otherwise False. Raises: ValueError: When input path is not a supported file type by USD. RuntimeError: When layer creation or save operation fails. """ # check if USD file is supported if not Usd.Stage.IsSupportedFile(usd_path): raise ValueError(f"The USD file at path '{usd_path}' is not supported.") # create new layer layer = Sdf.Layer.CreateNew(usd_path) if layer is None: raise RuntimeError(f"Failed to create new USD layer at path '{usd_path}'.") # get root layer root_layer = get_current_stage().GetRootLayer() # transfer content from root layer to new layer layer.TransferContent(root_layer) # resolve paths so asset references remain valid from the new location resolve_paths(root_layer.identifier, layer.identifier) # save layer result = layer.Save() if not result: logger.error(f"Failed to save USD layer to path '{usd_path}'.") # if requested, open the saved USD file in place if save_and_reload_in_place and result: open_stage(usd_path) return result
[docs] def close_stage() -> bool: """Closes the current USD stage. If Kit is running, this first closes the stage via the Kit USD context (``omni.usd.get_context().close_stage()``), then clears the stage cache. Without Kit, only the stage cache is cleared. .. note:: Once the stage is closed, it is necessary to open a new stage or create a new one in order to work on it. Returns: True if operation is successful. Example: >>> import isaaclab.sim as sim_utils >>> >>> sim_utils.close_stage() True """ # Close Kit's USD context first (while the stage is still in the cache), # then clear the cache. Reversing this order causes Kit to fail with # "Removal of UsdStage from cache failed" and can hang during teardown. if has_kit(): import omni.usd omni.usd.get_context().close_stage() stage_cache = UsdUtils.StageCache.Get() stage_cache.Clear() _context.stage = None return True
def _is_prim_deletable(prim: Usd.Prim) -> bool: """Check if a prim can be safely deleted. This function checks various conditions to determine if a prim should be deleted: - Root prim ("/") cannot be deleted - Prims under "/Render" namespace are preserved - Prims with "no_delete" metadata are preserved - Prims hidden in stage window are preserved - Ancestral prims (from USD references) cannot be deleted Args: prim: The USD prim to check. Returns: True if the prim can be deleted, False otherwise. """ prim_path = prim.GetPath().pathString if prim_path == "/": return False if prim_path.startswith("/Render"): return False if prim.GetMetadata("no_delete"): return False if prim.GetMetadata("hide_in_stage_window"): return False # Check ancestral prims (from USD references) using pure USD helper if _check_ancestral(prim): return False return True
[docs] def clear_stage(predicate: Callable[[Usd.Prim], bool] | None = None) -> None: """Deletes all prims in the stage without populating the undo command buffer. The function will delete all prims in the stage that satisfy the predicate. If the predicate is None, a default predicate will be used that deletes all prims. The default predicate deletes all prims that are not the root prim, are not under the /Render namespace, have the ``no_delete`` metadata, are not ancestral to any other prim, and are not hidden in the stage window. Args: predicate: A user defined function that takes the USD prim as an argument and returns a boolean indicating if the prim should be deleted. If the predicate is None, a default predicate will be used that deletes all prims. Example: >>> import isaaclab.sim as sim_utils >>> >>> # clear the whole stage >>> sim_utils.clear_stage() >>> >>> # given the stage: /World/Cube, /World/Cube_01, /World/Cube_02. >>> # Delete only the prims of type Cube >>> predicate = lambda _prim: _prim.GetTypeName() == "Cube" >>> sim_utils.clear_stage(predicate) # after the execution the stage will be /World """ # Note: Need to import this here to prevent circular dependencies. from .prims import delete_prim from .queries import get_all_matching_child_prims def _predicate_from_path(prim: Usd.Prim) -> bool: if predicate is None: return _is_prim_deletable(prim) # Custom predicate must also pass the deletable check return predicate(prim) and _is_prim_deletable(prim) # get all prims to delete prims = get_all_matching_child_prims("/", _predicate_from_path) # convert prims to prim paths prim_paths_to_delete = [prim.GetPath().pathString for prim in prims] # delete prims delete_prim(prim_paths_to_delete) if has_kit(): omni.kit.app.get_app_interface().update()
[docs] def get_current_stage(fabric: bool = False) -> Usd.Stage: """Get the current open USD or Fabric stage Args: fabric: True to get the fabric stage. False to get the USD stage. Defaults to False. Returns: The USD or Fabric stage as specified by the input arg fabric. Example: >>> import isaaclab.sim as sim_utils >>> >>> sim_utils.get_current_stage() Usd.Stage.Open(rootLayer=Sdf.Find('anon:0x7fba6c04f840:World7.usd'), sessionLayer=Sdf.Find('anon:0x7fba6c01c5c0:World7-session.usda'), pathResolverContext=<invalid repr>) """ # First check thread-local context for an in-memory stage stage = getattr(_context, "stage", None) if stage is not None: if fabric: import usdrt # Get stage ID and attach to Fabric stage stage_id = get_current_stage_id() return usdrt.Usd.Stage.Attach(stage_id) return stage return stage
[docs] def get_current_stage_id() -> int: """Get the current open stage ID. Returns: The current open stage id. Example: >>> import isaaclab.sim as sim_utils >>> >>> sim_utils.get_current_stage_id() 1234567890 """ # get current stage stage = get_current_stage() if stage is None: raise RuntimeError("No current stage available. Did you create a stage?") # retrieve stage ID from stage cache stage_cache = UsdUtils.StageCache.Get() stage_id = stage_cache.GetId(stage).ToLongInt() # if stage ID is not found, insert it into the stage cache if stage_id < 0: # Ensure stage has a valid root layer before inserting if not stage.GetRootLayer(): raise RuntimeError("Stage has no root layer - cannot cache an incomplete stage.") stage_id = stage_cache.Insert(stage).ToLongInt() # return stage ID return stage_id