Source code for isaaclab_contrib.deformable.coupled_mjwarp_vbd_manager

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Coupled MJWarp + VBD Newton manager."""

from __future__ import annotations

import inspect
import logging
from typing import TYPE_CHECKING

import warp as wp
from isaaclab_newton.physics.newton_manager import NewtonManager
from newton import Contacts, Control, Model, ModelBuilder, State
from newton._src.usd.schemas import SchemaResolverNewton, SchemaResolverPhysx
from newton.solvers import SolverBase, SolverMuJoCo, SolverVBD

from isaaclab.sim.utils.stage import get_current_stage

from .deformable_object import (
    add_deformable_entry_to_builder,
    clear_deformable_builder_hooks,
    install_deformable_builder_hooks,
)
from .kernels import _kernel_body_particle_reaction
from .newton_manager_cfg import CoupledMJWarpVBDSolverCfg

if TYPE_CHECKING:
    from isaaclab.sim.simulation_context import SimulationContext

logger = logging.getLogger(__name__)


[docs] class NewtonCoupledMJWarpVBDManager(NewtonManager): """:class:`NewtonManager` specialization for the coupled MJWarp + VBD solver. Due to Newton deformables not being properly integrated yet, this manager uses the same temporary solutions from VBD Manager. Always uses Newton's :class:`CollisionPipeline` for contact handling. """ _rigid_solver: SolverMuJoCo _soft_solver: SolverVBD _coupling_mode: str | None = None
[docs] @classmethod def initialize(cls, sim_context: SimulationContext) -> None: """Initialize the manager with simulation context. Args: sim_context: Parent simulation context. TODO: Subclass should not override this method, once deformables supported on Newton import_usd, this can be unified with NewtonManager's implementation. """ # Deformable body registry and extension hooks. # Experimental deformable support registers callbacks here so the manager # and cloner can invoke them without hard-coding deformable logic. install_deformable_builder_hooks() super().initialize(sim_context)
[docs] @classmethod def step(cls) -> None: """Step the physics simulation.""" from isaaclab.physics import PhysicsManager sim = PhysicsManager._sim if sim is None or not sim.is_playing(): return # Notify solver of model changes if cls._model_changes: with wp.ScopedDevice(PhysicsManager._device): for change in cls._model_changes: cls._rigid_solver.notify_model_changed(change) cls._soft_solver.notify_model_changed(change) NewtonManager._model_changes = set() super().step()
@classmethod def _solver_specific_clear(cls): """Clear VBD-specific state.""" clear_deformable_builder_hooks() @classmethod def _get_deformable_ignore_paths(cls) -> list[str]: """Return USD prim paths to skip when calling ``builder.add_usd``. For each registered deformable body, both the simulation mesh (which carries ``UsdPhysics.CollisionAPI``) and the visual mesh are returned. The sim mesh must be skipped so Newton does not create a redundant static mesh collider alongside the particles produced by ``add_soft_mesh``. The visual mesh is skipped so Newton does not treat it as a collider — Kit reads it directly from USD for rendering. Paths may contain regex patterns; Newton's ``add_usd`` matches them via :func:`re.match`. """ paths: list[str] = [] for entry in cls._deformable_registry: paths.append(entry.sim_mesh_prim_path) paths.append(entry.vis_mesh_prim_path) return paths
[docs] @classmethod def start_simulation(cls) -> None: """Start simulation by finalizing model and initializing state. This function finalizes the model and initializes the simulation state. Note: Collision pipeline is initialized later in initialize_solver() after we determine whether the solver needs external collision detection. TODO: Subclass should not override this method, missing piece is having Newton bind a surface mesh to volume deformable tetrahedral mesh in addition to removing the deformable_registry data structure. """ super().start_simulation() # Apply global model parameters from :class:`NewtonModelCfg` to the finalized model. # Sets ``soft_contact_ke/kd/mu`` and optionally overrides per-shape # ``shape_material_ke/kd/mu`` on the Newton model. from isaaclab.physics import PhysicsManager cfg = PhysicsManager._cfg if cfg is not None and hasattr(cfg, "model_cfg") and cfg.model_cfg is not None: model = cls._model if model is None: return model_cfg = cfg.model_cfg model.soft_contact_ke = float(model_cfg.soft_contact_ke) model.soft_contact_kd = float(model_cfg.soft_contact_kd) model.soft_contact_mu = float(model_cfg.soft_contact_mu) if model_cfg.shape_material_ke is not None: model.shape_material_ke.fill_(float(model_cfg.shape_material_ke)) if model_cfg.shape_material_kd is not None: model.shape_material_kd.fill_(float(model_cfg.shape_material_kd)) if model_cfg.shape_material_mu is not None: model.shape_material_mu.fill_(float(model_cfg.shape_material_mu)) # Setup USD/Fabric sync for Kit viewport deformable rendering if not cls._clone_physics_only and cls._deformable_registry: import re import usdrt if NewtonManager._usdrt_stage is None: NewtonManager._usdrt_stage = get_current_stage(fabric=True) stage = get_current_stage() for entry in cls._deformable_registry: for inst_idx, offset in enumerate(entry.particle_offsets): # Resolve regex pattern to concrete instance path of visual mesh resolved_vis = re.sub(r"(?<=[Ee]nv_)\.\*", str(inst_idx), entry.vis_mesh_prim_path) resolved_vis = re.sub(r"\.\*", str(inst_idx), resolved_vis) vis_prim = stage.GetPrimAtPath(resolved_vis) if not vis_prim or not vis_prim.IsValid(): logger.warning("[setup_fabric_particle_sync] vis prim not found at %s", resolved_vis) continue # Create per-instance particle offset and count attributes on the visual mesh # prim so the Fabric sync kernel can find the right slice of particle_q # and iterate only over this body's particles (counts vary across bodies). fab_prim = NewtonManager._usdrt_stage.GetPrimAtPath(vis_prim.GetPath().pathString) fab_prim.CreateAttribute( NewtonManager._newton_particle_offset_attr, usdrt.Sdf.ValueTypeNames.UInt, True ) fab_prim.GetAttribute(NewtonManager._newton_particle_offset_attr).Set(offset) fab_prim.CreateAttribute( NewtonManager._newton_particle_count_attr, usdrt.Sdf.ValueTypeNames.UInt, True ) fab_prim.GetAttribute(NewtonManager._newton_particle_count_attr).Set(entry.particles_per_body) cls._mark_particles_dirty() cls.sync_particles_to_usd()
[docs] @classmethod def instantiate_builder_from_stage(cls): """Create builder from USD stage with special treatment for deformable bodies, as these are not read from USD yet. Detects env Xforms (e.g. ``/World/Env_0``, ``/World/Env_1``) and builds each as a separate Newton world via ``begin_world``/``end_world``. Falls back to a flat ``add_usd`` when no env Xforms are found. TODO: Subclass should not override this method, once deformables supported on Newton import_usd, this can be unified with NewtonManager's implementation. """ import re from pxr import UsdGeom stage = get_current_stage() up_axis = UsdGeom.GetStageUpAxis(stage) # Scan /World children for env-like Xforms (Env_0, env_1, ...) env_pattern = re.compile(r"^[Ee]nv_(\d+)$") world_prim = stage.GetPrimAtPath("/World") env_paths: list[tuple[int, str]] = [] if world_prim and world_prim.IsValid(): for child in world_prim.GetChildren(): m = env_pattern.match(child.GetName()) if m: env_paths.append((int(m.group(1)), child.GetPath().pathString)) env_paths.sort(key=lambda x: x[0]) builder = ModelBuilder(up_axis=up_axis) schema_resolvers = [SchemaResolverNewton(), SchemaResolverPhysx()] # Deformable sim/visual mesh paths must be skipped by ``add_usd`` # so they don't get duplicated as static colliders. deformable_ignore_paths = cls._get_deformable_ignore_paths() if not env_paths: # No env Xforms — flat loading builder.add_usd(stage, ignore_paths=deformable_ignore_paths, schema_resolvers=schema_resolvers) # Add deformable bodies from the registry (single world at origin). for entry in cls._deformable_registry: add_deformable_entry_to_builder(builder, entry, 0, [0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0]) else: # Load everything except the env subtrees (ground plane, lights, etc.) ignore_paths = [path for _, path in env_paths] + deformable_ignore_paths builder.add_usd(stage, ignore_paths=ignore_paths, schema_resolvers=schema_resolvers) # Build a prototype from the first env (all envs assumed identical) _, proto_path = env_paths[0] proto = ModelBuilder(up_axis=up_axis) proto.add_usd( stage, root_path=proto_path, ignore_paths=deformable_ignore_paths, schema_resolvers=schema_resolvers, ) # Inject registered sites into the proto before replication global_sites, proto_sites = cls._cl_inject_sites(builder, {proto_path: proto}) global_site_map: dict[str, tuple[int, None]] = {label: (idx, None) for label, idx in global_sites.items()} num_worlds = len(env_paths) local_site_map: dict[str, list[list[int]]] = {} site_entries = proto_sites.get(id(proto), {}) # Add each env as a separate Newton world xform_cache = UsdGeom.XformCache() for col, (_, env_path) in enumerate(env_paths): builder.begin_world() offset = builder.shape_count world_xform = xform_cache.GetLocalToWorldTransform(stage.GetPrimAtPath(env_path)) translation = world_xform.ExtractTranslation() rotation = world_xform.ExtractRotationQuat() pos = (translation[0], translation[1], translation[2]) quat = ( rotation.GetImaginary()[0], rotation.GetImaginary()[1], rotation.GetImaginary()[2], rotation.GetReal(), ) builder.add_builder(proto, xform=wp.transform(pos, quat)) for label, proto_shape_indices in site_entries.items(): if label not in local_site_map: local_site_map[label] = [[] for _ in range(num_worlds)] for proto_shape_idx in proto_shape_indices: local_site_map[label][col].append(offset + proto_shape_idx) # Add deformable bodies from the registry into this world. for entry in cls._deformable_registry: add_deformable_entry_to_builder(builder, entry, col, list(pos), quat) builder.end_world() NewtonManager._cl_site_index_map = { **global_site_map, **{label: (None, per_world) for label, per_world in local_site_map.items()}, } NewtonManager._num_envs = len(env_paths) # Call builder.color() if any deformable entries were added (required by VBD solver) if cls._deformable_registry: builder.color() cls.set_builder(builder)
@classmethod def _build_solver(cls, model: Model, solver_cfg: CoupledMJWarpVBDSolverCfg) -> None: """Construct a custom coupling between two solvers and populate the base-class slots. VBD always uses Newton's :class:`CollisionPipeline` and steps with separate input/output states, so the flags are fixed. """ cls._coupling_mode = solver_cfg.coupling_mode valid = set(inspect.signature(SolverMuJoCo.__init__).parameters) - {"self", "model"} kwargs = {k: v for k, v in solver_cfg.rigid_solver_cfg.to_dict().items() if k in valid} cls._rigid_solver = SolverMuJoCo(model, **kwargs) valid = set(inspect.signature(SolverVBD.__init__).parameters) - {"self", "model"} kwargs = {k: v for k, v in solver_cfg.soft_solver_cfg.to_dict().items() if k in valid} cls._soft_solver = SolverVBD(model, **kwargs) # Dummy solver for the newtonmanager NewtonManager._solver = SolverBase(model) NewtonManager._use_single_state = False NewtonManager._needs_collision_pipeline = True @classmethod def _step_solver( cls, state_in: State, state_out: State, control: Control, contacts: Contacts | None, substep_dt: float ) -> None: """One coupled substep. Args: state_in: Current state (read/write). state_out: Next state (write). control: Joint-level control inputs. contacts: Ignored -- the solver uses its own internal contacts. dt: Substep timestep [s]. """ if cls._coupling_mode == "one_way": cls._step_one_way(state_in, state_out, control, substep_dt) else: cls._step_two_way(state_in, state_out, control, substep_dt) @classmethod def _simulate_physics_only(cls) -> None: # Rebuild BVH once per step for solvers that require it (e.g. VBD cloth). if hasattr(cls._soft_solver, "rebuild_bvh"): cls._soft_solver.rebuild_bvh(cls._state_0) super()._simulate_physics_only() @classmethod def _step_one_way(cls, state_in: State, state_out: State, control: Control, dt: float) -> None: """One-way coupling: collide, then rigid step, then VBD.""" # 1. Clear forces state_in.clear_forces() state_out.clear_forces() # 2. Collision detection (cloth-body contacts) cls._collision_pipeline.collide(state_in, cls._contacts) # 3. Rigid-body step (does not read soft-contact reactions) cls._rigid_step(state_in, state_out, control, dt) # 4. Clear spurious particle forces from rigid step state_in.particle_f.zero_() # 5. VBD step -- particles only, reads updated rigid poses cls._soft_solver.step(state_in, state_out, control, cls._contacts, dt) @classmethod def _step_two_way(cls, state_in: State, state_out: State, control: Control, dt: float) -> None: """Two-way coupling: collide, inject reactions into body_f, rigid step, VBD step.""" # 1. Clear forces state_in.clear_forces() state_out.clear_forces() # 2. Collision detection BEFORE rigid step cls._collision_pipeline.collide(state_in, cls._contacts) # 3. Inject contact reaction forces into body_f. # state_out holds the previous substep's body_q (states swap each # substep), used for finite-difference body velocity in friction. # particle_q_prev is reconstructed from particle_qd inside the # kernel because VBD mutates particle_q in place, so the swapped # state's particle_q is not a clean prior-substep snapshot. if state_in.body_f is not None: cls._apply_reactions(state_in, state_out, dt) # 4. Rigid-body step (reads body_f for soft-contact reactions) cls._rigid_step(state_in, state_out, control, dt) # 5. Clear spurious particle forces from rigid step state_in.particle_f.zero_() # 6. VBD step -- uses same contacts detected in step 2 cls._soft_solver.step(state_in, state_out, control, cls._contacts, dt) @classmethod def _rigid_step(cls, state_in: State, state_out: State, control: Control, dt: float) -> None: """Advance rigid bodies with the configured sub-solver.""" model = cls._model # set particle_count = 0 to disable particle simulation in robot solver saved_particle_count = model.particle_count model.particle_count = 0 cls._rigid_solver.step(state_in, state_out, control, None, dt) # restore original settings model.particle_count = saved_particle_count @classmethod def _apply_reactions(cls, state: State, state_prev: State, dt: float) -> None: """Launch the reaction kernel to inject normal + friction forces into body_f. Args: state: Current state with particle positions/velocities and body state. state_prev: Previous substep state whose ``body_q`` provides the reference poses for finite-difference body velocity. dt: Substep timestep [s]. """ model = cls._model contacts = cls._contacts if contacts is None: return contact_capacity = int(contacts.soft_contact_particle.shape[0]) if contact_capacity == 0: return # The kernel reconstructs particle_q_prev from particle_qd internally: # state_prev.particle_q is unreliable because VBD mutates particle_q # in place during its iteration, so the swapped state's particle_q is # not a clean snapshot of the prior substep. wp.launch( _kernel_body_particle_reaction, dim=contact_capacity, inputs=[ contacts.soft_contact_count, contacts.soft_contact_particle, contacts.soft_contact_shape, contacts.soft_contact_body_pos, contacts.soft_contact_body_vel, contacts.soft_contact_normal, state.particle_q, state.particle_qd, model.particle_radius, state.body_q, state_prev.body_q, state.body_qd, model.body_com, model.shape_body, model.shape_material_mu, float(model.soft_contact_ke), float(model.soft_contact_kd), float(model.soft_contact_mu), float(cls._soft_solver.friction_epsilon), float(dt), state.body_f, ], )