# Source code for isaaclab.sim.simulation_context

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import gc
import logging
import os
import traceback
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

import toml
import torch

from pxr import Gf, Usd, UsdGeom, UsdPhysics, UsdUtils

import isaaclab.sim as sim_utils
import isaaclab.sim.utils.stage as stage_utils
from isaaclab.app.settings_manager import SettingsManager
from isaaclab.physics import BaseSceneDataProvider, PhysicsManager, SceneDataProvider
from isaaclab.physics.scene_data_requirements import (
    SceneDataRequirement,
    VisualizerPrebuiltArtifacts,
    resolve_scene_data_requirements,
)
from isaaclab.sim.utils import create_new_stage
from isaaclab.utils.version import has_kit
from isaaclab.visualizers.base_visualizer import BaseVisualizer

from .simulation_cfg import SimulationCfg
from .spawners import DomeLightCfg, GroundPlaneCfg

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Visualizer type names accepted from the CLI and from config.
# The app launcher parses the CLI value as CSV and stores it as a space-separated setting.
_VISUALIZER_TYPES = ("newton", "rerun", "viser", "kit")


class SettingsHelper:
    """Thin facade that routes setting writes to the typed setters of a settings backend."""

    def __init__(self, settings: SettingsManager):
        # Backend object exposing set_bool/set_int/set_float/set_string/set/get.
        self._settings = settings

    def set(self, name: str, value: Any) -> None:
        """Store ``value`` under ``name`` using the backend setter that matches its type.

        Args:
            name: Setting path.
            value: A bool, int, float, str, list or tuple.

        Raises:
            ValueError: If ``value`` is of any other type.
        """
        # Order matters: ``bool`` is a subclass of ``int`` and must be matched first.
        typed_setters = (
            (bool, "set_bool"),
            (int, "set_int"),
            (float, "set_float"),
            (str, "set_string"),
            ((list, tuple), "set"),
        )
        for accepted_types, setter_name in typed_setters:
            if isinstance(value, accepted_types):
                getattr(self._settings, setter_name)(name, value)
                return
        raise ValueError(f"Unsupported value type for setting '{name}': {type(value)}")

    def get(self, name: str) -> Any:
        """Return whatever the backend holds for ``name``."""
        return self._settings.get(name)


