# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import gc
import logging
import os
import traceback
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
import toml
import torch
from pxr import Gf, Usd, UsdGeom, UsdPhysics, UsdUtils
import isaaclab.sim as sim_utils
import isaaclab.sim.utils.stage as stage_utils
from isaaclab.app.settings_manager import SettingsManager
from isaaclab.physics import BaseSceneDataProvider, PhysicsManager, SceneDataProvider
from isaaclab.physics.scene_data_requirements import (
SceneDataRequirement,
VisualizerPrebuiltArtifacts,
resolve_scene_data_requirements,
)
from isaaclab.sim.utils import create_new_stage
from isaaclab.utils.version import has_kit
from isaaclab.visualizers.base_visualizer import BaseVisualizer
from .simulation_cfg import SimulationCfg
from .spawners import DomeLightCfg, GroundPlaneCfg
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Visualizer type names (CLI and config). App launcher parses CSV and stores as a space-separated setting.
# Used below to validate requested visualizer types before their config modules are imported.
_VISUALIZER_TYPES = ("newton", "rerun", "viser", "kit")
class SettingsHelper:
    """Route values to the matching typed setter of a :class:`SettingsManager`."""

    # Ordered (accepted python type(s), setter attribute name) pairs.
    # ``bool`` is listed before ``int`` because ``bool`` is a subclass of ``int``.
    _SETTERS = (
        (bool, "set_bool"),
        (int, "set_int"),
        (float, "set_float"),
        (str, "set_string"),
        ((list, tuple), "set"),
    )

    def __init__(self, settings: SettingsManager):
        self._settings = settings

    def set(self, name: str, value: Any) -> None:
        """Store ``value`` under ``name`` using the setter that matches its Python type.

        Raises:
            ValueError: If ``value`` is not a bool, int, float, str, list or tuple.
        """
        for accepted_types, setter_name in self._SETTERS:
            if isinstance(value, accepted_types):
                getattr(self._settings, setter_name)(name, value)
                return
        raise ValueError(f"Unsupported value type for setting '{name}': {type(value)}")

    def get(self, name: str) -> Any:
        """Return whatever the underlying settings object holds for ``name``."""
        return self._settings.get(name)
[docs]
class SimulationContext:
    """Controls simulation lifecycle including physics stepping and rendering.

    This singleton class manages:

    * Physics configuration (time-step, solver parameters via :class:`isaaclab.sim.SimulationCfg`)
    * Simulation state (play, pause, step, stop)
    * Rendering and visualization

    The singleton instance can be accessed using the ``instance()`` class method.
    """

    # SINGLETON PATTERN
    # Holds the single live instance. It is assigned at the very end of a successful
    # ``__init__`` and set back to ``None`` by ``clear_instance()``.
    _instance: SimulationContext | None = None
