# Source code for isaaclab.sensors.sensor_base

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Base class for sensors.

This class defines an interface for sensors similar to how the :class:`isaaclab.assets.AssetBase` class works.
Each sensor class should inherit from this class and implement the abstract methods.
"""

from __future__ import annotations

import inspect
import re
import weakref
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any

import warp as wp

import isaaclab.sim as sim_utils
from isaaclab.physics import PhysicsEvent, PhysicsManager
from isaaclab.utils.version import has_kit

from .kernels import reset_envs_kernel, update_outdated_envs_kernel, update_timestamp_kernel

if TYPE_CHECKING:
    from .sensor_base_cfg import SensorBaseCfg


class SensorBase(ABC):
    """The base class for implementing a sensor.

    The implementation is based on lazy evaluation. The sensor data is only updated when the user
    tries accessing the data through the :attr:`data` property or sets ``force_recompute=True`` in
    the :meth:`update` method. This is done to avoid unnecessary computation when the sensor data
    is not used.

    The sensor is updated at the specified update period. If the update period is zero, then the
    sensor is updated at every simulation step.
    """

    def __init__(self, cfg: SensorBaseCfg):
        """Initialize the sensor class.

        Args:
            cfg: The configuration parameters for the sensor.
        """
        # check that the config is valid
        cfg.validate()
        # store inputs (a copy, so later edits to the caller's config do not leak in)
        self.cfg = cfg.copy()
        # flag for whether the sensor is initialized
        self._is_initialized = False
        # flag for whether the sensor is in visualization mode
        self._is_visualizing = False
        self.stage = sim_utils.get_current_stage()

        # register various callback functions
        self._register_callbacks()

        # add handle for debug visualization (this is set to a valid handle inside set_debug_vis)
        self._debug_vis_handle = None
        # set initial state of debug visualization
        self.set_debug_vis(self.cfg.debug_vis)

    def __del__(self):
        """Unsubscribe from the callbacks."""
        # clear physics events handles
        self._clear_callbacks()

    """
    Properties
    """

    @property
    def is_initialized(self) -> bool:
        """Whether the sensor is initialized.

        Returns True if the sensor is initialized, False otherwise.
        """
        return self._is_initialized

    @property
    def num_instances(self) -> int:
        """Number of instances of the sensor.

        This is equal to the number of sensors per environment multiplied by the number of environments.
        """
        return self._num_envs

    @property
    def device(self) -> str:
        """Memory device for computation."""
        return self._device

    @property
    @abstractmethod
    def data(self) -> Any:
        """Data from the sensor.

        This property is only updated when the user tries to access the data. This is done to avoid
        unnecessary computation when the sensor data is not used.

        For updating the sensor when this property is accessed, you can use the following
        code snippet in your sensor implementation:

        .. code-block:: python

            # update sensors if needed
            self._update_outdated_buffers()
            # return the data (where `_data` is the data for the sensor)
            return self._data
        """
        raise NotImplementedError

    @property
    def has_debug_vis_implementation(self) -> bool:
        """Whether the sensor has a debug visualization implemented."""
        # check if function raises NotImplementedError
        # (textual check on the source of the resolved ``_set_debug_vis_impl`` override)
        source_code = inspect.getsource(self._set_debug_vis_impl)
        return "NotImplementedError" not in source_code

    """
    Operations
    """

    def set_debug_vis(self, debug_vis: bool) -> bool:
        """Sets whether to visualize the sensor data.

        Args:
            debug_vis: Whether to visualize the sensor data.

        Returns:
            Whether the debug visualization was successfully set. False if the sensor
            does not support debug visualization.
        """
        # check if debug visualization is supported
        if not self.has_debug_vis_implementation:
            return False
        # toggle debug visualization objects
        self._set_debug_vis_impl(debug_vis)
        # toggle debug visualization flag
        self._is_visualizing = debug_vis
        # toggle debug visualization handles
        if debug_vis:
            # create a subscriber for the post update event if it doesn't exist
            if self._debug_vis_handle is None:
                if has_kit():
                    import omni.kit.app  # noqa: PLC0415

                    app_interface = omni.kit.app.get_app_interface()
                    # a weak proxy is captured so the subscription does not keep the sensor alive
                    self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
                        lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event)
                    )
        else:
            # remove the subscriber if it exists
            if self._debug_vis_handle is not None:
                self._debug_vis_handle.unsubscribe()
                self._debug_vis_handle = None
        # return success
        return True

    def reset(self, env_ids: Sequence[int] | None = None, env_mask: wp.array | None = None) -> None:
        """Resets the sensor internals.

        Args:
            env_ids: The environment indices to reset. Defaults to None, in which case
                all environments are reset.
            env_mask: A boolean warp array indicating which environments to reset. If provided,
                takes priority over ``env_ids``. Defaults to None.
        """
        env_mask = self._resolve_indices_and_mask(env_ids, env_mask)
        wp.launch(
            reset_envs_kernel,
            dim=self._num_envs,
            inputs=[env_mask, self._is_outdated, self._timestamp, self._timestamp_last_update],
            device=self._device,
        )

    def update(self, dt: float, force_recompute: bool = False):
        """Advances the sensor timestamps and optionally refreshes the sensor buffers.

        This is a no-op until the sensor has been initialized. The buffers are refreshed
        eagerly only when ``force_recompute`` is True or when debug visualization is active;
        otherwise the refresh is left to :meth:`_update_outdated_buffers`, which sensor
        implementations are expected to call from :attr:`data`.

        Args:
            dt: The time step forwarded to the timestamp kernel together with
                ``cfg.update_period``.
            force_recompute: Whether to refresh the outdated buffers immediately.
                Defaults to False.
        """
        # Skip update if sensor is not initialized
        if not self._is_initialized:
            return
        # Update the timestamp for the sensors
        wp.launch(
            update_timestamp_kernel,
            dim=self._num_envs,
            inputs=[
                self._is_outdated,
                self._timestamp,
                self._timestamp_last_update,
                dt,
                self.cfg.update_period,
            ],
            device=self._device,
        )
        # Update the buffers
        if force_recompute or self._is_visualizing:
            self._update_outdated_buffers()

    """
    Implementation specific.
    """

    @abstractmethod
    def _initialize_impl(self):
        """Initializes the sensor-related handles and internal buffers."""
        # Obtain Simulation Context
        sim = sim_utils.SimulationContext.instance()
        if sim is None:
            raise RuntimeError("Simulation Context is not initialized!")
        # Obtain device and backend
        self._device = sim.device
        self._backend = sim.backend
        self._sim_physics_dt = sim.get_physics_dt()
        # Count number of environments
        env_prim_path_expr = self.cfg.prim_path.rsplit("/", 1)[0]
        self._parent_prims = sim_utils.find_matching_prims(env_prim_path_expr)
        self._num_envs = len(self._parent_prims)
        # Create warp env mask arrays for "all envs" cases and resets.
        # Note: We use wp.to_torch() to create zero-copy torch tensor views of warp arrays.
        # This allows warp arrays to be passed to warp kernels while the corresponding torch
        # views support fancy indexing (e.g. tensor[env_ids] = True) without any memory copies.
        # Both the warp array and torch view share the same underlying device memory.
        self._ALL_ENV_MASK = wp.ones(self._num_envs, dtype=wp.bool, device=self._device)
        self._reset_mask = wp.zeros(self._num_envs, dtype=wp.bool, device=self._device)
        self._reset_mask_torch = wp.to_torch(self._reset_mask)
        # timestamp and outdated flags
        self._is_outdated = wp.ones(self._num_envs, dtype=wp.bool, device=self._device)
        self._timestamp = wp.zeros(self._num_envs, dtype=wp.float32, device=self._device)
        self._timestamp_last_update = wp.zeros_like(self._timestamp)

        # Initialize debug visualization handle
        if self._debug_vis_handle is None:
            # set initial state of debug visualization
            self.set_debug_vis(self.cfg.debug_vis)

    @abstractmethod
    def _update_buffers_impl(self, env_mask: wp.array):
        """Fills the sensor data for provided environment ids.

        This function does not perform any time-based checks and directly fills the data into the
        data container.

        Args:
            env_mask: The mask of the environments that are ready to capture.
        """
        raise NotImplementedError

    def _set_debug_vis_impl(self, debug_vis: bool):
        """Set debug visualization into visualization objects.

        This function is responsible for creating the visualization objects if they don't exist
        and input ``debug_vis`` is True. If the visualization objects exist, the function should
        set their visibility into the stage.
        """
        raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")

    def _debug_vis_callback(self, event):
        """Callback for debug visualization.

        This function calls the visualization objects and sets the data to visualize into them.
        """
        raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")

    """
    Internal simulation callbacks.
    """

    def _register_callbacks(self):
        """Registers physics lifecycle callbacks via the current backend's physics manager."""
        physics_mgr_cls = sim_utils.SimulationContext.instance().physics_manager
        # the callbacks hold only a weak proxy so that they do not keep the sensor alive
        obj_ref = weakref.proxy(self)

        def _invoke(callback_name, event):
            getattr(obj_ref, callback_name)(event)

        # Backend-agnostic: PHYSICS_READY (init) and STOP (invalidate)
        self._initialize_handle = physics_mgr_cls.register_callback(
            lambda payload: PhysicsManager.safe_callback_invoke(
                _invoke, "_initialize_callback", payload, physics_manager=physics_mgr_cls
            ),
            PhysicsEvent.PHYSICS_READY,
            order=10,
        )
        self._invalidate_initialize_handle = physics_mgr_cls.register_callback(
            lambda payload: PhysicsManager.safe_callback_invoke(
                _invoke, "_invalidate_initialize_callback", payload, physics_manager=physics_mgr_cls
            ),
            PhysicsEvent.STOP,
            order=10,
        )
        # Optional: prim deletion (only supported by PhysX backend)
        self._prim_deletion_handle = None
        if "physx" in physics_mgr_cls.__name__.lower():
            from isaaclab_physx.physics import IsaacEvents  # noqa: PLC0415

            self._prim_deletion_handle = physics_mgr_cls.register_callback(
                lambda event: PhysicsManager.safe_callback_invoke(
                    _invoke, "_on_prim_deletion", event, physics_manager=physics_mgr_cls
                ),
                IsaacEvents.PRIM_DELETION,
            )

    def _initialize_callback(self, event):
        """Initializes the scene elements.

        .. note::
            Physics handles are only valid once the simulation is ready. This callback runs when
            :attr:`PhysicsEvent.PHYSICS_READY` is dispatched by the current backend.
        """
        if not self._is_initialized:
            self._initialize_impl()
            self._is_initialized = True

    def _invalidate_initialize_callback(self, event):
        """Invalidates the scene elements."""
        self._is_initialized = False
        if self._debug_vis_handle is not None:
            self._debug_vis_handle.unsubscribe()
            self._debug_vis_handle = None

    def _on_prim_deletion(self, event) -> None:
        """Invalidates and deletes the callbacks when the prim is deleted.

        Args:
            event: The prim deletion event containing the prim path in payload.

        Note:
            This function is called when the prim is deleted.
        """
        prim_path = event.payload["prim_path"]
        if prim_path == "/":
            self._clear_callbacks()
            return
        # Truncate the configured prim path (used as a regex) to the depth of the deleted
        # path, so that deleting any ancestor of the sensor prim also clears the callbacks.
        result = re.match(
            pattern="^" + "/".join(self.cfg.prim_path.split("/")[: prim_path.count("/") + 1]) + "$", string=prim_path
        )
        if result:
            self._clear_callbacks()

    def _clear_callbacks(self) -> None:
        """Clears the callbacks.

        Deregisters the physics callback handles and unsubscribes the debug visualization
        handle, then resets all of them to None. Calling it repeatedly is harmless.
        """
        # This is reached from ``__del__``, which also runs for instances whose ``__init__``
        # raised before the handles were assigned. Look the attributes up defensively so the
        # finalizer does not raise a secondary ``AttributeError`` that hides the real error.
        for handle_name in ("_initialize_handle", "_invalidate_initialize_handle", "_prim_deletion_handle"):
            handle = getattr(self, handle_name, None)
            if handle is not None:
                handle.deregister()
            setattr(self, handle_name, None)
        # Clear debug visualization
        debug_vis_handle = getattr(self, "_debug_vis_handle", None)
        if debug_vis_handle is not None:
            debug_vis_handle.unsubscribe()
        self._debug_vis_handle = None

    """
    Helper functions.
    """

    def _update_outdated_buffers(self):
        """Fills the sensor data for the outdated sensors."""
        self._update_buffers_impl(self._is_outdated)
        # update timestamps and clear outdated flags
        wp.launch(
            update_outdated_envs_kernel,
            dim=self._num_envs,
            inputs=[self._is_outdated, self._timestamp, self._timestamp_last_update],
            device=self._device,
        )

    def _resolve_indices_and_mask(
        self, env_ids: Sequence[int] | None = None, env_mask: wp.array | None = None
    ) -> wp.array:
        """Resolve environment indices or a mask into a boolean warp environment mask.

        Args:
            env_ids: The environment indices to select. Defaults to None.
            env_mask: A boolean warp array selecting environments. If provided, it is returned
                as-is and ``env_ids`` is ignored. Defaults to None.

        Returns:
            The "all environments" mask when both inputs are None, ``env_mask`` when it is given,
            and otherwise the internal reset mask with the entries at ``env_ids`` set to True.
            The internal reset mask is a shared buffer that is overwritten on the next call
            that passes ``env_ids``.
        """
        if env_ids is None and env_mask is None:
            return self._ALL_ENV_MASK
        elif env_mask is not None:
            return env_mask
        else:
            # write through the torch view, which shares memory with the warp array
            self._reset_mask.zero_()
            self._reset_mask_torch[env_ids] = True
            return self._reset_mask