Source code for isaaclab_physx.assets.surface_gripper.surface_gripper

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
import warnings
from typing import TYPE_CHECKING

import numpy as np
import torch
import warp as wp

from isaacsim.core.utils.extensions import enable_extension

import isaaclab.sim as sim_utils
from isaaclab.assets import AssetBase
from isaaclab.utils.version import get_isaac_sim_version, has_kit

if TYPE_CHECKING:
    from isaacsim.robot.surface_gripper import GripperView

    from .surface_gripper_cfg import SurfaceGripperCfg

# import logger
logger = logging.getLogger(__name__)

# -- Warp kernels --


@wp.kernel
def write_scalar_at_indices(
    data: wp.array(dtype=wp.float32),
    env_ids: wp.array(dtype=wp.int32),
    from_mask: bool,
    out: wp.array(dtype=wp.float32),
):
    i = wp.tid()
    if from_mask:
        out[env_ids[i]] = data[env_ids[i]]
    else:
        out[env_ids[i]] = data[i]


@wp.kernel
def fill_scalar_at_indices(
    value: wp.float32,
    env_ids: wp.array(dtype=wp.int32),
    out: wp.array(dtype=wp.float32),
):
    i = wp.tid()
    out[env_ids[i]] = value


[docs] class SurfaceGripper(AssetBase): """A surface gripper actuator class. Surface grippers are actuators capable of grasping objects when in close proximity with them. Each surface gripper in the collection must be a `Isaac Sim SurfaceGripper` primitive. On playing the simulation, the physics engine will automatically register the surface grippers into a SurfaceGripperView object. This object can be accessed using the :attr:`gripper_view` attribute. To interact with the surface grippers, the user can use the :attr:`state` to get the current state of the grippers, :attr:`command` to get the current command sent to the grippers, and :func:`update_gripper_properties` to update the properties of the grippers at runtime. Finally, the :func:`set_grippers_command` function should be used to set the desired command for the grippers. .. note:: The :func:`set_grippers_command` function does not write to the simulation. The simulation automatically calls :func:`write_data_to_sim` function to write the command to the simulation. Similarly, the update function is called automatically for every simulation step, and does not need to be called by the user. .. note:: The SurfaceGripper is only supported on CPU for now. Please set the simulation backend to run on CPU. Use `--device cpu` to run the simulation on CPU. """
[docs] def __init__(self, cfg: SurfaceGripperCfg): """Initialize the surface gripper. Args: cfg: A configuration instance. """ # copy the configuration self._cfg = cfg.copy() # checks for Isaac Sim v5.0 to ensure that the surface gripper is supported if has_kit() and get_isaac_sim_version().major < 5: raise NotImplementedError( "SurfaceGrippers are only supported by IsaacSim 5.0 and newer. Current version is" f" '{get_isaac_sim_version()}'. Please update to IsaacSim 5.0 or newer to use this feature." ) # flag for whether the sensor is initialized self._is_initialized = False self._debug_vis_handle = None # register various callback functions self._register_callbacks()
""" Properties """ @property def data(self): raise NotImplementedError("SurfaceGripper does have a data interface.") @property def num_instances(self) -> int: """Number of instances of the gripper. This is equal to the total number of grippers (the view can only contain one gripper per environment). """ return self._num_envs @property def state(self) -> wp.array: """Returns the gripper state buffer. The gripper state is a list of integers: - -1 --> Open - 0 --> Closing - 1 --> Closed """ return self._gripper_state @property def command(self) -> wp.array: """Returns the gripper command buffer. The gripper command is a list of floats: - [-1, -0.3] --> Open - [-0.3, 0.3] --> Do nothing - [0.3, 1] --> Close """ return self._gripper_command @property def gripper_view(self) -> GripperView: """Returns the gripper view object.""" return self._gripper_view """ Operations - _index / _mask API """
[docs] def set_grippers_command_index( self, states: torch.Tensor | wp.array, env_ids: wp.array | None = None, full_data: bool = False ) -> None: """Set the internal gripper command buffer. This function does not write to the simulation. Possible values for the gripper command are: - [-1, -0.3] --> Open - ]-0.3, 0.3[ --> Do nothing - [0.3, 1] --> Close Args: states: A tensor/array of floats representing the gripper command. env_ids: Environment indices. Defaults to None (all environments). full_data: Whether ``states`` is indexed by ``env_ids`` (True) or is compact (False). """ if env_ids is None: env_ids = self._ALL_INDICES if full_data: self.assert_shape_and_dtype(states, (self.num_instances,), wp.float32, "states") else: self.assert_shape_and_dtype(states, (env_ids.shape[0],), wp.float32, "states") # Convert torch input to warp if isinstance(states, torch.Tensor): states = wp.from_torch(states.to(torch.float32).contiguous(), dtype=wp.float32) wp.launch( write_scalar_at_indices, dim=env_ids.shape[0], inputs=[states, env_ids, full_data], outputs=[self._gripper_command], device=self._device, )
[docs] def set_grippers_command_mask(self, states: torch.Tensor | wp.array, env_mask: torch.Tensor | None = None) -> None: """Set the internal gripper command buffer using a boolean mask. Args: states: A tensor/array of floats representing the gripper command (full size). env_mask: Boolean mask of shape (num_envs,). Defaults to None (all environments). """ if env_mask is not None: env_ids = wp.from_torch(torch.argwhere(env_mask).view(-1).to(torch.int32), dtype=wp.int32) else: env_ids = self._ALL_INDICES self.set_grippers_command_index(states, env_ids, full_data=True)
[docs] def set_grippers_command(self, states: torch.Tensor, indices: torch.Tensor | None = None) -> None: """Set the internal gripper command buffer. This function does not write to the simulation. .. deprecated:: v2.0.0 Use :meth:`set_grippers_command_index` instead. Args: states: A tensor of integers representing the gripper command. indices: A tensor of integers representing the indices. Defaults to None. """ env_ids = self._resolve_env_ids(indices) self.set_grippers_command_index(states, env_ids)
[docs] def update_gripper_properties_index( self, max_grip_distance: wp.array | None = None, coaxial_force_limit: wp.array | None = None, shear_force_limit: wp.array | None = None, retry_interval: wp.array | None = None, env_ids: wp.array | None = None, full_data: bool = False, ) -> None: """Update the gripper properties. Args: max_grip_distance: The maximum grip distance. Shape (num_envs,) or (len(env_ids),). coaxial_force_limit: The coaxial force limit. Shape (num_envs,) or (len(env_ids),). shear_force_limit: The shear force limit. Shape (num_envs,) or (len(env_ids),). retry_interval: The retry interval. Shape (num_envs,) or (len(env_ids),). env_ids: Environment indices. Defaults to None (all environments). full_data: Whether input arrays are indexed by ``env_ids`` (True) or compact (False). """ if env_ids is None: env_ids = self._ALL_INDICES for prop_name, prop_data, prop_buf in [ ("max_grip_distance", max_grip_distance, self._max_grip_distance), ("coaxial_force_limit", coaxial_force_limit, self._coaxial_force_limit), ("shear_force_limit", shear_force_limit, self._shear_force_limit), ("retry_interval", retry_interval, self._retry_interval), ]: if prop_data is not None: if full_data: self.assert_shape_and_dtype(prop_data, (self.num_instances,), wp.float32, prop_name) else: self.assert_shape_and_dtype(prop_data, (env_ids.shape[0],), wp.float32, prop_name) wp.launch( write_scalar_at_indices, dim=env_ids.shape[0], inputs=[prop_data, env_ids, full_data], outputs=[prop_buf], device=self._device, ) # Convert to list for the GripperView API indices_list = wp.to_torch(env_ids).tolist() self._gripper_view.set_surface_gripper_properties( max_grip_distance=wp.to_torch(self._max_grip_distance).tolist(), coaxial_force_limit=wp.to_torch(self._coaxial_force_limit).tolist(), shear_force_limit=wp.to_torch(self._shear_force_limit).tolist(), retry_interval=wp.to_torch(self._retry_interval).tolist(), indices=indices_list, )
[docs] def update_gripper_properties_mask( self, max_grip_distance: wp.array | None = None, coaxial_force_limit: wp.array | None = None, shear_force_limit: wp.array | None = None, retry_interval: wp.array | None = None, env_mask: torch.Tensor | None = None, ) -> None: """Update the gripper properties using a boolean mask. Args: max_grip_distance: The maximum grip distance. Shape (num_envs,). coaxial_force_limit: The coaxial force limit. Shape (num_envs,). shear_force_limit: The shear force limit. Shape (num_envs,). retry_interval: The retry interval. Shape (num_envs,). env_mask: Boolean mask of shape (num_envs,). Defaults to None (all environments). """ if env_mask is not None: env_ids = wp.from_torch(torch.argwhere(env_mask).view(-1).to(torch.int32), dtype=wp.int32) else: env_ids = self._ALL_INDICES self.update_gripper_properties_index( max_grip_distance=max_grip_distance, coaxial_force_limit=coaxial_force_limit, shear_force_limit=shear_force_limit, retry_interval=retry_interval, env_ids=env_ids, full_data=True, )
[docs] def update_gripper_properties( self, max_grip_distance: torch.Tensor | None = None, coaxial_force_limit: torch.Tensor | None = None, shear_force_limit: torch.Tensor | None = None, retry_interval: torch.Tensor | None = None, indices: torch.Tensor | None = None, ) -> None: """Update the gripper properties. .. deprecated:: v2.0.0 Use :meth:`update_gripper_properties_index` instead. Args: max_grip_distance: The maximum grip distance. Shape (num_envs,). coaxial_force_limit: The coaxial force limit. Shape (num_envs,). shear_force_limit: The shear force limit. Shape (num_envs,). retry_interval: The retry interval. Shape (num_envs,). indices: The indices of the grippers to update. Defaults to None. """ env_ids = self._resolve_env_ids(indices) # Convert torch inputs to warp def _to_wp(t): if t is None: return None if isinstance(t, torch.Tensor): return wp.from_torch(t.to(torch.float32).contiguous(), dtype=wp.float32) return t self.update_gripper_properties_index( max_grip_distance=_to_wp(max_grip_distance), coaxial_force_limit=_to_wp(coaxial_force_limit), shear_force_limit=_to_wp(shear_force_limit), retry_interval=_to_wp(retry_interval), env_ids=env_ids, )
[docs] def update(self, dt: float) -> None: """Update the gripper state using the SurfaceGripperView. This function is called every simulation step. The data fetched from the gripper view is a list of strings containing 3 possible states: - "Open" --> 0 - "Closing" --> 1 - "Closed" --> 2 To make this more neural network friendly, we convert the list of strings to a list of floats: - "Open" --> -1.0 - "Closing" --> 0.0 - "Closed" --> 1.0 .. note:: We need to do this conversion for every single step of the simulation because the gripper can lose contact with the object if some conditions are met: such as if a large force is applied to the gripped object. """ state_list: list[int] = self._gripper_view.get_surface_gripper_status() self._gripper_state = wp.array([float(s) - 1.0 for s in state_list], dtype=wp.float32, device=self._device)
[docs] def write_data_to_sim(self) -> None: """Write the gripper command to the SurfaceGripperView. The gripper command is a list of integers that needs to be converted to a list of strings: - [-1, -0.3] --> Open - ]-0.3, 0.3[ --> Do nothing - [0.3, 1] --> Closed The Do nothing command is not applied, and is only used to indicate whether the gripper state has changed. """ # Convert to torch at the GripperView boundary (zero-copy on CPU) command_torch = wp.to_torch(self._gripper_command) # Remove the SurfaceGripper indices that have a commanded value of 2 indices = torch.argwhere(torch.logical_or(command_torch < -0.3, command_torch > 0.3)).to(torch.int32).tolist() # Write to the SurfaceGripperView if there are any indices to write to if len(indices) > 0: self._gripper_view.apply_gripper_action(command_torch.tolist(), indices)
[docs] def reset_index(self, env_ids: wp.array | None = None) -> None: """Reset the gripper command buffer. Args: env_ids: Environment indices. Defaults to None (all environments). """ if env_ids is None: env_ids = self._ALL_INDICES # Reset the selected grippers to an open status wp.launch( fill_scalar_at_indices, dim=env_ids.shape[0], inputs=[wp.float32(-1.0), env_ids], outputs=[self._gripper_command], device=self._device, ) self.write_data_to_sim() # Sets the gripper last command to be 0.0 (do nothing) wp.launch( fill_scalar_at_indices, dim=env_ids.shape[0], inputs=[wp.float32(0.0), env_ids], outputs=[self._gripper_command], device=self._device, ) # Force set the state to open. It will read open in the next update call. wp.launch( fill_scalar_at_indices, dim=env_ids.shape[0], inputs=[wp.float32(-1.0), env_ids], outputs=[self._gripper_state], device=self._device, )
[docs] def reset_mask(self, env_mask: torch.Tensor | None = None) -> None: """Reset the gripper command buffer using a boolean mask. Args: env_mask: Boolean mask of shape (num_envs,). Defaults to None (all environments). """ if env_mask is not None: env_ids = wp.from_torch(torch.argwhere(env_mask).view(-1).to(torch.int32), dtype=wp.int32) else: env_ids = self._ALL_INDICES self.reset_index(env_ids)
[docs] def reset(self, indices: torch.Tensor | None = None) -> None: """Reset the gripper command buffer. .. deprecated:: v2.0.0 Use :meth:`reset_index` instead. Args: indices: A tensor of integers representing the indices of the grippers to reset. Defaults to None. """ env_ids = self._resolve_env_ids(indices) self.reset_index(env_ids)
""" Initialization. """ def _initialize_impl(self) -> None: """Initializes the gripper-related handles and internal buffers. Raises: ValueError: If the simulation backend is not CPU. RuntimeError: If the Simulation Context is not initialized or if gripper prims are not found. .. note:: The SurfaceGripper is only supported on CPU for now. Please set the simulation backend to run on CPU. Use `--device cpu` to run the simulation on CPU. """ enable_extension("isaacsim.robot.surface_gripper") from isaacsim.robot.surface_gripper import GripperView # Check that we are using the CPU backend. if self._device != "cpu": raise Exception( "SurfaceGripper is only supported on CPU for now. Please set the simulation backend to run on CPU. Use" " `--device cpu` to run the simulation on CPU." ) # obtain the first prim in the regex expression (all others are assumed to be a copy of this) template_prim = sim_utils.find_first_matching_prim(self._cfg.prim_path) if template_prim is None: raise RuntimeError(f"Failed to find prim for expression: '{self._cfg.prim_path}'.") template_prim_path = template_prim.GetPath().pathString # find surface gripper prims gripper_prims = sim_utils.get_all_matching_child_prims( template_prim_path, predicate=lambda prim: prim.GetTypeName() == "IsaacSurfaceGripper", traverse_instance_prims=False, ) if len(gripper_prims) == 0: raise RuntimeError( f"Failed to find a surface gripper when resolving '{self._cfg.prim_path}'." " Please ensure that the prim has type 'IsaacSurfaceGripper'." ) if len(gripper_prims) > 1: raise RuntimeError( f"Failed to find a single surface gripper when resolving '{self._cfg.prim_path}'." f" Found multiple '{gripper_prims}' under '{template_prim_path}'." " Please ensure that there is only one surface gripper in the prim path tree." ) # resolve gripper prim back into regex expression gripper_prim_path = gripper_prims[0].GetPath().pathString gripper_prim_path_expr = self._cfg.prim_path + gripper_prim_path[len(template_prim_path) :] # Count number of environments self._prim_expr = gripper_prim_path_expr env_prim_path_expr = self._prim_expr.rsplit("/", 1)[0] self._parent_prims = sim_utils.find_matching_prims(env_prim_path_expr) self._num_envs = len(self._parent_prims) # Create buffers self._create_buffers() # Process the configuration self._process_cfg() # Initialize gripper view and set properties. self._gripper_view = GripperView( self._prim_expr, ) self.update_gripper_properties_index( max_grip_distance=wp.clone(self._max_grip_distance), coaxial_force_limit=wp.clone(self._coaxial_force_limit), shear_force_limit=wp.clone(self._shear_force_limit), retry_interval=wp.clone(self._retry_interval), ) # log information about the surface gripper logger.info(f"Surface gripper initialized at: {self._cfg.prim_path} with root '{gripper_prim_path_expr}'.") logger.info(f"Number of instances: {self._num_envs}") # Reset grippers self.reset_index() def _create_buffers(self) -> None: """Create the buffers for storing the gripper state, command, and properties.""" self._gripper_state = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) self._gripper_command = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) self._ALL_INDICES = wp.array(np.arange(self._num_envs, dtype=np.int32), device=self._device) self._max_grip_distance = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) self._coaxial_force_limit = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) self._shear_force_limit = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) self._retry_interval = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device) def _process_cfg(self) -> None: """Process the configuration for the gripper properties.""" # Get one of the grippers as defined in the default stage gripper_prim = self._parent_prims[0] try: max_grip_distance = gripper_prim.GetAttribute("isaac:maxGripDistance").Get() except Exception as e: warnings.warn( f"Failed to retrieve max_grip_distance from stage, defaulting to user provided cfg. Exception: {e}" ) max_grip_distance = None try: coaxial_force_limit = gripper_prim.GetAttribute("isaac:coaxialForceLimit").Get() except Exception as e: warnings.warn( f"Failed to retrieve coaxial_force_limit from stage, defaulting to user provided cfg. Exception: {e}" ) coaxial_force_limit = None try: shear_force_limit = gripper_prim.GetAttribute("isaac:shearForceLimit").Get() except Exception as e: warnings.warn( f"Failed to retrieve shear_force_limit from stage, defaulting to user provided cfg. Exception: {e}" ) shear_force_limit = None try: retry_interval = gripper_prim.GetAttribute("isaac:retryInterval").Get() except Exception as e: warnings.warn( f"Failed to retrieve retry_interval from stage defaulting to user provided cfg. Exception: {e}" ) retry_interval = None self._max_grip_distance = self.parse_gripper_parameter(self._cfg.max_grip_distance, max_grip_distance) self._coaxial_force_limit = self.parse_gripper_parameter(self._cfg.coaxial_force_limit, coaxial_force_limit) self._shear_force_limit = self.parse_gripper_parameter(self._cfg.shear_force_limit, shear_force_limit) self._retry_interval = self.parse_gripper_parameter(self._cfg.retry_interval, retry_interval) """ Helper functions. """ def _resolve_env_ids(self, env_ids) -> wp.array: """Resolve environment indices to a warp array. Args: env_ids: Environment indices. Can be None, a slice, a list, or a torch.Tensor. Returns: A warp array of int32 indices. """ if env_ids is None or env_ids == slice(None): return self._ALL_INDICES elif isinstance(env_ids, list): return wp.array(env_ids, dtype=wp.int32, device=self._device) elif isinstance(env_ids, torch.Tensor): return wp.from_torch(env_ids.to(torch.int32).contiguous(), dtype=wp.int32) return env_ids
[docs] def parse_gripper_parameter( self, cfg_value: float | int | tuple | None, default_value: float | int | tuple | None, ndim: int = 0 ) -> wp.array: """Parse the gripper parameter. Args: cfg_value: The value to parse. Can be a float, int, tuple, or None. default_value: The default value to use if cfg_value is None. Can be a float, int, tuple, or None. ndim: The number of dimensions of the parameter. Defaults to 0. Returns: A warp array of float32 values. """ # Adjust the buffer size based on the number of dimensions if ndim == 0: param = torch.zeros(self._num_envs, device=self._device) elif ndim == 3: param = torch.zeros(self._num_envs, 3, device=self._device) elif ndim == 4: param = torch.zeros(self._num_envs, 4, device=self._device) else: raise ValueError(f"Invalid number of dimensions: {ndim}") # Parse the parameter if cfg_value is not None: if isinstance(cfg_value, (float, int)): param[:] = float(cfg_value) elif isinstance(cfg_value, tuple): if len(cfg_value) == ndim: param[:] = torch.tensor(cfg_value, dtype=torch.float, device=self._device) else: raise ValueError(f"Invalid number of values for parameter. Got: {len(cfg_value)}\nExpected: {ndim}") else: raise TypeError(f"Invalid type for parameter value: {type(cfg_value)}. " + "Expected float or int.") elif default_value is not None: if isinstance(default_value, (float, int)): param[:] = float(default_value) elif isinstance(default_value, tuple): assert len(default_value) == ndim, f"Expected {ndim} values, got {len(default_value)}" param[:] = torch.tensor(default_value, dtype=torch.float, device=self._device) else: raise TypeError( f"Invalid type for default value: {type(default_value)}. " + "Expected float or Tensor." ) else: raise ValueError("The parameter value is None and no default value is provided.") # Convert to warp return wp.from_torch(param, dtype=wp.float32)