Source code for omni.isaac.lab.sensors.sensor_base
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Base class for sensors.
This class defines an interface for sensors similar to how the :class:`omni.isaac.lab.assets.AssetBase` class works.
Each sensor class should inherit from this class and implement the abstract methods.
"""
from __future__ import annotations
import inspect
import torch
import weakref
from abc import ABC, abstractmethod
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any
import omni.kit.app
import omni.timeline
import omni.isaac.lab.sim as sim_utils
if TYPE_CHECKING:
from .sensor_base_cfg import SensorBaseCfg
[docs]class SensorBase(ABC):
"""The base class for implementing a sensor.
The implementation is based on lazy evaluation. The sensor data is only updated when the user
tries accessing the data through the :attr:`data` property or sets ``force_compute=True`` in
the :meth:`update` method. This is done to avoid unnecessary computation when the sensor data
is not used.
The sensor is updated at the specified update period. If the update period is zero, then the
sensor is updated at every simulation step.
"""
[docs] def __init__(self, cfg: SensorBaseCfg):
"""Initialize the sensor class.
Args:
cfg: The configuration parameters for the sensor.
"""
# check that config is valid
if cfg.history_length < 0:
raise ValueError(f"History length must be greater than 0! Received: {cfg.history_length}")
# check that the config is valid
cfg.validate()
# store inputs
self.cfg = cfg
# flag for whether the sensor is initialized
self._is_initialized = False
# flag for whether the sensor is in visualization mode
self._is_visualizing = False
# note: Use weakref on callbacks to ensure that this object can be deleted when its destructor is called.
# add callbacks for stage play/stop
# The order is set to 10 which is arbitrary but should be lower priority than the default order of 0
timeline_event_stream = omni.timeline.get_timeline_interface().get_timeline_event_stream()
self._initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type(
int(omni.timeline.TimelineEventType.PLAY),
lambda event, obj=weakref.proxy(self): obj._initialize_callback(event),
order=10,
)
self._invalidate_initialize_handle = timeline_event_stream.create_subscription_to_pop_by_type(
int(omni.timeline.TimelineEventType.STOP),
lambda event, obj=weakref.proxy(self): obj._invalidate_initialize_callback(event),
order=10,
)
# add handle for debug visualization (this is set to a valid handle inside set_debug_vis)
self._debug_vis_handle = None
# set initial state of debug visualization
self.set_debug_vis(self.cfg.debug_vis)
def __del__(self):
"""Unsubscribe from the callbacks."""
# clear physics events handles
if self._initialize_handle:
self._initialize_handle.unsubscribe()
self._initialize_handle = None
if self._invalidate_initialize_handle:
self._invalidate_initialize_handle.unsubscribe()
self._invalidate_initialize_handle = None
# clear debug visualization
if self._debug_vis_handle:
self._debug_vis_handle.unsubscribe()
self._debug_vis_handle = None
"""
Properties
"""
@property
def is_initialized(self) -> bool:
"""Whether the sensor is initialized.
Returns True if the sensor is initialized, False otherwise.
"""
return self._is_initialized
@property
def num_instances(self) -> int:
"""Number of instances of the sensor.
This is equal to the number of sensors per environment multiplied by the number of environments.
"""
return self._num_envs
@property
def device(self) -> str:
"""Memory device for computation."""
return self._device
@property
@abstractmethod
def data(self) -> Any:
"""Data from the sensor.
This property is only updated when the user tries to access the data. This is done to avoid
unnecessary computation when the sensor data is not used.
For updating the sensor when this property is accessed, you can use the following
code snippet in your sensor implementation:
.. code-block:: python
# update sensors if needed
self._update_outdated_buffers()
# return the data (where `_data` is the data for the sensor)
return self._data
"""
raise NotImplementedError
@property
def has_debug_vis_implementation(self) -> bool:
"""Whether the sensor has a debug visualization implemented."""
# check if function raises NotImplementedError
source_code = inspect.getsource(self._set_debug_vis_impl)
return "NotImplementedError" not in source_code
"""
Operations
"""
[docs] def set_debug_vis(self, debug_vis: bool) -> bool:
"""Sets whether to visualize the sensor data.
Args:
debug_vis: Whether to visualize the sensor data.
Returns:
Whether the debug visualization was successfully set. False if the sensor
does not support debug visualization.
"""
# check if debug visualization is supported
if not self.has_debug_vis_implementation:
return False
# toggle debug visualization objects
self._set_debug_vis_impl(debug_vis)
# toggle debug visualization flag
self._is_visualizing = debug_vis
# toggle debug visualization handles
if debug_vis:
# create a subscriber for the post update event if it doesn't exist
if self._debug_vis_handle is None:
app_interface = omni.kit.app.get_app_interface()
self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event)
)
else:
# remove the subscriber if it exists
if self._debug_vis_handle is not None:
self._debug_vis_handle.unsubscribe()
self._debug_vis_handle = None
# return success
return True
[docs] def reset(self, env_ids: Sequence[int] | None = None):
"""Resets the sensor internals.
Args:
env_ids: The sensor ids to reset. Defaults to None.
"""
# Resolve sensor ids
if env_ids is None:
env_ids = slice(None)
# Reset the timestamp for the sensors
self._timestamp[env_ids] = 0.0
self._timestamp_last_update[env_ids] = 0.0
# Set all reset sensors to outdated so that they are updated when data is called the next time.
self._is_outdated[env_ids] = True
def update(self, dt: float, force_recompute: bool = False):
# Update the timestamp for the sensors
self._timestamp += dt
self._is_outdated |= self._timestamp - self._timestamp_last_update + 1e-6 >= self.cfg.update_period
# Update the buffers
# TODO (from @mayank): Why is there a history length here when it doesn't mean anything in the sensor base?!?
# It is only for the contact sensor but there we should redefine the update function IMO.
if force_recompute or self._is_visualizing or (self.cfg.history_length > 0):
self._update_outdated_buffers()
"""
Implementation specific.
"""
@abstractmethod
def _initialize_impl(self):
"""Initializes the sensor-related handles and internal buffers."""
# Obtain Simulation Context
sim = sim_utils.SimulationContext.instance()
if sim is None:
raise RuntimeError("Simulation Context is not initialized!")
# Obtain device and backend
self._device = sim.device
self._backend = sim.backend
self._sim_physics_dt = sim.get_physics_dt()
# Count number of environments
env_prim_path_expr = self.cfg.prim_path.rsplit("/", 1)[0]
self._parent_prims = sim_utils.find_matching_prims(env_prim_path_expr)
self._num_envs = len(self._parent_prims)
# Boolean tensor indicating whether the sensor data has to be refreshed
self._is_outdated = torch.ones(self._num_envs, dtype=torch.bool, device=self._device)
# Current timestamp (in seconds)
self._timestamp = torch.zeros(self._num_envs, device=self._device)
# Timestamp from last update
self._timestamp_last_update = torch.zeros_like(self._timestamp)
@abstractmethod
def _update_buffers_impl(self, env_ids: Sequence[int]):
"""Fills the sensor data for provided environment ids.
This function does not perform any time-based checks and directly fills the data into the
data container.
Args:
env_ids: The indices of the sensors that are ready to capture.
"""
raise NotImplementedError
def _set_debug_vis_impl(self, debug_vis: bool):
"""Set debug visualization into visualization objects.
This function is responsible for creating the visualization objects if they don't exist
and input ``debug_vis`` is True. If the visualization objects exist, the function should
set their visibility into the stage.
"""
raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")
def _debug_vis_callback(self, event):
"""Callback for debug visualization.
This function calls the visualization objects and sets the data to visualize into them.
"""
raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")
"""
Internal simulation callbacks.
"""
def _initialize_callback(self, event):
"""Initializes the scene elements.
Note:
PhysX handles are only enabled once the simulator starts playing. Hence, this function needs to be
called whenever the simulator "plays" from a "stop" state.
"""
if not self._is_initialized:
self._initialize_impl()
self._is_initialized = True
def _invalidate_initialize_callback(self, event):
"""Invalidates the scene elements."""
self._is_initialized = False
"""
Helper functions.
"""
def _update_outdated_buffers(self):
"""Fills the sensor data for the outdated sensors."""
outdated_env_ids = self._is_outdated.nonzero().squeeze(-1)
if len(outdated_env_ids) > 0:
# obtain new data
self._update_buffers_impl(outdated_env_ids)
# update the timestamp from last update
self._timestamp_last_update[outdated_env_ids] = self._timestamp[outdated_env_ids]
# set outdated flag to false for the updated sensors
self._is_outdated[outdated_env_ids] = False