[docs]
def __new__(cls, cfg: SimulationCfg | None = None):
"""Enforce singleton pattern."""
if cls._instance is not None:
return cls._instance
return super().__new__(cls)
[docs]
@classmethod
def instance(cls) -> SimulationContext | None:
"""Get the singleton instance, or None if not created."""
return cls._instance
[docs]
def __init__(self, cfg: SimulationCfg | None = None):
    """Initialize the simulation context.

    Because ``__new__`` returns the existing singleton, ``__init__`` may be invoked again on an
    already-initialized object; in that case it returns immediately. On first construction it
    resolves the USD stage, creates the USD physics scene, normalizes the device string,
    initializes the physics manager, applies render settings and sets up visualizer and
    simulation state. The object is registered as the singleton only as the last step.

    Args:
        cfg: Simulation configuration. Defaults to None (uses default config).
    """
    if type(self)._instance is not None:
        return  # Already initialized
    # Store config
    self.cfg = SimulationCfg() if cfg is None else cfg
    # Get or create stage based on config
    stage_cache = UsdUtils.StageCache.Get()
    if self.cfg.create_stage_in_memory:
        self.stage = create_new_stage()
    else:
        # Prefer the thread-local current stage (set by create_new_stage / test fixtures)
        # over cache lookup, since the cache may contain stale stages from prior tests.
        current = getattr(stage_utils._context, "stage", None)
        if current is not None:
            self.stage = current
        else:
            all_stages = stage_cache.GetAllStages() if stage_cache.Size() > 0 else []  # type: ignore[union-attr]
            self.stage = all_stages[0] if all_stages else create_new_stage()
    # Ensure stage is in the USD cache
    stage_id = stage_cache.GetId(self.stage).ToLongInt()  # type: ignore[union-attr]
    if stage_id < 0:
        stage_cache.Insert(self.stage)  # type: ignore[union-attr]
    # Set as current stage in thread-local context for get_current_stage()
    stage_utils._context.stage = self.stage
    # When Kit is running, attach the stage to Kit's USD context so that
    # Kit extensions (PhysX views, Articulation, viewport) can discover it.
    if has_kit():
        import omni.usd

        kit_context = omni.usd.get_context()
        if kit_context is not None and kit_context.get_stage() is not self.stage:
            # The id is looked up again here so that it reflects a possible Insert() above.
            kit_context.attach_stage_with_callback(stage_cache.GetId(self.stage).ToLongInt())
    # Acquire settings interface (SettingsManager: standalone dict or Omniverse when available)
    self.settings = SettingsManager.instance()
    self._settings_helper = SettingsHelper(self.settings)
    # Initialize USD physics scene and physics manager
    self._init_usd_physics_scene()
    # Normalize "cuda" -> "cuda:<id>" now that the USD physics scene is initialized
    # and /physics/cudaDevice is available. Update cfg.device in-place so all
    # downstream code (physics backends, assets, sensors) sees a consistent value.
    if "cuda" in self.cfg.device and ":" not in self.cfg.device:
        cuda_device = self.get_setting("/physics/cudaDevice")
        # A missing or negative setting value falls back to device 0.
        device_id = max(0, int(cuda_device) if cuda_device is not None else 0)
        self.cfg.device = f"cuda:{device_id}"
    # Set default physics backend if not specified
    if self.cfg.physics is None:
        from isaaclab_physx.physics import PhysxCfg

        self.cfg.physics = PhysxCfg()
    self._physics = self.cfg.physics
    # If physics is a PresetCfg wrapper (has a 'default' field but no 'class_type'),
    # resolve to the default preset so downstream code always sees a concrete PhysicsCfg.
    if not hasattr(self._physics, "class_type") and hasattr(self._physics, "default"):
        self._physics = self._physics.default
        self.cfg.physics = self._physics
    # The manager is stored as a class (not an instance) and is driven through class-level calls.
    self.physics_manager: type[PhysicsManager] = self._physics.class_type
    self.physics_manager.initialize(self)
    self._apply_render_cfg_settings()
    # Initialize visualizer state (provider/visualizers are created lazily during initialize_visualizers()).
    self._scene_data_provider: BaseSceneDataProvider | None = None
    self._visualizers: list[BaseVisualizer] = []
    self._scene_data_requirements = SceneDataRequirement()
    self._visualizer_prebuilt_artifact: VisualizerPrebuiltArtifacts | None = None
    self._visualizer_step_counter = 0
    # Default visualization dt used before/without visualizer initialization.
    physics_dt = getattr(self.cfg.physics, "dt", None)
    self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval
    # Cache commonly-used settings (these don't change during runtime)
    self._has_gui = bool(self.get_setting("/isaaclab/has_gui"))
    self._has_offscreen_render = bool(self.get_setting("/isaaclab/render/offscreen"))
    self._xr_enabled = bool(self.get_setting("/isaaclab/xr/enabled"))
    # Note: has_rtx_sensors is NOT cached because it changes when Camera sensors are created
    # Simulation state
    self._is_playing = False
    self._is_stopped = True
    # Monotonic physics-step counter, incremented once per step() call.
    # NOTE(review): the original comment said it is "used by camera sensors for" and was cut off
    # mid-sentence; the intended purpose should be confirmed against the camera sensor code.
    self._physics_step_count: int = 0
    type(self)._instance = self  # Mark as valid singleton only after successful init