class SimulationContext:
    """Controls simulation lifecycle including physics stepping and rendering.

    This singleton class manages:

    * Physics configuration (time-step, solver parameters via :class:`isaaclab.sim.SimulationCfg`)
    * Simulation state (play, pause, step, stop)
    * Rendering and visualization

    The singleton instance can be accessed using the ``instance()`` class method.
    """

    # SINGLETON PATTERN
    _instance: SimulationContext | None = None

    def __new__(cls, cfg: SimulationCfg | None = None):
        """Enforce singleton pattern: return the existing instance when one is registered."""
        if cls._instance is not None:
            return cls._instance
        return super().__new__(cls)

    @classmethod
    def instance(cls) -> SimulationContext | None:
        """Get the singleton instance, or None if not created."""
        return cls._instance

    def __init__(self, cfg: SimulationCfg | None = None):
        """Initialize the simulation context.

        Args:
            cfg: Simulation configuration. Defaults to None (uses default config).
        """
        if type(self)._instance is not None:
            return  # Already initialized

        # Store config
        self.cfg = SimulationCfg() if cfg is None else cfg

        # Get or create stage based on config
        stage_cache = UsdUtils.StageCache.Get()
        if self.cfg.create_stage_in_memory:
            self.stage = create_new_stage()
        else:
            # Prefer the thread-local current stage (set by create_new_stage / test fixtures)
            # over cache lookup, since the cache may contain stale stages from prior tests.
            current = getattr(stage_utils._context, "stage", None)
            if current is not None:
                self.stage = current
            else:
                all_stages = stage_cache.GetAllStages() if stage_cache.Size() > 0 else []  # type: ignore[union-attr]
                self.stage = all_stages[0] if all_stages else create_new_stage()

        # Ensure stage is in the USD cache
        stage_id = stage_cache.GetId(self.stage).ToLongInt()  # type: ignore[union-attr]
        if stage_id < 0:
            stage_cache.Insert(self.stage)  # type: ignore[union-attr]

        # Set as current stage in thread-local context for get_current_stage()
        stage_utils._context.stage = self.stage

        # When Kit is running, attach the stage to Kit's USD context so that
        # Kit extensions (PhysX views, Articulation, viewport) can discover it.
        if has_kit():
            import omni.usd

            kit_context = omni.usd.get_context()
            if kit_context is not None and kit_context.get_stage() is not self.stage:
                kit_context.attach_stage_with_callback(stage_cache.GetId(self.stage).ToLongInt())

        # Acquire settings interface (SettingsManager: standalone dict or Omniverse when available)
        self.settings = SettingsManager.instance()
        self._settings_helper = SettingsHelper(self.settings)

        # Initialize USD physics scene and physics manager
        self._init_usd_physics_scene()

        # Normalize "cuda" -> "cuda:<id>" now that the USD physics scene is initialized
        # and /physics/cudaDevice is available. Update cfg.device in-place so all
        # downstream code (physics backends, assets, sensors) sees a consistent value.
        if "cuda" in self.cfg.device and ":" not in self.cfg.device:
            cuda_device = self.get_setting("/physics/cudaDevice")
            device_id = max(0, int(cuda_device) if cuda_device is not None else 0)
            self.cfg.device = f"cuda:{device_id}"

        # Set default physics backend if not specified
        if self.cfg.physics is None:
            from isaaclab_physx.physics import PhysxCfg

            self.cfg.physics = PhysxCfg()

        self._physics = self.cfg.physics
        # If physics is a PresetCfg wrapper (has a 'default' field but no 'class_type'),
        # resolve to the default preset so downstream code always sees a concrete PhysicsCfg.
        if not hasattr(self._physics, "class_type") and hasattr(self._physics, "default"):
            self._physics = self._physics.default
            self.cfg.physics = self._physics
        self.physics_manager: type[PhysicsManager] = self._physics.class_type
        self.physics_manager.initialize(self)
        self._apply_render_cfg_settings()

        # Initialize visualizer state (provider/visualizers are created lazily during initialize_visualizers()).
        self._scene_data_provider: BaseSceneDataProvider | None = None
        self._visualizers: list[BaseVisualizer] = []
        self._scene_data_requirements = SceneDataRequirement()
        self._visualizer_prebuilt_artifact: VisualizerPrebuiltArtifacts | None = None
        self._visualizer_step_counter = 0
        # Default visualization dt used before/without visualizer initialization.
        physics_dt = getattr(self.cfg.physics, "dt", None)
        self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval

        # Cache commonly-used settings (these don't change during runtime)
        self._has_gui = bool(self.get_setting("/isaaclab/has_gui"))
        self._has_offscreen_render = bool(self.get_setting("/isaaclab/render/offscreen"))
        self._xr_enabled = bool(self.get_setting("/isaaclab/xr/enabled"))
        # Note: has_rtx_sensors is NOT cached because it changes when Camera sensors are created

        # Simulation state
        self._is_playing = False
        self._is_stopped = True
        # Monotonic physics-step counter (incremented once per step()); used by camera sensors.
        self._physics_step_count: int = 0

        type(self)._instance = self  # Mark as valid singleton only after successful init

    def _apply_render_cfg_settings(self) -> None:
        """Apply render preset and overrides from SimulationCfg.render.

        Order of application: the rendering-mode preset file (if any), then the named
        ``RenderCfg`` fields, then raw ``carb_settings`` overrides, and finally the
        optional anti-aliasing mode (best-effort).

        Raises:
            ValueError: If the resolved rendering mode is not one of the supported modes.
        """
        # TODO: Refactor render preset + override handling to a dedicated RenderingQualityCfg
        # (name subject to change) to keep quality profiles and carb mappings centralized.
        render_cfg = getattr(self.cfg, "render", None)
        if render_cfg is None:
            return

        # Priority:
        # 1) CLI/AppLauncher setting if present, 2) SimulationCfg.render.rendering_mode.
        rendering_mode = self.get_setting("/isaaclab/rendering/rendering_mode")
        if not rendering_mode:
            rendering_mode = getattr(render_cfg, "rendering_mode", None)
        if rendering_mode:
            supported_rendering_modes = {"performance", "balanced", "quality"}
            if rendering_mode not in supported_rendering_modes:
                raise ValueError(
                    f"RenderCfg rendering mode '{rendering_mode}' not in supported modes "
                    f"{sorted(supported_rendering_modes)}."
                )

            isaaclab_app_exp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), *[".."] * 4, "apps")
            from isaaclab.utils.version import get_isaac_sim_version

            if get_isaac_sim_version().major < 6:
                isaaclab_app_exp_path = os.path.join(isaaclab_app_exp_path, "isaacsim_5")
            preset_filename = os.path.join(isaaclab_app_exp_path, f"rendering_modes/{rendering_mode}.kit")

            if os.path.exists(preset_filename):
                # The preset is parsed as TOML, which is UTF-8 by specification.
                with open(preset_filename, encoding="utf-8") as file:
                    preset_dict = toml.load(file)

                def _apply_nested(data: dict[str, Any], path: str = "") -> None:
                    """Flatten nested preset tables into '/'-separated setting paths and apply them."""
                    for key, value in data.items():
                        key_path = f"{path}/{key}" if path else f"/{key}"
                        if isinstance(value, dict):
                            _apply_nested(value, key_path)
                        else:
                            self.set_setting(key_path.replace(".", "/"), value)

                _apply_nested(preset_dict)
            else:
                logger.warning("[SimulationContext] Render preset file not found: %s", preset_filename)

        # RenderCfg fields mapped to setting paths (stored via SettingsManager)
        field_to_setting = {
            "enable_translucency": "/rtx/translucency/enabled",
            "enable_reflections": "/rtx/reflections/enabled",
            "enable_global_illumination": "/rtx/indirectDiffuse/enabled",
            "enable_dlssg": "/rtx-transient/dlssg/enabled",
            "enable_dl_denoiser": "/rtx-transient/dldenoiser/enabled",
            "dlss_mode": "/rtx/post/dlss/execMode",
            "enable_direct_lighting": "/rtx/directLighting/enabled",
            "samples_per_pixel": "/rtx/directLighting/sampledLighting/samplesPerPixel",
            "enable_shadows": "/rtx/shadows/enabled",
            "enable_ambient_occlusion": "/rtx/ambientOcclusion/enabled",
            "dome_light_upper_lower_strategy": "/rtx/domeLight/upperLowerStrategy",
        }
        for key, value in vars(render_cfg).items():
            if value is None or key in {"rendering_mode", "carb_settings", "antialiasing_mode"}:
                continue
            setting_path = field_to_setting.get(key)
            if setting_path is not None:
                self.set_setting(setting_path, value)

        # Raw overrides from render_cfg (stored via SettingsManager)
        extra_settings = getattr(render_cfg, "carb_settings", None)
        if extra_settings:
            for key, value in extra_settings.items():
                # Keys may be written python-style (underscores) or .kit-style (dots);
                # anything else is used verbatim as the setting path.
                if "_" in key:
                    path = "/" + key.replace("_", "/")
                elif "." in key:
                    path = "/" + key.replace(".", "/")
                else:
                    path = key
                self.set_setting(path, value)

        # Optional anti-aliasing mode via Replicator (best-effort, may use Omniverse APIs)
        antialiasing_mode = getattr(render_cfg, "antialiasing_mode", None)
        if antialiasing_mode is not None:
            try:
                import omni.replicator.core as rep

                rep.settings.set_render_rtx_realtime(antialiasing=antialiasing_mode)
            except ImportError as exc:
                # Replicator is optional; stay best-effort but leave a trace for debugging.
                logger.debug("[SimulationContext] Replicator unavailable; anti-aliasing mode not applied: %s", exc)
            except Exception as exc:
                logger.warning(
                    "[SimulationContext] Failed to apply anti-aliasing mode %r: %s", antialiasing_mode, exc
                )

    def _init_usd_physics_scene(self) -> None:
        """Create and configure the USD physics scene.

        Sets the stage conventions (Z-up, meters, kilograms), removes any existing
        ``PhysicsScene`` prims, defines a new one at ``cfg.physics_prim_path`` and writes
        gravity direction/magnitude onto it.

        Raises:
            RuntimeError: If a prim still exists at ``cfg.physics_prim_path`` after cleanup.
        """
        cfg = self.cfg
        with sim_utils.use_stage(self.stage):
            # Set stage conventions for metric units
            UsdGeom.SetStageUpAxis(self.stage, "Z")
            UsdGeom.SetStageMetersPerUnit(self.stage, 1.0)
            UsdPhysics.SetStageKilogramsPerUnit(self.stage, 1.0)

            # Find and delete any existing physics scene. Paths are collected first so the
            # stage is not edited while the traversal range is still being iterated.
            stale_scene_paths = [
                prim.GetPath().pathString for prim in self.stage.Traverse() if prim.GetTypeName() == "PhysicsScene"
            ]
            for scene_path in stale_scene_paths:
                sim_utils.delete_prim(scene_path, stage=self.stage)

            # Create a new physics scene
            if self.stage.GetPrimAtPath(cfg.physics_prim_path).IsValid():
                raise RuntimeError(f"A prim already exists at path '{cfg.physics_prim_path}'.")
            physics_scene = UsdPhysics.Scene.Define(self.stage, cfg.physics_prim_path)

            # Pre-create gravity tensor to avoid torch heap corruption issues (torch 2.1+)
            gravity = torch.tensor(cfg.gravity, dtype=torch.float32, device=self.cfg.device)
            gravity_magnitude = torch.norm(gravity).item()
            if gravity_magnitude == 0.0:
                # Zero gravity has no direction; fall back to -Z so the attribute stays well-defined.
                gravity_direction = [0.0, 0.0, -1.0]
            else:
                gravity_direction = (gravity / gravity_magnitude).tolist()
            physics_scene.CreateGravityDirectionAttr(Gf.Vec3f(*gravity_direction))
            physics_scene.CreateGravityMagnitudeAttr(gravity_magnitude)

    @property
    def physics_sim_view(self):
        """Returns the physics simulation view."""
        return self.physics_manager.get_physics_sim_view()

    @property
    def device(self) -> str:
        """Returns the device on which the simulation is running."""
        return self.physics_manager.get_device()

    @property
    def backend(self) -> str:
        """Returns the tensor backend being used ("numpy" or "torch")."""
        return self.physics_manager.get_backend()

    @property
    def has_gui(self) -> bool:
        """Returns whether GUI is enabled (cached at init)."""
        return self._has_gui

    @property
    def has_offscreen_render(self) -> bool:
        """Returns whether offscreen rendering is enabled (cached at init)."""
        return self._has_offscreen_render

    @property
    def is_rendering(self) -> bool:
        """Returns whether rendering is active (GUI, RTX sensors, visualizers, or XR)."""
        # bool() keeps the declared return type even when the raw setting value is not a bool.
        return bool(
            self._has_gui
            or self._has_offscreen_render
            or self.get_setting("/isaaclab/render/rtx_sensors")
            or self.resolve_visualizer_types()
            or self._xr_enabled
        )

    def get_physics_dt(self) -> float:
        """Returns the physics time step."""
        return self.physics_manager.get_physics_dt()

    def _create_default_visualizer_configs(self, requested_visualizers: list[str]) -> list:
        """Create default visualizer configs for requested types.

        Loads only the requested visualizer submodule (e.g. isaaclab_visualizers.rerun)
        so dependencies for other backends are not imported.

        Args:
            requested_visualizers: Visualizer type names to create default configs for.

        Returns:
            Default config instances for every type that could be created. Unknown types
            and types whose module fails to import are logged and skipped.
        """
        import importlib

        default_configs = []
        cfg_class_names = {
            "kit": "KitVisualizerCfg",
            "newton": "NewtonVisualizerCfg",
            "rerun": "RerunVisualizerCfg",
            "viser": "ViserVisualizerCfg",
        }
        for viz_type in requested_visualizers:
            try:
                if viz_type not in _VISUALIZER_TYPES:
                    logger.warning(
                        "[SimulationContext] Unknown visualizer type '%s' requested. Valid types: %s. Skipping.",
                        viz_type,
                        ", ".join(repr(t) for t in _VISUALIZER_TYPES),
                    )
                    continue
                mod = importlib.import_module(f"isaaclab_visualizers.{viz_type}")
                cfg_cls = getattr(mod, cfg_class_names[viz_type])
                default_configs.append(cfg_cls())
            except ImportError as exc:  # also covers ModuleNotFoundError (a subclass)
                # isaaclab_visualizers is optional; log once at warning level
                if "isaaclab_visualizers" in str(exc):
                    logger.warning(
                        "[SimulationContext] Visualizer '%s' skipped: isaaclab_visualizers is not installed. "
                        "Install with: pip install isaaclab_visualizers[%s]",
                        viz_type,
                        viz_type,
                    )
                else:
                    logger.error(
                        "[SimulationContext] Failed to create default config for visualizer '%s': %s",
                        viz_type,
                        exc,
                    )
            except Exception as exc:
                logger.error(
                    "[SimulationContext] Failed to create default config for visualizer '%s': %s", viz_type, exc
                )
        return default_configs

    def _get_cli_visualizer_types(self) -> list[str]:
        """Return list of visualizer types requested via CLI (setting)."""
        requested = self.get_setting("/isaaclab/visualizer/types")
        if not isinstance(requested, str) or not requested.strip():
            return []
        # App launcher writes this as a single string; accept comma and/or whitespace separators.
        return [value for chunk in requested.split(",") for value in chunk.split() if value]

    def _get_cli_visualizer_max_worlds_override(self) -> tuple[bool, int | None]:
        """Return CLI override for visualizer max worlds.

        Returns:
            Tuple of (has_override, value), where value=None means no override.
        """
        value = self.get_setting("/isaaclab/visualizer/max_worlds")
        if value is None:
            return False, None
        try:
            max_worlds = int(value)
        except (TypeError, ValueError):
            logger.warning("[SimulationContext] Invalid /isaaclab/visualizer/max_worlds setting: %r", value)
            return False, None
        # -1 means no CLI override.
        if max_worlds < 0:
            return False, None
        return True, max_worlds

    def _apply_visualizer_cli_overrides(self, visualizer_cfgs: list[Any]) -> None:
        """Apply CLI visualizer overrides (e.g., max worlds) to resolved configs.

        Args:
            visualizer_cfgs: Resolved visualizer configs to update in-place.
        """
        has_max_worlds_override, max_worlds_override = self._get_cli_visualizer_max_worlds_override()
        if not has_max_worlds_override:
            return
        for cfg in visualizer_cfgs:
            if hasattr(cfg, "max_worlds"):
                cfg.max_worlds = max_worlds_override

    def _is_cli_visualizer_explicit(self) -> bool:
        """Return ``True`` when visualizers were explicitly provided via CLI."""
        return bool(self.get_setting("/isaaclab/visualizer/explicit"))

    def _is_cli_visualizer_disable_all(self) -> bool:
        """Return ``True`` when CLI requested ``--viz none`` semantics."""
        return bool(self.get_setting("/isaaclab/visualizer/disable_all"))

    def resolve_visualizer_types(self) -> list[str]:
        """Resolve visualizer types from config or CLI settings.

        Returns:
            An empty list when the CLI disabled all visualizers; the CLI-requested types when
            the CLI selection is explicit; otherwise the ``visualizer_type`` of every config
            in ``cfg.visualizer_cfgs`` that defines a truthy one.
        """
        if self._is_cli_visualizer_disable_all():
            return []
        if self._is_cli_visualizer_explicit():
            return self._get_cli_visualizer_types()

        visualizer_cfgs = self.cfg.visualizer_cfgs
        if visualizer_cfgs is None:
            return []
        if not isinstance(visualizer_cfgs, list):
            visualizer_cfgs = [visualizer_cfgs]
        return [cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None)]

    def _resolve_visualizer_cfgs(self) -> list[Any]:
        """Resolve final visualizer configs from cfg and optional CLI override.

        When visualizers are explicitly requested via ``--visualizer`` CLI flag, a
        :class:`RuntimeError` is raised if any requested type cannot be resolved
        (unknown type or missing package).

        Returns:
            The list of visualizer configs to instantiate. The returned list is a new
            container; ``cfg.visualizer_cfgs`` itself is never appended to.

        Raises:
            RuntimeError: If an explicitly requested visualizer type could not be configured.
        """
        visualizer_cfgs: list[Any] = []
        if self.cfg.visualizer_cfgs is not None:
            # Shallow-copy so that later appends (XR auto-injection) never grow the user's config list.
            visualizer_cfgs = (
                list(self.cfg.visualizer_cfgs)
                if isinstance(self.cfg.visualizer_cfgs, list)
                else [self.cfg.visualizer_cfgs]
            )

        cli_requested = self._get_cli_visualizer_types()
        cli_explicit = self._is_cli_visualizer_explicit()
        cli_disable_all = self._is_cli_visualizer_disable_all()

        if cli_disable_all:
            resolved = []
        elif not cli_explicit:
            self._apply_visualizer_cli_overrides(visualizer_cfgs)
            resolved = visualizer_cfgs
        elif not visualizer_cfgs:
            resolved = self._create_default_visualizer_configs(cli_requested) if cli_requested else []
            self._apply_visualizer_cli_overrides(resolved)
        else:
            # CLI selection is explicit: keep only requested cfg types, then add defaults for missing.
            cli_requested_set = set(cli_requested)
            resolved = [cfg for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) in cli_requested_set]
            existing_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
            for viz_type in cli_requested:
                if viz_type not in existing_types and viz_type in _VISUALIZER_TYPES:
                    resolved.extend(self._create_default_visualizer_configs([viz_type]))
                    existing_types.add(viz_type)
            self._apply_visualizer_cli_overrides(resolved)

        # When visualizers were explicitly requested via CLI, verify all
        # requested types were resolved. This catches unknown types and
        # missing packages that _create_default_visualizer_configs silently
        # skips.
        if cli_explicit and cli_requested:
            resolved_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
            missing = [t for t in cli_requested if t not in resolved_types]
            if missing:
                raise RuntimeError(
                    f"Explicitly requested visualizer(s) {missing} could not be configured. "
                    f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. "
                    "Ensure the required package is installed "
                    "(e.g., pip install isaaclab_visualizers[<type>])."
                )

        # XR auto-start: auto-inject a KitVisualizer when XR is active and no
        # Kit visualizer is already present. The KitVisualizer pumps
        # app.update() and triggers forward() (via requires_forward_before_step)
        # to sync Fabric data so the XR runtime receives up-to-date hand/joint
        # transforms each frame.
        if self._xr_enabled and bool(self.get_setting("/isaaclab/xr/auto_start")):
            # Named so it does not shadow the module-level ``has_kit()`` helper.
            has_kit_visualizer = any(getattr(cfg, "visualizer_type", None) == "kit" for cfg in resolved)
            if not has_kit_visualizer:
                try:
                    import importlib

                    mod = importlib.import_module("isaaclab_visualizers.kit")
                    kit_cfg_cls = getattr(mod, "KitVisualizerCfg")
                    resolved.append(kit_cfg_cls())
                    logger.info("[SimulationContext] Auto-injecting KitVisualizer for XR app-update pumping.")
                except (ImportError, AttributeError) as exc:
                    logger.warning(
                        "[SimulationContext] XR mode could not auto-inject a KitVisualizer: %s. "
                        "Install isaaclab_visualizers[kit] or pass --visualizer kit.",
                        exc,
                    )

        return resolved

    def initialize_visualizers(self) -> None:
        """Initialize visualizers from SimulationCfg.visualizer_cfgs.

        Does nothing if visualizers are already active. Otherwise refreshes the default
        visualization dt, resolves the visualizer configs, creates the scene data provider
        and instantiates each visualizer.

        Raises:
            RuntimeError: If a visualizer explicitly requested via CLI fails to create or
                initialize. Failures of non-explicit visualizers are only logged.
        """
        if self._visualizers:
            return

        physics_dt = getattr(self.cfg.physics, "dt", None)
        self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval

        visualizer_cfgs = self._resolve_visualizer_cfgs()
        if not visualizer_cfgs:
            return

        cli_explicit = self._is_cli_visualizer_explicit()

        # Resolve visualizer-driven requirements once and keep optional artifact payload untouched.
        visualizer_types = [
            cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) is not None
        ]
        requirements = resolve_scene_data_requirements(visualizer_types=visualizer_types)
        self._scene_data_requirements = requirements

        self.initialize_scene_data_provider()
        self._visualizers = []

        for cfg in visualizer_cfgs:
            try:
                visualizer = cfg.create_visualizer()
                visualizer.initialize(self._scene_data_provider)
                self._visualizers.append(visualizer)
            except Exception as exc:
                if cli_explicit:
                    raise RuntimeError(
                        f"Visualizer '{cfg.visualizer_type}' was explicitly requested "
                        f"but failed to create or initialize: {exc}"
                    ) from exc
                logger.exception(
                    "Failed to initialize visualizer '%s' (%s): %s",
                    cfg.visualizer_type,
                    type(cfg).__name__,
                    exc,
                )

        # No visualizer survived initialization: release the provider that was created for them.
        if not self._visualizers and self._scene_data_provider is not None:
            close_provider = getattr(self._scene_data_provider, "close", None)
            if callable(close_provider):
                close_provider()
            self._scene_data_provider = None

    def initialize_scene_data_provider(self) -> BaseSceneDataProvider:
        """Return the scene data provider, creating it from the stage on first use."""
        if self._scene_data_provider is None:
            self._scene_data_provider = SceneDataProvider(self.stage, self)
        return self._scene_data_provider

    def get_scene_data_requirements(self) -> SceneDataRequirement:
        """Return scene-data requirements resolved from visualizers/renderers."""
        return self._scene_data_requirements

    def update_scene_data_requirements(self, requirements: SceneDataRequirement) -> None:
        """Update scene-data requirements."""
        self._scene_data_requirements = requirements

    def get_scene_data_visualizer_prebuilt_artifact(self) -> VisualizerPrebuiltArtifacts | None:
        """Return optional prebuilt visualizer artifact."""
        return self._visualizer_prebuilt_artifact

    def set_scene_data_visualizer_prebuilt_artifact(self, artifact: VisualizerPrebuiltArtifacts | None) -> None:
        """Set or clear the optional visualizer prebuilt artifact.

        The scene (clone flow) writes this once, and providers can read it during
        initialization as a fast path.
        """
        self._visualizer_prebuilt_artifact = artifact

    def clear_scene_data_visualizer_prebuilt_artifact(self) -> None:
        """Clear optional prebuilt artifact in provider context."""
        self.set_scene_data_visualizer_prebuilt_artifact(None)

    @property
    def visualizers(self) -> list[BaseVisualizer]:
        """Returns the list of active visualizers."""
        return self._visualizers

    def get_rendering_dt(self) -> float:
        """Return rendering dt, allowing visualizer-specific override.

        The first active visualizer reporting a positive dt wins; otherwise the default
        visualization dt is returned.
        """
        for viz in self._visualizers:
            viz_dt = viz.get_rendering_dt()
            if viz_dt is not None and viz_dt > 0:
                return float(viz_dt)
        return self._viz_dt

    def set_camera_view(self, eye: tuple, target: tuple) -> None:
        """Set camera view on all visualizers that support it."""
        for viz in self._visualizers:
            viz.set_camera_view(eye, target)

    def forward(self) -> None:
        """Update kinematics without stepping physics."""
        self.physics_manager.forward()

    def reset(self, soft: bool = False) -> None:
        """Reset the simulation.

        Args:
            soft: If True, skip full reinitialization.
        """
        self.physics_manager.reset(soft)
        for viz in self._visualizers:
            viz.reset(soft)
        # Start the timeline so the play button is pressed
        self.physics_manager.play()
        if not self._visualizers:
            # Initialize visualizers after PhysX sim view is ready.
            self.initialize_visualizers()
        self._is_playing = True
        self._is_stopped = False

    def step(self, render: bool = True) -> None:
        """Step physics and optionally render.

        Args:
            render: Whether to render the scene after stepping. Defaults to True.
        """
        self._physics_step_count += 1
        self.physics_manager.step()
        if render and self.is_rendering:
            self.render()

    def render(self, mode: int | None = None) -> None:
        """Update visualizers and render the scene.

        Calls update_visualizers() so visualizers run at the render cadence (not at every
        physics step). Camera sensors drive their configured renderer when fetching data,
        so this method remains backend-agnostic.

        Args:
            mode: Not used by this implementation.
        """
        self.update_visualizers(self.get_rendering_dt())

        # Call render callbacks (the attribute is not created by __init__, hence the hasattr guard)
        if hasattr(self, "_render_callbacks"):
            for callback in self._render_callbacks.values():
                callback(None)  # Pass None as event data

    def update_visualizers(self, dt: float) -> None:
        """Update visualizers without triggering renderer/GUI.

        Closed, stopped, or failing visualizers are closed and dropped from the active list.

        Args:
            dt: Time step passed to each visualizer's ``step``.
        """
        if not self._visualizers:
            return

        self.update_scene_data_provider()

        visualizers_to_remove = []
        for viz in self._visualizers:
            try:
                if viz.is_closed or not viz.is_running():
                    if viz.is_closed:
                        logger.info("Visualizer closed: %s", type(viz).__name__)
                    else:
                        logger.info("Visualizer not running: %s", type(viz).__name__)
                    visualizers_to_remove.append(viz)
                    continue

                if viz.is_rendering_paused():
                    continue

                # Block here while the visualizer requests a training pause, pumping it with dt=0.
                while viz.is_training_paused() and viz.is_running():
                    viz.step(0.0)

                viz.step(dt)
            except Exception as exc:
                logger.error("Error stepping visualizer '%s': %s", type(viz).__name__, exc)
                visualizers_to_remove.append(viz)

        for viz in visualizers_to_remove:
            try:
                viz.close()
            except Exception as exc:
                logger.error("Error closing visualizer: %s", exc)
            # Drop the visualizer even when close() failed; otherwise it would stay in the
            # active list and be re-processed (and fail again) on every subsequent update.
            if viz in self._visualizers:
                self._visualizers.remove(viz)
            logger.info("Removed visualizer: %s", type(viz).__name__)

    def update_scene_data_provider(self, force_require_forward: bool = False):
        """Refresh the scene data provider for the environments shown by the visualizers.

        Args:
            force_require_forward: If True, run the physics manager's ``forward()`` first even
                when no visualizer asks for it.
        """
        if force_require_forward or self._should_forward_before_visualizer_update():
            self.physics_manager.forward()
        self._visualizer_step_counter += 1
        if self._scene_data_provider is None:
            return
        provider = self._scene_data_provider
        env_ids_union: list[int] = []
        for viz in self._visualizers:
            ids = viz.get_visualized_env_ids()
            if ids is not None:
                env_ids_union.extend(ids)
        # De-duplicate while preserving first-seen order; None is passed when no ids were reported.
        env_ids = list(dict.fromkeys(env_ids_union)) if env_ids_union else None
        provider.update(env_ids)

    def _should_forward_before_visualizer_update(self) -> bool:
        """Return True if any visualizer requires pre-step forward kinematics."""
        return any(viz.requires_forward_before_step() for viz in self._visualizers)

    def play(self) -> None:
        """Start or resume the simulation."""
        self.physics_manager.play()
        for viz in self._visualizers:
            viz.play()
        self._is_playing = True
        self._is_stopped = False

    def pause(self) -> None:
        """Pause the simulation (can be resumed with play)."""
        self.physics_manager.pause()
        for viz in self._visualizers:
            viz.pause()
        self._is_playing = False

    def stop(self) -> None:
        """Stop the simulation completely."""
        self.physics_manager.stop()
        for viz in self._visualizers:
            viz.stop()
        self._is_playing = False
        self._is_stopped = True

    def is_playing(self) -> bool:
        """Returns True if simulation is playing (not paused or stopped)."""
        return self._is_playing

    def is_stopped(self) -> bool:
        """Returns True if simulation is stopped (not just paused)."""
        return self._is_stopped

    def set_setting(self, name: str, value: Any) -> None:
        """Set a setting value."""
        self._settings_helper.set(name, value)

    def get_setting(self, name: str) -> Any:
        """Get a setting value."""
        return self._settings_helper.get(name)

    @classmethod
    def clear_instance(cls) -> None:
        """Clean up resources and clear the singleton instance.

        Does nothing when no instance is registered.
        """
        inst = cls._instance
        if inst is None:
            return

        # Close physics manager FIRST to detach PhysX from the stage
        # This must happen before clearing USD prims to avoid PhysX cleanup errors
        inst.physics_manager.close()

        # Close all visualizers. Each close is guarded so that one failing visualizer
        # cannot abort teardown and leave a half-closed singleton registered.
        for viz in inst._visualizers:
            try:
                viz.close()
            except Exception as exc:
                logger.error("Error closing visualizer '%s': %s", type(viz).__name__, exc)
        inst._visualizers.clear()
        if inst._scene_data_provider is not None:
            close_provider = getattr(inst._scene_data_provider, "close", None)
            if callable(close_provider):
                close_provider()
            inst._scene_data_provider = None

        # Tear down the stage. We skip clear_stage() (prim-by-prim deletion) since
        # close_stage() + app shutdown destroy the entire stage at once.
        stage_utils.close_stage()

        # Clear instance
        cls._instance = None

        gc.collect()
        logger.info("SimulationContext cleared")

    @classmethod
    def clear_stage(cls) -> None:
        """Clear the current USD stage (preserving /World and PhysicsScene).

        Uses a predicate that preserves /World and PhysicsScene while also respecting the
        default deletability checks (ancestral prims, etc.). Does nothing when no instance
        is registered.
        """
        if cls._instance is None:
            return

        def _predicate(prim: Usd.Prim) -> bool:
            """Return False for prims that must be kept (/World and any PhysicsScene)."""
            path = prim.GetPath().pathString
            if path == "/World":
                return False
            if prim.GetTypeName() == "PhysicsScene":
                return False
            return True

        sim_utils.clear_stage(predicate=_predicate)
