Source code for isaaclab_newton.physics.kamino_manager
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Kamino Newton manager."""
from __future__ import annotations
import logging
import warp as wp
from newton import Model, eval_fk
from newton.solvers import SolverKamino
from isaaclab.physics import PhysicsManager
from isaaclab.utils.timer import Timer
from .kamino_manager_cfg import KaminoSolverCfg
from .newton_manager import NewtonManager
logger = logging.getLogger(__name__)
[docs]
class NewtonKaminoManager(NewtonManager):
""":class:`NewtonManager` specialization for the Kamino solver.
Uses Newton's :class:`CollisionPipeline` unless
:attr:`KaminoSolverCfg.use_collision_detector` is ``True``, in which case
Kamino's internal collision detector handles contact generation.
"""
@classmethod
def _forward_kamino(cls, world_mask: wp.array | None = None) -> None:
"""Kamino-specific forward kinematics via ``solver.reset()``.
Kamino's ``joint_q`` / ``joint_u`` include coordinates for **all** joints
(including free joints), so we pass Newton's full state arrays directly.
Args:
world_mask: Per-world mask indicating which worlds to reset.
Shape ``(num_worlds,)``, dtype ``wp.int32``. If None, resets all worlds.
"""
cls._solver.reset(
state_out=cls._state_0,
joint_q=cls._state_0.joint_q,
joint_u=cls._state_0.joint_qd,
world_mask=world_mask,
)
[docs]
@classmethod
def step(cls) -> None:
"""Step the physics simulation."""
sim = PhysicsManager._sim
if sim is None or not sim.is_playing():
return
# Kamino: run solver.reset() with the accumulated world mask to reinitialise
# internal state (warm-start containers, constraint multipliers) for reset worlds.
# Note: runs every step. solver.reset() with an all-False world_mask is a no-op
# (kernels check mask per-world and skip). The cost of a no-op launch is negligible
# compared to the complexity of maintaining a separate boolean guard.
cls._forward_kamino(world_mask=cls._world_reset_mask)
# Notify solver of model changes
if cls._model_changes:
with wp.ScopedDevice(PhysicsManager._device):
for change in cls._model_changes:
cls._solver.notify_model_changed(change)
NewtonManager._model_changes = set()
# Lazy CUDA graph capture: deferred from initialize_solver() when RTX was active.
# By the time step() is first called, RTX has fully initialized (all cudaImportExternalMemory
# calls are done) and is idle between render frames — giving us a clean capture window.
cfg = PhysicsManager._cfg
device = PhysicsManager._device
if cls._graph_capture_pending and cfg is not None and cfg.use_cuda_graph and "cuda" in device: # type: ignore[union-attr]
NewtonManager._graph_capture_pending = False
NewtonManager._graph = cls._capture_relaxed_graph(device)
if cls._graph is not None:
# Kamino: StateKamino.from_newton() lazily allocates body_f_total,
# joint_q_prev, and joint_lambdas via wp.clone/wp.zeros during the
# first step() inside graph capture. Replay once to pin those
# memory-pool addresses before any eager solver.reset() call.
wp.capture_launch(cls._graph)
logger.info("Newton CUDA graph captured (deferred relaxed mode, RTX-compatible)")
else:
logger.warning("Newton deferred CUDA graph capture failed; using eager execution")
# Ensure body_q is up-to-date before collision detection.
# After env resets, joint_q is written but body_q (used by
# broadphase/narrowphase) is stale until FK runs.
# Only runs FK for dirtied articulations via the accumulated mask.
if cls._needs_collision_pipeline:
eval_fk(cls._model, cls._state_0.joint_q, cls._state_0.joint_qd, cls._state_0, cls._fk_reset_mask)
# Zero both masks after consumption
NewtonManager._world_reset_mask.zero_()
NewtonManager._fk_reset_mask.zero_()
# Step simulation (graphed or not; _graph is None when capture is disabled or failed)
if cfg is not None and cfg.use_cuda_graph and cls._graph is not None and "cuda" in device: # type: ignore[union-attr]
wp.capture_launch(cls._graph)
else:
with wp.ScopedDevice(device):
cls._simulate_physics_only()
if cls._usdrt_stage is not None:
cls._mark_transforms_dirty()
# Launch solver-specific debug logging after stepping.
cls._log_solver_debug()
PhysicsManager._sim_time += cls._solver_dt * cls._num_substeps
@classmethod
def _build_solver(cls, model: Model, solver_cfg: KaminoSolverCfg) -> None:
"""Construct :class:`SolverKamino` and populate the base-class slots.
Sets :attr:`NewtonManager._needs_collision_pipeline` to ``True`` only
when ``use_collision_detector=False`` (Kamino's internal detector
handles contacts otherwise).
"""
NewtonManager._solver = SolverKamino(model, solver_cfg.to_solver_config())
NewtonManager._use_single_state = False
NewtonManager._needs_collision_pipeline = not solver_cfg.use_collision_detector
@classmethod
def _capture_or_defer_cuda_graph(cls) -> None:
"""Capture the physics CUDA graph, or defer if RTX is initializing."""
cfg = PhysicsManager._cfg
device = PhysicsManager._device
use_cuda_graph = cfg is not None and cfg.use_cuda_graph and "cuda" in device # type: ignore[union-attr]
with Timer(name="newton_cuda_graph", msg="CUDA graph took:"):
if not use_cuda_graph:
NewtonManager._graph = None
return
if cls._usdrt_stage is None:
# No RTX active — use standard Warp capture (cudaStreamCaptureModeGlobal).
with wp.ScopedCapture() as capture:
cls._simulate_physics_only()
NewtonManager._graph = capture.graph
logger.info("Newton CUDA graph captured (standard Warp mode)")
# TODO: streamline this with base NewtonManager
# Kamino: StateKamino.from_newton() lazily allocates body_f_total,
# joint_q_prev, and joint_lambdas via wp.clone/wp.zeros during the
# first step() inside graph capture. Replay once to pin those
# memory-pool addresses before any eager solver.reset() call.
wp.capture_launch(cls._graph)
else:
# RTX is active during initialization — cudaImportExternalMemory and other
# non-capturable RTX ops run on background CUDA streams right now.
# Defer capture to the first step() call, after RTX is fully initialized
# and idle between render frames (clean capture window).
NewtonManager._graph = None
NewtonManager._graph_capture_pending = True
logger.info("Newton CUDA graph capture deferred until first step() (RTX active)")