def _apply_render_cfg_settings(self) -> None:
"""Apply render preset and overrides from SimulationCfg.render."""
# TODO: Refactor render preset + override handling to a dedicated RenderingQualityCfg
# (name subject to change) to keep quality profiles and carb mappings centralized.
render_cfg = getattr(self.cfg, "render", None)
if render_cfg is None:
return
# Priority:
# 1) CLI/AppLauncher setting if present, 2) SimulationCfg.render.rendering_mode.
rendering_mode = self.get_setting("/isaaclab/rendering/rendering_mode")
if not rendering_mode:
rendering_mode = getattr(render_cfg, "rendering_mode", None)
if rendering_mode:
supported_rendering_modes = {"performance", "balanced", "quality"}
if rendering_mode not in supported_rendering_modes:
raise ValueError(
f"RenderCfg rendering mode '{rendering_mode}' not in supported modes "
f"{sorted(supported_rendering_modes)}."
)
isaaclab_app_exp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), *[".."] * 4, "apps")
from isaaclab.utils.version import get_isaac_sim_version
if get_isaac_sim_version().major < 6:
isaaclab_app_exp_path = os.path.join(isaaclab_app_exp_path, "isaacsim_5")
preset_filename = os.path.join(isaaclab_app_exp_path, f"rendering_modes/{rendering_mode}.kit")
if os.path.exists(preset_filename):
with open(preset_filename) as file:
preset_dict = toml.load(file)
def _apply_nested(data: dict[str, Any], path: str = "") -> None:
for key, value in data.items():
key_path = f"{path}/{key}" if path else f"/{key}"
if isinstance(value, dict):
_apply_nested(value, key_path)
else:
self.set_setting(key_path.replace(".", "/"), value)
_apply_nested(preset_dict)
else:
logger.warning("[SimulationContext] Render preset file not found: %s", preset_filename)
# RenderCfg fields mapped to setting paths (stored via SettingsManager)
field_to_setting = {
"enable_translucency": "/rtx/translucency/enabled",
"enable_reflections": "/rtx/reflections/enabled",
"enable_global_illumination": "/rtx/indirectDiffuse/enabled",
"enable_dlssg": "/rtx-transient/dlssg/enabled",
"enable_dl_denoiser": "/rtx-transient/dldenoiser/enabled",
"dlss_mode": "/rtx/post/dlss/execMode",
"enable_direct_lighting": "/rtx/directLighting/enabled",
"samples_per_pixel": "/rtx/directLighting/sampledLighting/samplesPerPixel",
"enable_shadows": "/rtx/shadows/enabled",
"enable_ambient_occlusion": "/rtx/ambientOcclusion/enabled",
"dome_light_upper_lower_strategy": "/rtx/domeLight/upperLowerStrategy",
}
for key, value in vars(render_cfg).items():
if value is None or key in {"rendering_mode", "carb_settings", "antialiasing_mode"}:
continue
setting_path = field_to_setting.get(key)
if setting_path is not None:
self.set_setting(setting_path, value)
# Raw overrides from render_cfg (stored via SettingsManager)
extra_settings = getattr(render_cfg, "carb_settings", None)
if extra_settings:
for key, value in extra_settings.items():
if "_" in key:
path = "/" + key.replace("_", "/")
elif "." in key:
path = "/" + key.replace(".", "/")
else:
path = key
self.set_setting(path, value)
# Optional anti-aliasing mode via Replicator (best-effort, may use Omniverse APIs)
antialiasing_mode = getattr(render_cfg, "antialiasing_mode", None)
if antialiasing_mode is not None:
try:
import omni.replicator.core as rep
rep.settings.set_render_rtx_realtime(antialiasing=antialiasing_mode)
except Exception:
pass
def _init_usd_physics_scene(self) -> None:
    """Create and configure the USD physics scene.

    Sets the stage conventions (Z up, meters, kilograms), removes every existing prim of type
    ``PhysicsScene``, defines a new scene at ``cfg.physics_prim_path`` and writes the gravity
    direction and magnitude derived from ``cfg.gravity``.

    Raises:
        RuntimeError: If a prim already exists at ``cfg.physics_prim_path`` after the cleanup.
    """
    cfg = self.cfg
    with sim_utils.use_stage(self.stage):
        # Set stage conventions for metric units
        UsdGeom.SetStageUpAxis(self.stage, "Z")
        UsdGeom.SetStageMetersPerUnit(self.stage, 1.0)
        UsdPhysics.SetStageKilogramsPerUnit(self.stage, 1.0)

        # Find and delete any existing physics scene. The paths are collected first so that
        # no prim is removed while the stage traversal is still being iterated.
        stale_scene_paths = [
            prim.GetPath().pathString for prim in self.stage.Traverse() if prim.GetTypeName() == "PhysicsScene"
        ]
        for scene_path in stale_scene_paths:
            # A scene nested under an already-deleted scene is gone by now; skip it.
            if self.stage.GetPrimAtPath(scene_path).IsValid():
                sim_utils.delete_prim(scene_path, stage=self.stage)

        # Create a new physics scene
        if self.stage.GetPrimAtPath(cfg.physics_prim_path).IsValid():
            raise RuntimeError(f"A prim already exists at path '{cfg.physics_prim_path}'.")
        physics_scene = UsdPhysics.Scene.Define(self.stage, cfg.physics_prim_path)

        # Pre-create gravity tensor to avoid torch heap corruption issues (torch 2.1+)
        gravity = torch.tensor(cfg.gravity, dtype=torch.float32, device=self.cfg.device)
        gravity_magnitude = torch.norm(gravity).item()
        # With zero gravity the direction is undefined; use -Z as a placeholder direction.
        if gravity_magnitude == 0.0:
            gravity_direction = [0.0, 0.0, -1.0]
        else:
            gravity_direction = (gravity / gravity_magnitude).tolist()
        physics_scene.CreateGravityDirectionAttr(Gf.Vec3f(*gravity_direction))
        physics_scene.CreateGravityMagnitudeAttr(gravity_magnitude)
