# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Utilities for operating on the USD stage."""
import builtins
import contextlib
import logging
import threading
from collections.abc import Callable, Generator
import omni.kit.app
import omni.usd
from isaacsim.core.utils import stage as sim_stage
from pxr import Sdf, Usd, UsdUtils
from isaaclab.utils.version import get_isaac_sim_version
# import logger
logger = logging.getLogger(__name__)
_context = threading.local() # thread-local storage to handle nested contexts and concurrent access
# _context is a singleton design in isaacsim and for that reason
# until we fully replace all modules that references the singleton(such as XformPrim, Prim ....), we have to point
# that singleton to this _context
sim_stage._context = _context # type: ignore
[docs]def create_new_stage() -> Usd.Stage:
"""Create a new stage attached to the USD context.
Returns:
Usd.Stage: The created USD stage.
Raises:
RuntimeError: When failed to create a new stage.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.create_new_stage()
Usd.Stage.Open(rootLayer=Sdf.Find('anon:0x7fba6c04f840:World7.usd'),
sessionLayer=Sdf.Find('anon:0x7fba6c01c5c0:World7-session.usda'),
pathResolverContext=<invalid repr>)
"""
result = omni.usd.get_context().new_stage()
if result:
return omni.usd.get_context().get_stage()
else:
raise RuntimeError("Failed to create a new stage. Please check if the USD context is valid.")
[docs]def create_new_stage_in_memory() -> Usd.Stage:
"""Creates a new stage in memory, if supported.
.. versionadded:: 2.3.0
This function is available in Isaac Sim 5.0 and later. For backwards
compatibility, it falls back to creating a new stage attached to the USD context.
Returns:
The new stage in memory.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.create_new_stage_in_memory()
Usd.Stage.Open(rootLayer=Sdf.Find('anon:0xf7b00e0:tmp.usda'),
sessionLayer=Sdf.Find('anon:0xf7cd2e0:tmp-session.usda'),
pathResolverContext=<invalid repr>)
"""
if get_isaac_sim_version().major < 5:
logger.warning(
"Isaac Sim < 5.0 does not support creating a new stage in memory. Falling back to creating a new"
" stage attached to USD context."
)
return create_new_stage()
else:
return Usd.Stage.CreateInMemory()
[docs]def is_current_stage_in_memory() -> bool:
"""Checks if the current stage is in memory.
This function compares the stage id of the current USD stage with the stage id of the USD context stage.
Returns:
Whether the current stage is in memory.
"""
# grab current stage id
stage_id = get_current_stage_id()
# grab context stage id
context_stage = omni.usd.get_context().get_stage()
with use_stage(context_stage):
context_stage_id = get_current_stage_id()
# check if stage ids are the same
return stage_id != context_stage_id
[docs]def open_stage(usd_path: str) -> bool:
"""Open the given usd file and replace currently opened stage.
Args:
usd_path: The path to the USD file to open.
Returns:
True if operation is successful, otherwise False.
Raises:
ValueError: When input path is not a supported file type by USD.
"""
# check if USD file is supported
if not Usd.Stage.IsSupportedFile(usd_path):
raise ValueError(f"The USD file at path '{usd_path}' is not supported.")
# get USD context
usd_context = omni.usd.get_context()
# disable save to recent files
usd_context.disable_save_to_recent_files()
# open stage
result = usd_context.open_stage(usd_path)
# enable save to recent files
usd_context.enable_save_to_recent_files()
# return result
return result
[docs]@contextlib.contextmanager
def use_stage(stage: Usd.Stage) -> Generator[None, None, None]:
"""Context manager that sets a thread-local stage, if supported.
This function binds the stage to the thread-local context for the duration of the context manager.
During the context manager, any call to :func:`get_current_stage` will return the stage specified
in the context manager. After the context manager is exited, the stage is restored to the default
stage attached to the USD context.
.. versionadded:: 2.3.0
This function is available in Isaac Sim 5.0 and later. For backwards
compatibility, it falls back to a no-op context manager in Isaac Sim < 5.0.
Args:
stage: The stage to set in the context.
Returns:
A context manager that sets the stage in the context.
Raises:
AssertionError: If the stage is not a USD stage instance.
Example:
>>> from pxr import Usd
>>> import isaaclab.sim as sim_utils
>>>
>>> stage_in_memory = Usd.Stage.CreateInMemory()
>>> with sim_utils.use_stage(stage_in_memory):
... # operate on the specified stage
... pass
>>> # operate on the default stage attached to the USD context
"""
if get_isaac_sim_version().major < 5:
logger.warning("Isaac Sim < 5.0 does not support thread-local stage contexts. Skipping use_stage().")
yield # no-op
else:
# check stage
if not isinstance(stage, Usd.Stage):
raise TypeError(f"Expected a USD stage instance, got: {type(stage)}")
# store previous context value if it exists
previous_stage = getattr(_context, "stage", None)
# set new context value
try:
_context.stage = stage
yield
# remove context value or restore previous one if it exists
finally:
if previous_stage is None:
delattr(_context, "stage")
else:
_context.stage = previous_stage
[docs]def update_stage() -> None:
"""Updates the current stage by triggering an application update cycle.
This function triggers a single update cycle of the application interface, which
in turn updates the stage and all associated systems (rendering, physics, etc.).
This is necessary to ensure that changes made to the stage are properly processed
and reflected in the simulation.
Note:
This function calls the application update interface rather than directly
updating the stage because the stage update is part of the broader
application update cycle that includes rendering, physics, and other systems.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.update_stage()
"""
# TODO: Why is this updating the simulation and not the stage?
omni.kit.app.get_app_interface().update()
[docs]def save_stage(usd_path: str, save_and_reload_in_place: bool = True) -> bool:
"""Saves contents of the root layer of the current stage to the specified USD file.
If the file already exists, it will be overwritten.
Args:
usd_path: The file path to save the current stage to
save_and_reload_in_place: Whether to open the saved USD file in place. Defaults to True.
Returns:
True if operation is successful, otherwise False.
Raises:
ValueError: When input path is not a supported file type by USD.
RuntimeError: When layer creation or save operation fails.
"""
# check if USD file is supported
if not Usd.Stage.IsSupportedFile(usd_path):
raise ValueError(f"The USD file at path '{usd_path}' is not supported.")
# create new layer
layer = Sdf.Layer.CreateNew(usd_path)
if layer is None:
raise RuntimeError(f"Failed to create new USD layer at path '{usd_path}'.")
# get root layer
root_layer = get_current_stage().GetRootLayer()
# transfer content from root layer to new layer
layer.TransferContent(root_layer)
# resolve paths
omni.usd.resolve_paths(root_layer.identifier, layer.identifier)
# save layer
result = layer.Save()
if not result:
logger.error(f"Failed to save USD layer to path '{usd_path}'.")
# if requested, open the saved USD file in place
if save_and_reload_in_place and result:
open_stage(usd_path)
return result
[docs]def close_stage(callback_fn: Callable[[bool, str], None] | None = None) -> bool:
"""Closes the current USD stage.
.. note::
Once the stage is closed, it is necessary to open a new stage or create a
new one in order to work on it.
Args:
callback_fn: A callback function to call while closing the stage.
The function should take two arguments: a boolean indicating whether the stage is closing
and a string indicating the error message if the stage closing fails. Defaults to None,
in which case the stage will be closed without a callback.
Returns:
True if operation is successful, otherwise False.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.close_stage()
True
>>>
Example with callback function:
>>> import isaaclab.sim as sim_utils
>>>
>>> def callback(*args, **kwargs):
... print("callback:", args, kwargs)
...
>>> sim_utils.close_stage(callback)
True
>>> sim_utils.close_stage(callback)
callback: (False, 'Stage opening or closing already in progress!!') {}
False
"""
if callback_fn is None:
result = omni.usd.get_context().close_stage()
else:
result = omni.usd.get_context().close_stage_with_callback(callback_fn)
return result
[docs]def clear_stage(predicate: Callable[[Usd.Prim], bool] | None = None) -> None:
"""Deletes all prims in the stage without populating the undo command buffer.
The function will delete all prims in the stage that satisfy the predicate. If the predicate
is None, a default predicate will be used that deletes all prims. The default predicate deletes
all prims that are not the root prim, are not under the /Render namespace, have the ``no_delete``
metadata, are not ancestral to any other prim, and are not hidden in the stage window.
Args:
predicate: A user defined function that takes the USD prim as an argument and
returns a boolean indicating if the prim should be deleted. If the predicate is None,
a default predicate will be used that deletes all prims.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> # clear the whole stage
>>> sim_utils.clear_stage()
>>>
>>> # given the stage: /World/Cube, /World/Cube_01, /World/Cube_02.
>>> # Delete only the prims of type Cube
>>> predicate = lambda _prim: _prim.GetTypeName() == "Cube"
>>> sim_utils.clear_stage(predicate) # after the execution the stage will be /World
"""
# Note: Need to import this here to prevent circular dependencies.
from .prims import delete_prim
from .queries import get_all_matching_child_prims
def _default_predicate(prim: Usd.Prim) -> bool:
"""Check if the prim should be deleted."""
prim_path = prim.GetPath().pathString
if prim_path == "/":
return False
if prim_path.startswith("/Render"):
return False
if prim.GetMetadata("no_delete"):
return False
if prim.GetMetadata("hide_in_stage_window"):
return False
if omni.usd.check_ancestral(prim):
return False
return True
def _predicate_from_path(prim: Usd.Prim) -> bool:
if predicate is None:
return _default_predicate(prim)
return predicate(prim)
# get all prims to delete
if predicate is None:
prims = get_all_matching_child_prims("/", _default_predicate)
else:
prims = get_all_matching_child_prims("/", _predicate_from_path)
# convert prims to prim paths
prim_paths_to_delete = [prim.GetPath().pathString for prim in prims]
# delete prims
delete_prim(prim_paths_to_delete)
if builtins.ISAAC_LAUNCHED_FROM_TERMINAL is False: # type: ignore
omni.kit.app.get_app_interface().update()
[docs]def is_stage_loading() -> bool:
"""Convenience function to see if any files are being loaded.
Returns:
bool: True if loading, False otherwise
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.is_stage_loading()
False
"""
context = omni.usd.get_context()
if context is None:
return False
else:
_, _, loading = context.get_stage_loading_status()
return loading > 0
[docs]def get_current_stage(fabric: bool = False) -> Usd.Stage:
"""Get the current open USD or Fabric stage
Args:
fabric: True to get the fabric stage. False to get the USD stage. Defaults to False.
Returns:
The USD or Fabric stage as specified by the input arg fabric.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.get_current_stage()
Usd.Stage.Open(rootLayer=Sdf.Find('anon:0x7fba6c04f840:World7.usd'),
sessionLayer=Sdf.Find('anon:0x7fba6c01c5c0:World7-session.usda'),
pathResolverContext=<invalid repr>)
"""
stage = getattr(_context, "stage", omni.usd.get_context().get_stage())
return stage
[docs]def get_current_stage_id() -> int:
"""Get the current open stage ID.
Returns:
The current open stage id.
Example:
>>> import isaaclab.sim as sim_utils
>>>
>>> sim_utils.get_current_stage_id()
1234567890
"""
# get current stage
stage = get_current_stage()
# retrieve stage ID from stage cache
stage_cache = UsdUtils.StageCache.Get()
stage_id = stage_cache.GetId(stage).ToLongInt()
# if stage ID is not found, insert it into the stage cache
if stage_id < 0:
stage_id = stage_cache.Insert(stage).ToLongInt()
# return stage ID
return stage_id
[docs]def attach_stage_to_usd_context(attaching_early: bool = False):
"""Attaches the current USD stage in memory to the USD context.
This function should be called during or after scene is created and before stage is simulated or rendered.
If the stage is not in memory or rendering is not enabled, this function will return without attaching.
.. versionadded:: 2.3.0
This function is available in Isaac Sim 5.0 and later. For backwards
compatibility, it returns without attaching to the USD context.
Args:
attaching_early: Whether to attach the stage to the usd context before stage is created. Defaults to False.
"""
import carb
import omni.physx
import omni.usd
from isaacsim.core.simulation_manager import SimulationManager
from isaaclab.sim.simulation_context import SimulationContext
# if Isaac Sim version is less than 5.0, stage in memory is not supported
if get_isaac_sim_version().major < 5:
return
# if stage is not in memory, we can return early
if not is_current_stage_in_memory():
return
# attach stage to physx
stage_id = get_current_stage_id()
physx_sim_interface = omni.physx.get_physx_simulation_interface()
physx_sim_interface.attach_stage(stage_id)
# this carb flag is equivalent to if rendering is enabled
carb_setting = carb.settings.get_settings() # type: ignore
is_rendering_enabled = carb_setting.get("/physics/fabricUpdateTransformations")
# if rendering is not enabled, we don't need to attach it
if not is_rendering_enabled:
return
# early attach warning msg
if attaching_early:
logger.warning(
"Attaching stage in memory to USD context early to support an operation which"
" does not support stage in memory."
)
# skip this callback to avoid wiping the stage after attachment
SimulationContext.instance().skip_next_stage_open_callback()
# disable stage open callback to avoid clearing callbacks
SimulationManager.enable_stage_open_callback(False)
# enable physics fabric
SimulationContext.instance()._physics_context.enable_fabric(True) # type: ignore
# attach stage to usd context
omni.usd.get_context().attach_stage_with_callback(stage_id)
# attach stage to physx
physx_sim_interface = omni.physx.get_physx_simulation_interface()
physx_sim_interface.attach_stage(stage_id)
# re-enable stage open callback
SimulationManager.enable_stage_open_callback(True)