@contextmanager
def build_simulation_context(
    create_new_stage: bool = True,
    gravity_enabled: bool = True,
    device: str = "cuda:0",
    dt: float = 0.01,
    sim_cfg: SimulationCfg | None = None,
    add_ground_plane: bool = False,
    add_lighting: bool = False,
    auto_add_lighting: bool = False,
    visualizers: list[str] | None = None,
) -> Iterator[SimulationContext]:
    """Build a :class:`SimulationContext`, yield it, and tear it down on exit.

    Args:
        create_new_stage: Whether to create a new stage. Defaults to True.
        gravity_enabled: Whether to enable gravity. Only used when ``sim_cfg`` is None.
            Defaults to True.
        device: Device to run the simulation on. Only used when ``sim_cfg`` is None.
            Defaults to "cuda:0".
        dt: Time step for the simulation. Only used when ``sim_cfg`` is None. Defaults to 0.01.
        sim_cfg: SimulationCfg to use. When None, one is built from ``device``, ``dt`` and
            ``gravity_enabled``. Defaults to None.
        add_ground_plane: Whether to add a ground plane. Defaults to False.
        add_lighting: Whether to add a dome light. Defaults to False.
        auto_add_lighting: Whether to auto-add lighting when a GUI is present or visualizers
            were requested. Defaults to False.
        visualizers: List of visualizer backend keys to enable (e.g. ``["kit", "newton", "rerun"]``).
            Valid types: ``"kit"``, ``"newton"``, ``"rerun"``, ``"viser"``. When provided, the keys
            are written space-separated to the ``/isaaclab/visualizer/types`` setting.
            Defaults to None.

    Yields:
        The simulation context to use for the simulation.
    """
    # NOTE(review): only the ``types`` setting is written below; whether the resolution code
    # also needs ``/isaaclab/visualizer/explicit`` to honor it should be confirmed.
    sim: SimulationContext | None = None
    try:
        if create_new_stage:
            sim_utils.create_new_stage()

        if sim_cfg is None:
            if gravity_enabled:
                gravity = (0.0, 0.0, -9.81)
            else:
                gravity = (0.0, 0.0, 0.0)
            sim_cfg = SimulationCfg(device=device, dt=dt, gravity=gravity)

        sim = SimulationContext(sim_cfg)

        if visualizers:
            sim.set_setting("/isaaclab/visualizer/types", " ".join(visualizers))

        if add_ground_plane:
            ground_cfg = GroundPlaneCfg()
            ground_cfg.func("/World/defaultGroundPlane", ground_cfg)

        # Short-circuit order is preserved: the GUI setting is only queried when needed.
        wants_lighting = add_lighting or (auto_add_lighting and (sim.get_setting("/isaaclab/has_gui") or visualizers))
        if wants_lighting:
            light_cfg = DomeLightCfg(
                color=(0.1, 0.1, 0.1), enable_color_temperature=True, color_temperature=5500, intensity=10000
            )
            light_cfg.func(prim_path="/World/defaultDomeLight", cfg=light_cfg, translation=(0.0, 0.0, 10.0))

        yield sim

    except Exception:
        # Log the full traceback, then let the error propagate to the caller.
        logger.error(traceback.format_exc())
        raise
    finally:
        if sim is not None:
            # The simulation is only stopped when no GUI is attached.
            if not sim.get_setting("/isaaclab/has_gui"):
                sim.stop()
            sim.clear_instance()