@property
def physics_sim_view(self):
"""Returns the physics simulation view."""
return self.physics_manager.get_physics_sim_view()
@property
def device(self) -> str:
"""Returns the device on which the simulation is running."""
return self.physics_manager.get_device()
@property
def backend(self) -> str:
"""Returns the tensor backend being used ("numpy" or "torch")."""
return self.physics_manager.get_backend()
@property
def has_gui(self) -> bool:
"""Returns whether GUI is enabled (cached at init)."""
return self._has_gui
@property
def has_offscreen_render(self) -> bool:
"""Returns whether offscreen rendering is enabled (cached at init)."""
return self._has_offscreen_render
@property
def is_rendering(self) -> bool:
"""Returns whether rendering is active (GUI, RTX sensors, visualizers, or XR)."""
return (
self._has_gui
or self._has_offscreen_render
or self.get_setting("/isaaclab/render/rtx_sensors")
or bool(self.resolve_visualizer_types())
or self._xr_enabled
)
[docs]
def get_physics_dt(self) -> float:
"""Returns the physics time step."""
return self.physics_manager.get_physics_dt()
def _create_default_visualizer_configs(self, requested_visualizers: list[str]) -> list:
"""Create default visualizer configs for requested types.
Loads only the requested visualizer submodule (e.g. isaaclab_visualizers.rerun)
so dependencies for other backends are not imported.
"""
import importlib
default_configs = []
cfg_class_names = {
"kit": "KitVisualizerCfg",
"newton": "NewtonVisualizerCfg",
"rerun": "RerunVisualizerCfg",
"viser": "ViserVisualizerCfg",
}
for viz_type in requested_visualizers:
try:
if viz_type not in _VISUALIZER_TYPES:
logger.warning(
f"[SimulationContext] Unknown visualizer type '{viz_type}' requested. "
f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. Skipping."
)
continue
mod = importlib.import_module(f"isaaclab_visualizers.{viz_type}")
cfg_cls = getattr(mod, cfg_class_names[viz_type])
default_configs.append(cfg_cls())
except (ImportError, ModuleNotFoundError) as exc:
# isaaclab_visualizers is optional; log once at warning level
if "isaaclab_visualizers" in str(exc):
logger.warning(
"[SimulationContext] Visualizer '%s' skipped: isaaclab_visualizers is not installed. "
"Install with: pip install isaaclab_visualizers[%s]",
viz_type,
viz_type,
)
else:
logger.error(
"[SimulationContext] Failed to create default config for visualizer '%s': %s",
viz_type,
exc,
)
except Exception as exc:
logger.error(f"[SimulationContext] Failed to create default config for visualizer '{viz_type}': {exc}")
return default_configs
def _get_cli_visualizer_types(self) -> list[str]:
"""Return list of visualizer types requested via CLI (setting)."""
requested = self.get_setting("/isaaclab/visualizer/types")
if not isinstance(requested, str) or not requested.strip():
return []
# App launcher writes this as a single string; accept comma and/or whitespace separators.
return [value for chunk in requested.split(",") for value in chunk.split() if value]
def _get_cli_visualizer_max_worlds_override(self) -> tuple[bool, int | None]:
"""Return CLI override for visualizer max worlds.
Returns:
Tuple of (has_override, value), where value=None means no override.
"""
value = self.get_setting("/isaaclab/visualizer/max_worlds")
if value is None:
return False, None
try:
max_worlds = int(value)
except (TypeError, ValueError):
logger.warning("[SimulationContext] Invalid /isaaclab/visualizer/max_worlds setting: %r", value)
return False, None
# -1 means no CLI override.
if max_worlds < 0:
return False, None
return True, max_worlds
def _apply_visualizer_cli_overrides(self, visualizer_cfgs: list[Any]) -> None:
"""Apply CLI visualizer overrides (e.g., max worlds) to resolved configs.
Args:
visualizer_cfgs: Resolved visualizer configs to update in-place.
"""
has_max_worlds_override, max_worlds_override = self._get_cli_visualizer_max_worlds_override()
if not has_max_worlds_override:
return
for cfg in visualizer_cfgs:
if hasattr(cfg, "max_worlds"):
cfg.max_worlds = max_worlds_override
def _is_cli_visualizer_explicit(self) -> bool:
"""Return ``True`` when visualizers were explicitly provided via CLI."""
return bool(self.get_setting("/isaaclab/visualizer/explicit"))
def _is_cli_visualizer_disable_all(self) -> bool:
"""Return ``True`` when CLI requested ``--viz none`` semantics."""
return bool(self.get_setting("/isaaclab/visualizer/disable_all"))
[docs]
def resolve_visualizer_types(self) -> list[str]:
"""Resolve visualizer types from config or CLI settings."""
if self._is_cli_visualizer_disable_all():
return []
if self._is_cli_visualizer_explicit():
return self._get_cli_visualizer_types()
visualizer_cfgs = self.cfg.visualizer_cfgs
if visualizer_cfgs is None:
return []
if not isinstance(visualizer_cfgs, list):
visualizer_cfgs = [visualizer_cfgs]
return [cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None)]
def _resolve_visualizer_cfgs(self) -> list[Any]:
"""Resolve final visualizer configs from cfg and optional CLI override.
When visualizers are explicitly requested via ``--visualizer`` CLI flag,
a :class:`RuntimeError` is raised if any requested type cannot be
resolved (unknown type or missing package).
"""
visualizer_cfgs: list[Any] = []
if self.cfg.visualizer_cfgs is not None:
visualizer_cfgs = (
self.cfg.visualizer_cfgs if isinstance(self.cfg.visualizer_cfgs, list) else [self.cfg.visualizer_cfgs]
)
cli_requested = self._get_cli_visualizer_types()
cli_explicit = self._is_cli_visualizer_explicit()
cli_disable_all = self._is_cli_visualizer_disable_all()
if cli_disable_all:
resolved = []
elif not cli_explicit:
self._apply_visualizer_cli_overrides(visualizer_cfgs)
resolved = visualizer_cfgs
elif not visualizer_cfgs:
resolved = self._create_default_visualizer_configs(cli_requested) if cli_requested else []
self._apply_visualizer_cli_overrides(resolved)
else:
# CLI selection is explicit: keep only requested cfg types, then add defaults for missing.
cli_requested_set = set(cli_requested)
resolved = [cfg for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) in cli_requested_set]
existing_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
for viz_type in cli_requested:
if viz_type not in existing_types and viz_type in _VISUALIZER_TYPES:
resolved.extend(self._create_default_visualizer_configs([viz_type]))
existing_types.add(viz_type)
self._apply_visualizer_cli_overrides(resolved)
# When visualizers were explicitly requested via CLI, verify all
# requested types were resolved. This catches unknown types and
# missing packages that _create_default_visualizer_configs silently
# skips.
if cli_explicit and cli_requested:
resolved_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
missing = [t for t in cli_requested if t not in resolved_types]
if missing:
raise RuntimeError(
f"Explicitly requested visualizer(s) {missing} could not be configured. "
f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. "
"Ensure the required package is installed "
"(e.g., pip install isaaclab_visualizers[<type>])."
)
# XR auto-start: auto-inject a KitVisualizer when XR is active and no
# Kit visualizer is already present. The KitVisualizer pumps
# app.update() and triggers forward() (via requires_forward_before_step)
# to sync Fabric data so the XR runtime receives up-to-date hand/joint
# transforms each frame.
if self._xr_enabled and bool(self.get_setting("/isaaclab/xr/auto_start")):
has_kit = any(getattr(cfg, "visualizer_type", None) == "kit" for cfg in resolved)
if not has_kit:
try:
import importlib
mod = importlib.import_module("isaaclab_visualizers.kit")
kit_cfg_cls = getattr(mod, "KitVisualizerCfg")
resolved.append(kit_cfg_cls())
logger.info("[SimulationContext] Auto-injecting KitVisualizer for XR app-update pumping.")
except (ImportError, ModuleNotFoundError, AttributeError) as exc:
logger.warning(
"[SimulationContext] XR mode could not auto-inject a KitVisualizer: %s. "
"Install isaaclab_visualizers[kit] or pass --visualizer kit.",
exc,
)
return resolved
[docs]
def initialize_visualizers(self) -> None:
"""Initialize visualizers from SimulationCfg.visualizer_cfgs."""
if self._visualizers:
return
physics_dt = getattr(self.cfg.physics, "dt", None)
self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval
visualizer_cfgs = self._resolve_visualizer_cfgs()
if not visualizer_cfgs:
return
cli_explicit = self._is_cli_visualizer_explicit()
# Resolve visualizer-driven requirements once and keep optional artifact payload untouched.
visualizer_types = [
cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) is not None
]
requirements = resolve_scene_data_requirements(visualizer_types=visualizer_types)
self._scene_data_requirements = requirements
self.initialize_scene_data_provider()
self._visualizers = []
for cfg in visualizer_cfgs:
try:
visualizer = cfg.create_visualizer()
visualizer.initialize(self._scene_data_provider)
self._visualizers.append(visualizer)
except Exception as exc:
if cli_explicit:
raise RuntimeError(
f"Visualizer '{cfg.visualizer_type}' was explicitly requested "
f"but failed to create or initialize: {exc}"
) from exc
logger.exception(
"Failed to initialize visualizer '%s' (%s): %s",
cfg.visualizer_type,
type(cfg).__name__,
exc,
)
if not self._visualizers and self._scene_data_provider is not None:
close_provider = getattr(self._scene_data_provider, "close", None)
if callable(close_provider):
close_provider()
self._scene_data_provider = None
def initialize_scene_data_provider(self) -> BaseSceneDataProvider:
if self._scene_data_provider is None:
self._scene_data_provider = SceneDataProvider(self.stage, self)
return self._scene_data_provider
[docs]
def get_scene_data_requirements(self) -> SceneDataRequirement:
"""Return scene-data requirements resolved from visualizers/renderers."""
return self._scene_data_requirements
[docs]
def update_scene_data_requirements(self, requirements: SceneDataRequirement) -> None:
"""Update scene-data requirements."""
self._scene_data_requirements = requirements
[docs]
def get_scene_data_visualizer_prebuilt_artifact(self) -> VisualizerPrebuiltArtifacts | None:
"""Return optional prebuilt visualizer artifact."""
return self._visualizer_prebuilt_artifact
[docs]
def set_scene_data_visualizer_prebuilt_artifact(self, artifact: VisualizerPrebuiltArtifacts | None) -> None:
"""Set or clear the optional visualizer prebuilt artifact.
The scene (clone flow) writes this once, and providers can read it
during initialization as a fast path.
"""
self._visualizer_prebuilt_artifact = artifact
[docs]
def clear_scene_data_visualizer_prebuilt_artifact(self) -> None:
"""Clear optional prebuilt artifact in provider context."""
self.set_scene_data_visualizer_prebuilt_artifact(None)
@property
def visualizers(self) -> list[BaseVisualizer]:
"""Returns the list of active visualizers."""
return self._visualizers
[docs]
def get_rendering_dt(self) -> float:
"""Return rendering dt, allowing visualizer-specific override."""
for viz in self._visualizers:
viz_dt = viz.get_rendering_dt()
if viz_dt is not None and viz_dt > 0:
return float(viz_dt)
return self._viz_dt
[docs]
def set_camera_view(self, eye: tuple, target: tuple) -> None:
"""Set camera view on all visualizers that support it."""
for viz in self._visualizers:
viz.set_camera_view(eye, target)
[docs]
def forward(self) -> None:
"""Update kinematics without stepping physics."""
self.physics_manager.forward()
[docs]
def reset(self, soft: bool = False) -> None:
    """Reset the simulation.

    Resets the physics manager and every already-created visualizer, starts playback through
    the physics manager, initializes the visualizers if none exist yet, and marks the context
    as playing.

    Args:
        soft: If True, skip full reinitialization. The flag is forwarded unchanged to the
            physics manager and to each visualizer.
    """
    self.physics_manager.reset(soft)
    for viz in self._visualizers:
        viz.reset(soft)
    # Start the timeline so the play button is pressed
    self.physics_manager.play()
    if not self._visualizers:
        # Initialize visualizers after PhysX sim view is ready.
        self.initialize_visualizers()
    self._is_playing = True
    self._is_stopped = False
