Source code for isaaclab_newton.assets.mpm_object.mpm_object_data

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import warp as wp

from isaaclab.assets.deformable_object.base_deformable_object_data import BaseDeformableObjectData
from isaaclab.utils.buffers import TimestampedBufferWarp as TimestampedBuffer
from isaaclab.utils.warp import ProxyArray

from isaaclab_newton.physics import NewtonManager as SimulationManager

from .kernels import compute_mean_vec3f_over_particles, compute_particle_state_w, gather_particles_vec3f, vec6f


[docs] class MPMObjectData(BaseDeformableObjectData): """Data container for a Newton MPM particle object.""" __backend_name__: str = "newton" def __init__(self, particle_offsets: wp.array, particles_per_object: int, num_instances: int, device: str): super().__init__(device) self._particle_offsets = particle_offsets self._particles_per_object = particles_per_object self._num_instances = num_instances self._particle_pos_w = TimestampedBuffer((num_instances, particles_per_object), device, wp.vec3f) self._particle_vel_w = TimestampedBuffer((num_instances, particles_per_object), device, wp.vec3f) self._particle_state_w = TimestampedBuffer((num_instances, particles_per_object), device, vec6f) self._root_pos_w = TimestampedBuffer((num_instances,), device, wp.vec3f) self._root_vel_w = TimestampedBuffer((num_instances,), device, wp.vec3f) self._particle_pos_w_ta = ProxyArray(self._particle_pos_w.data) self._particle_vel_w_ta = ProxyArray(self._particle_vel_w.data) self._particle_state_w_ta = ProxyArray(self._particle_state_w.data) self._root_pos_w_ta = ProxyArray(self._root_pos_w.data) self._root_vel_w_ta = ProxyArray(self._root_vel_w.data) self.default_nodal_state_w: ProxyArray | None = None self.default_particle_state_w: ProxyArray | None = None self.nodal_kinematic_target: ProxyArray | None = None self._create_simulation_bindings() def _create_simulation_bindings(self) -> None: """Validate current Newton particle arrays and invalidate gathered buffers.""" self._get_current_particle_state() self._particle_pos_w.timestamp = -1.0 self._particle_vel_w.timestamp = -1.0 self._particle_state_w.timestamp = -1.0 self._root_pos_w.timestamp = -1.0 self._root_vel_w.timestamp = -1.0 def _get_current_particle_state(self): state = SimulationManager.get_state_0() if state is None or state.particle_q is None or state.particle_qd is None: raise RuntimeError( "Failed to access Newton MPM particle state. Ensure the Newton model has been finalized and contains " "particle position and velocity arrays." ) return state @property def particle_pos_w(self) -> ProxyArray: """Particle positions in simulation world frame [m].""" if self._particle_pos_w.timestamp < self._sim_timestamp: state = self._get_current_particle_state() wp.launch( gather_particles_vec3f, dim=(self._num_instances, self._particles_per_object), inputs=[state.particle_q, self._particle_offsets], outputs=[self._particle_pos_w.data], device=self.device, ) self._particle_pos_w.timestamp = self._sim_timestamp return self._particle_pos_w_ta @property def particle_vel_w(self) -> ProxyArray: """Particle velocities in simulation world frame [m/s].""" if self._particle_vel_w.timestamp < self._sim_timestamp: state = self._get_current_particle_state() wp.launch( gather_particles_vec3f, dim=(self._num_instances, self._particles_per_object), inputs=[state.particle_qd, self._particle_offsets], outputs=[self._particle_vel_w.data], device=self.device, ) self._particle_vel_w.timestamp = self._sim_timestamp return self._particle_vel_w_ta @property def particle_state_w(self) -> ProxyArray: """Particle state ``[pos, vel]`` in simulation world frame [m, m/s].""" if self._particle_state_w.timestamp < self._sim_timestamp: wp.launch( compute_particle_state_w, dim=(self._num_instances, self._particles_per_object), inputs=[self.particle_pos_w.warp, self.particle_vel_w.warp], outputs=[self._particle_state_w.data], device=self.device, ) self._particle_state_w.timestamp = self._sim_timestamp return self._particle_state_w_ta @property def nodal_pos_w(self) -> ProxyArray: return self.particle_pos_w @property def nodal_vel_w(self) -> ProxyArray: return self.particle_vel_w @property def nodal_state_w(self) -> ProxyArray: return self.particle_state_w @property def root_pos_w(self) -> ProxyArray: """Mean particle position per instance in simulation world frame [m].""" if self._root_pos_w.timestamp < self._sim_timestamp: wp.launch( compute_mean_vec3f_over_particles, dim=(self._num_instances,), inputs=[self.particle_pos_w.warp, self._particles_per_object], outputs=[self._root_pos_w.data], device=self.device, ) self._root_pos_w.timestamp = self._sim_timestamp return self._root_pos_w_ta @property def root_vel_w(self) -> ProxyArray: """Mean particle velocity per instance in simulation world frame [m/s].""" if self._root_vel_w.timestamp < self._sim_timestamp: wp.launch( compute_mean_vec3f_over_particles, dim=(self._num_instances,), inputs=[self.particle_vel_w.warp, self._particles_per_object], outputs=[self._root_vel_w.data], device=self.device, ) self._root_vel_w.timestamp = self._sim_timestamp return self._root_vel_w_ta