[docs]
def step(self, render: bool = True) -> None:
"""Step physics and optionally render.
Args:
render: Whether to render the scene after stepping. Defaults to True.
"""
self._physics_step_count += 1
self.physics_manager.step()
if render and self.is_rendering:
self.render()
[docs]
def render(self, mode: int | None = None) -> None:
"""Update visualizers and render the scene.
Calls update_visualizers() so visualizers run at the render cadence (not at
every physics step). Camera sensors drive their configured renderer when
fetching data, so this method remains backend-agnostic.
"""
self.update_visualizers(self.get_rendering_dt())
# Call render callbacks
if hasattr(self, "_render_callbacks"):
for callback in self._render_callbacks.values():
callback(None) # Pass None as event data
[docs]
def update_visualizers(self, dt: float) -> None:
"""Update visualizers without triggering renderer/GUI."""
if not self._visualizers:
return
self.update_scene_data_provider()
visualizers_to_remove = []
for viz in self._visualizers:
try:
if viz.is_closed or not viz.is_running():
if viz.is_closed:
logger.info("Visualizer closed: %s", type(viz).__name__)
else:
logger.info("Visualizer not running: %s", type(viz).__name__)
visualizers_to_remove.append(viz)
continue
if viz.is_rendering_paused():
continue
while viz.is_training_paused() and viz.is_running():
viz.step(0.0)
viz.step(dt)
except Exception as exc:
logger.error("Error stepping visualizer '%s': %s", type(viz).__name__, exc)
visualizers_to_remove.append(viz)
for viz in visualizers_to_remove:
try:
viz.close()
self._visualizers.remove(viz)
logger.info("Removed visualizer: %s", type(viz).__name__)
except Exception as exc:
logger.error("Error closing visualizer: %s", exc)
def update_scene_data_provider(self, force_require_forward: bool = False):
if force_require_forward or self._should_forward_before_visualizer_update():
self.physics_manager.forward()
self._visualizer_step_counter += 1
if self._scene_data_provider is None:
return
provider = self._scene_data_provider
env_ids_union: list[int] = []
for viz in self._visualizers:
ids = viz.get_visualized_env_ids()
if ids is not None:
env_ids_union.extend(ids)
env_ids = list(dict.fromkeys(env_ids_union)) if env_ids_union else None
provider.update(env_ids)
def _should_forward_before_visualizer_update(self) -> bool:
"""Return True if any visualizer requires pre-step forward kinematics."""
return any(viz.requires_forward_before_step() for viz in self._visualizers)
[docs]
def play(self) -> None:
"""Start or resume the simulation."""
self.physics_manager.play()
for viz in self._visualizers:
viz.play()
self._is_playing = True
self._is_stopped = False
[docs]
def pause(self) -> None:
"""Pause the simulation (can be resumed with play)."""
self.physics_manager.pause()
for viz in self._visualizers:
viz.pause()
self._is_playing = False
[docs]
def stop(self) -> None:
"""Stop the simulation completely."""
self.physics_manager.stop()
for viz in self._visualizers:
viz.stop()
self._is_playing = False
self._is_stopped = True
[docs]
def is_playing(self) -> bool:
"""Returns True if simulation is playing (not paused or stopped)."""
return self._is_playing
[docs]
def is_stopped(self) -> bool:
"""Returns True if simulation is stopped (not just paused)."""
return self._is_stopped
[docs]
def set_setting(self, name: str, value: Any) -> None:
"""Set a setting value."""
self._settings_helper.set(name, value)
[docs]
def get_setting(self, name: str) -> Any:
"""Get a setting value."""
return self._settings_helper.get(name)
[docs]
@classmethod
def clear_instance(cls) -> None:
"""Clean up resources and clear the singleton instance."""
if cls._instance is not None:
# Close physics manager FIRST to detach PhysX from the stage
# This must happen before clearing USD prims to avoid PhysX cleanup errors
cls._instance.physics_manager.close()
# Close all visualizers
for viz in cls._instance._visualizers:
viz.close()
cls._instance._visualizers.clear()
if cls._instance._scene_data_provider is not None:
close_provider = getattr(cls._instance._scene_data_provider, "close", None)
if callable(close_provider):
close_provider()
cls._instance._scene_data_provider = None
# Tear down the stage. We skip clear_stage() (prim-by-prim deletion) since
# close_stage() + app shutdown destroy the entire stage at once.
stage_utils.close_stage()
# Clear instance
cls._instance = None
gc.collect()
logger.info("SimulationContext cleared")
[docs]
@classmethod
def clear_stage(cls) -> None:
"""Clear the current USD stage (preserving /World and PhysicsScene).
Uses a predicate that preserves /World and PhysicsScene while also
respecting the default deletability checks (ancestral prims, etc.).
"""
if cls._instance is None:
return
def _predicate(prim: Usd.Prim) -> bool:
path = prim.GetPath().pathString
if path == "/World":
return False
if prim.GetTypeName() == "PhysicsScene":
return False
return True
sim_utils.clear_stage(predicate=_predicate)
@contextmanager
def build_simulation_context(
    create_new_stage: bool = True,
    gravity_enabled: bool = True,
    device: str = "cuda:0",
    dt: float = 0.01,
    sim_cfg: SimulationCfg | None = None,
    add_ground_plane: bool = False,
    add_lighting: bool = False,
    auto_add_lighting: bool = False,
    visualizers: list[str] | None = None,
) -> Iterator[SimulationContext]:
    """Context manager to build a simulation context with the provided settings.

    On exit the simulation is stopped (unless a GUI is present) and the singleton is always
    cleared, even when the body raised or stopping the simulation fails.

    Args:
        create_new_stage: Whether to create a new stage. Defaults to True.
        gravity_enabled: Whether to enable gravity. Defaults to True. Only used when
            ``sim_cfg`` is None.
        device: Device to run the simulation on. Defaults to "cuda:0". Only used when
            ``sim_cfg`` is None.
        dt: Time step for the simulation. Defaults to 0.01. Only used when ``sim_cfg`` is None.
        sim_cfg: SimulationCfg to use. Defaults to None.
        add_ground_plane: Whether to add a ground plane. Defaults to False.
        add_lighting: Whether to add a dome light. Defaults to False.
        auto_add_lighting: Whether to auto-add lighting if GUI present. Defaults to False.
        visualizers: List of visualizer backend keys to enable (e.g. ``["kit", "newton", "rerun"]``).
            Valid types: ``"kit"``, ``"newton"``, ``"rerun"``, ``"viser"``.
            When provided, sets the ``/isaaclab/visualizer/types`` setting so the
            existing visualizer resolution machinery picks them up. Defaults to None.

    Yields:
        The simulation context to use for the simulation.

    Raises:
        Exception: Any exception raised during setup or inside the ``with`` body is logged
            with its traceback and re-raised.
    """
    sim: SimulationContext | None = None
    try:
        if create_new_stage:
            sim_utils.create_new_stage()
        if sim_cfg is None:
            gravity = (0.0, 0.0, -9.81) if gravity_enabled else (0.0, 0.0, 0.0)
            sim_cfg = SimulationCfg(device=device, dt=dt, gravity=gravity)
        sim = SimulationContext(sim_cfg)

        if visualizers:
            # NOTE(review): the resolution code in SimulationContext only reads this setting when
            # ``/isaaclab/visualizer/explicit`` is truthy, and that flag is not set here - confirm
            # that it is set elsewhere, otherwise the requested types have no effect.
            sim.set_setting("/isaaclab/visualizer/types", " ".join(visualizers))

        if add_ground_plane:
            cfg = GroundPlaneCfg()
            cfg.func("/World/defaultGroundPlane", cfg)

        if add_lighting or (auto_add_lighting and (sim.get_setting("/isaaclab/has_gui") or visualizers)):
            cfg = DomeLightCfg(
                color=(0.1, 0.1, 0.1), enable_color_temperature=True, color_temperature=5500, intensity=10000
            )
            cfg.func(prim_path="/World/defaultDomeLight", cfg=cfg, translation=(0.0, 0.0, 10.0))

        yield sim
    except Exception:
        logger.error(traceback.format_exc())
        raise
    finally:
        if sim is not None:
            try:
                if not sim.get_setting("/isaaclab/has_gui"):
                    sim.stop()
            finally:
                # Always release the singleton; a stale instance would otherwise be handed
                # out by the next SimulationContext(...) construction.
                sim.clear_instance()