# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Common functions that can be used to create observation terms.
The functions can be passed to the :class:`omni.isaac.lab.managers.ObservationTermCfg` object to enable
the observation introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING
import omni.isaac.lab.utils.math as math_utils
from omni.isaac.lab.assets import Articulation, RigidObject
from omni.isaac.lab.managers import SceneEntityCfg
from omni.isaac.lab.managers.manager_base import ManagerTermBase
from omni.isaac.lab.managers.manager_term_cfg import ObservationTermCfg
from omni.isaac.lab.sensors import Camera, Imu, RayCaster, RayCasterCamera, TiledCamera
if TYPE_CHECKING:
from omni.isaac.lab.envs import ManagerBasedEnv, ManagerBasedRLEnv
"""
Root state.
"""


def base_pos_z(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root height in the simulation world frame."""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.root_pos_w[:, 2].unsqueeze(-1)


def base_lin_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root linear velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_lin_vel_b


def base_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Root angular velocity in the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_ang_vel_b


def projected_gravity(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Gravity projection on the asset's root frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.projected_gravity_b
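

# Example (illustrative sketch, not part of this module): wiring the root-state terms above into an
# observation group of a manager-based environment config. The group/class names and the ``mdp``
# import alias are assumptions taken from typical Isaac Lab task configurations.
#
#   import omni.isaac.lab.envs.mdp as mdp
#   from omni.isaac.lab.managers import ObservationGroupCfg as ObsGroup
#   from omni.isaac.lab.managers import ObservationTermCfg as ObsTerm
#   from omni.isaac.lab.utils import configclass
#
#   @configclass
#   class PolicyCfg(ObsGroup):
#       """Observations fed to the policy (hypothetical group)."""
#
#       base_lin_vel = ObsTerm(func=mdp.base_lin_vel)
#       base_ang_vel = ObsTerm(func=mdp.base_ang_vel)
#       projected_gravity = ObsTerm(func=mdp.projected_gravity)
#
#       def __post_init__(self):
#           self.concatenate_terms = True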


def root_pos_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root position in the environment frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_pos_w - env.scene.env_origins


def root_quat_w(
env: ManagerBasedEnv, make_quat_unique: bool = False, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""Asset root orientation (w, x, y, z) in the environment frame.
If :attr:`make_quat_unique` is True, then returned quaternion is made unique by ensuring
the quaternion has non-negative real component. This is because both ``q`` and ``-q`` represent
the same orientation.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
quat = asset.data.root_quat_w
# make the quaternion real-part positive if configured
return math_utils.quat_unique(quat) if make_quat_unique else quat
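

# Example (hedged sketch): forwarding the ``make_quat_unique`` flag through an observation term's
# ``params`` dictionary. The attribute name and the ``ObsTerm``/``mdp`` aliases follow the sketch above.
#
#   root_quat = ObsTerm(func=mdp.root_quat_w, params={"make_quat_unique": True})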


def root_lin_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root linear velocity in the environment frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_lin_vel_w


def root_ang_vel_w(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""Asset root angular velocity in the environment frame."""
# extract the used quantities (to enable type-hinting)
asset: RigidObject = env.scene[asset_cfg.name]
return asset.data.root_ang_vel_w
"""
Joint state.
"""


def joint_pos(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos[:, asset_cfg.joint_ids]


def joint_pos_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint positions of the asset w.r.t. the default joint positions.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_pos[:, asset_cfg.joint_ids] - asset.data.default_joint_pos[:, asset_cfg.joint_ids]


def joint_pos_limit_normalized(
env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")
) -> torch.Tensor:
"""The joint positions of the asset normalized with the asset's joint limits.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their normalized positions returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return math_utils.scale_transform(
asset.data.joint_pos[:, asset_cfg.joint_ids],
asset.data.soft_joint_pos_limits[:, asset_cfg.joint_ids, 0],
asset.data.soft_joint_pos_limits[:, asset_cfg.joint_ids, 1],
)
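

# Note (assumption about the utility's convention): ``math_utils.scale_transform`` is expected to map
# each joint position from its soft-limit range into [-1, 1], i.e. approximately
#
#   normalized = 2.0 * (joint_pos - 0.5 * (lower + upper)) / (upper - lower)
#
# so positions at the lower/upper soft limits land near -1/+1 respectively.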


def joint_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint velocities of the asset.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their velocities returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_vel[:, asset_cfg.joint_ids]


def joint_vel_rel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("robot")) -> torch.Tensor:
"""The joint velocities of the asset w.r.t. the default joint velocities.
Note: Only the joints configured in :attr:`asset_cfg.joint_ids` will have their velocities returned.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
return asset.data.joint_vel[:, asset_cfg.joint_ids] - asset.data.default_joint_vel[:, asset_cfg.joint_ids]
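

# Example (illustrative sketch): restricting the joint terms to a subset of joints through
# :class:`SceneEntityCfg`. The asset name "robot" and the joint-name pattern are assumptions.
#
#   joint_pos = ObsTerm(
#       func=mdp.joint_pos_rel,
#       params={"asset_cfg": SceneEntityCfg("robot", joint_names=[".*_hip_joint"])},
#   )
#   joint_vel = ObsTerm(func=mdp.joint_vel_rel)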
"""
Sensors.
"""


def height_scan(env: ManagerBasedEnv, sensor_cfg: SceneEntityCfg, offset: float = 0.5) -> torch.Tensor:
"""Height scan from the given sensor w.r.t. the sensor's frame.
The provided offset (Defaults to 0.5) is subtracted from the returned values.
"""
# extract the used quantities (to enable type-hinting)
sensor: RayCaster = env.scene.sensors[sensor_cfg.name]
# height scan: height = sensor_height - hit_point_z - offset
return sensor.data.pos_w[:, 2].unsqueeze(1) - sensor.data.ray_hits_w[..., 2] - offset
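

# Example (hedged sketch): pairing this term with a grid-pattern ray-caster sensor. The prim paths,
# pattern size, and sensor entity name are assumptions; consult the RayCaster documentation for the
# exact configuration fields.
#
#   from omni.isaac.lab.sensors import RayCasterCfg, patterns
#
#   height_scanner = RayCasterCfg(
#       prim_path="{ENV_REGEX_NS}/Robot/base",
#       attach_yaw_only=True,
#       pattern_cfg=patterns.GridPatternCfg(resolution=0.1, size=[1.6, 1.0]),
#       mesh_prim_paths=["/World/ground"],
#   )
#   height_scan = ObsTerm(
#       func=mdp.height_scan,
#       params={"sensor_cfg": SceneEntityCfg("height_scanner"), "offset": 0.5},
#   )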


def body_incoming_wrench(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg) -> torch.Tensor:
"""Incoming spatial wrench on bodies of an articulation in the simulation world frame.
This is the 6-D wrench (force and torque) applied to the body link by the incoming joint force.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# obtain the link incoming forces in world frame
link_incoming_forces = asset.root_physx_view.get_link_incoming_joint_force()[:, asset_cfg.body_ids]
return link_incoming_forces.view(env.num_envs, -1)
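

# Shape note and example (names are assumptions): the flattened output has shape
# (num_envs, num_selected_bodies * 6). Bodies can be selected by name through ``body_names``:
#
#   feet_wrench = ObsTerm(
#       func=mdp.body_incoming_wrench,
#       params={"asset_cfg": SceneEntityCfg("robot", body_names=[".*_foot"])},
#   )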


def imu_orientation(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("imu")) -> torch.Tensor:
"""Imu sensor orientation in the simulation world frame.
Args:
env: The environment.
asset_cfg: The SceneEntity associated with an IMU sensor. Defaults to SceneEntityCfg("imu").
Returns:
Orientation in the world frame in (w, x, y, z) quaternion form. Shape is (num_envs, 4).
"""
# extract the used quantities (to enable type-hinting)
asset: Imu = env.scene[asset_cfg.name]
# return the orientation quaternion
return asset.data.quat_w


def imu_ang_vel(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("imu")) -> torch.Tensor:
"""Imu sensor angular velocity w.r.t. environment origin expressed in the sensor frame.
Args:
env: The environment.
asset_cfg: The SceneEntity associated with an IMU sensor. Defaults to SceneEntityCfg("imu").
Returns:
The angular velocity (rad/s) in the sensor frame. Shape is (num_envs, 3).
"""
# extract the used quantities (to enable type-hinting)
asset: Imu = env.scene[asset_cfg.name]
# return the angular velocity
return asset.data.ang_vel_b


def imu_lin_acc(env: ManagerBasedEnv, asset_cfg: SceneEntityCfg = SceneEntityCfg("imu")) -> torch.Tensor:
"""Imu sensor linear acceleration w.r.t. the environment origin expressed in sensor frame.
Args:
env: The environment.
asset_cfg: The SceneEntity associated with an IMU sensor. Defaults to SceneEntityCfg("imu").
Returns:
The linear acceleration (m/s^2) in the sensor frame. Shape is (num_envs, 3).
"""
asset: Imu = env.scene[asset_cfg.name]
return asset.data.lin_acc_b
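

# Example (illustrative sketch): grouping the IMU terms above. The entity name "imu" matches the
# defaults here but must correspond to an Imu sensor added to the scene configuration.
#
#   imu_quat = ObsTerm(func=mdp.imu_orientation, params={"asset_cfg": SceneEntityCfg("imu")})
#   imu_gyro = ObsTerm(func=mdp.imu_ang_vel, params={"asset_cfg": SceneEntityCfg("imu")})
#   imu_accel = ObsTerm(func=mdp.imu_lin_acc, params={"asset_cfg": SceneEntityCfg("imu")})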


def image(
env: ManagerBasedEnv,
sensor_cfg: SceneEntityCfg = SceneEntityCfg("tiled_camera"),
data_type: str = "rgb",
convert_perspective_to_orthogonal: bool = False,
normalize: bool = True,
) -> torch.Tensor:
"""Images of a specific datatype from the camera sensor.
If the flag :attr:`normalize` is True, post-processing of the images are performed based on their
data-types:
- "rgb": Scales the image to (0, 1) and subtracts with the mean of the current image batch.
- "depth" or "distance_to_camera" or "distance_to_plane": Replaces infinity values with zero.
Args:
env: The environment the cameras are placed within.
sensor_cfg: The desired sensor to read from. Defaults to SceneEntityCfg("tiled_camera").
data_type: The data type to pull from the desired camera. Defaults to "rgb".
convert_perspective_to_orthogonal: Whether to orthogonalize perspective depth images.
This is used only when the data type is "distance_to_camera". Defaults to False.
normalize: Whether to normalize the images. This depends on the selected data type.
Defaults to True.
Returns:
The images produced at the last time-step
"""
# extract the used quantities (to enable type-hinting)
sensor: TiledCamera | Camera | RayCasterCamera = env.scene.sensors[sensor_cfg.name]
# obtain the input image
images = sensor.data.output[data_type]
# depth image conversion
if (data_type == "distance_to_camera") and convert_perspective_to_orthogonal:
images = math_utils.orthogonalize_perspective_depth(images, sensor.data.intrinsic_matrices)
# rgb/depth image normalization
if normalize:
if data_type == "rgb":
images = images.float() / 255.0
mean_tensor = torch.mean(images, dim=(1, 2), keepdim=True)
images -= mean_tensor
elif "distance_to" in data_type or "depth" in data_type:
images[images == float("inf")] = 0
return images.clone()
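

# Example (hedged sketch): reading RGB and depth images from a tiled camera registered in the scene
# as "tiled_camera". The sensor name and data types are assumptions; the depth variant relies on the
# normalization branch above to zero out infinite returns.
#
#   rgb_image = ObsTerm(
#       func=mdp.image,
#       params={"sensor_cfg": SceneEntityCfg("tiled_camera"), "data_type": "rgb"},
#   )
#   depth_image = ObsTerm(
#       func=mdp.image,
#       params={"sensor_cfg": SceneEntityCfg("tiled_camera"), "data_type": "distance_to_camera"},
#   )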


class image_features(ManagerTermBase):
"""Extracted image features from a pre-trained frozen encoder.
This term uses models from the model zoo in PyTorch and extracts features from the images.
It calls the :func:`image` function to get the images and then processes them using the model zoo.
A user can provide their own model zoo configuration to use different models for feature extraction.
The model zoo configuration should be a dictionary that maps different model names to a dictionary
that defines the model, preprocess and inference functions. The dictionary should have the following
entries:
- "model": A callable that returns the model when invoked without arguments.
- "reset": A callable that resets the model. This is useful when the model has a state that needs to be reset.
- "inference": A callable that, when given the model and the images, returns the extracted features.
If the model zoo configuration is not provided, the default model zoo configurations are used. The default
model zoo configurations include the models from Theia :cite:`shang2024theia` and ResNet :cite:`he2016deep`.
These models are loaded from `Hugging-Face transformers <https://huggingface.co/docs/transformers/index>`_ and
`PyTorch torchvision <https://pytorch.org/vision/stable/models.html>`_ respectively.
Args:
sensor_cfg: The sensor configuration to poll. Defaults to SceneEntityCfg("tiled_camera").
data_type: The sensor data type. Defaults to "rgb".
convert_perspective_to_orthogonal: Whether to orthogonalize perspective depth images.
This is used only when the data type is "distance_to_camera". Defaults to False.
model_zoo_cfg: A user-defined dictionary that maps different model names to their respective configurations.
Defaults to None. If None, the default model zoo configurations are used.
model_name: The name of the model to use for inference. Defaults to "resnet18".
model_device: The device to store and infer the model on. This is useful when offloading the computation
from the environment simulation device. Defaults to the environment device.
inference_kwargs: Additional keyword arguments to pass to the inference function. Defaults to None,
which means no additional arguments are passed.
Returns:
The extracted features tensor. Shape is (num_envs, feature_dim).
Raises:
ValueError: When the model name is not found in the provided model zoo configuration.
ValueError: When the model name is not found in the default model zoo configuration.
"""

    def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedEnv):
# initialize the base class
super().__init__(cfg, env)
# extract parameters from the configuration
self.model_zoo_cfg: dict = cfg.params.get("model_zoo_cfg") # type: ignore
self.model_name: str = cfg.params.get("model_name", "resnet18") # type: ignore
self.model_device: str = cfg.params.get("model_device", env.device) # type: ignore
# List of Theia models - These are configured through `_prepare_theia_transformer_model` function
default_theia_models = [
"theia-tiny-patch16-224-cddsv",
"theia-tiny-patch16-224-cdiv",
"theia-small-patch16-224-cdiv",
"theia-base-patch16-224-cdiv",
"theia-small-patch16-224-cddsv",
"theia-base-patch16-224-cddsv",
]
# List of ResNet models - These are configured through `_prepare_resnet_model` function
default_resnet_models = ["resnet18", "resnet34", "resnet50", "resnet101"]
# Check if model name is specified in the model zoo configuration
if self.model_zoo_cfg is not None and self.model_name not in self.model_zoo_cfg:
raise ValueError(
f"Model name '{self.model_name}' not found in the provided model zoo configuration."
" Please add the model to the model zoo configuration or use a different model name."
f" Available models in the provided list: {list(self.model_zoo_cfg.keys())}."
"\nHint: If you want to use a default model, consider using one of the following models:"
f" {default_theia_models + default_resnet_models}. In this case, you can remove the"
" 'model_zoo_cfg' parameter from the observation term configuration."
)
if self.model_zoo_cfg is None:
if self.model_name in default_theia_models:
model_config = self._prepare_theia_transformer_model(self.model_name, self.model_device)
elif self.model_name in default_resnet_models:
model_config = self._prepare_resnet_model(self.model_name, self.model_device)
else:
raise ValueError(
f"Model name '{self.model_name}' not found in the default model zoo configuration."
f" Available models: {default_theia_models + default_resnet_models}."
)
else:
model_config = self.model_zoo_cfg[self.model_name]
# Retrieve the model, preprocess and inference functions
self._model = model_config["model"]()
self._reset_fn = model_config.get("reset")
self._inference_fn = model_config["inference"]

    def reset(self, env_ids: torch.Tensor | None = None):
# reset the model if a reset function is provided
# this might be useful when the model has a state that needs to be reset
# for example: video transformers
if self._reset_fn is not None:
self._reset_fn(self._model, env_ids)

    def __call__(
self,
env: ManagerBasedEnv,
sensor_cfg: SceneEntityCfg = SceneEntityCfg("tiled_camera"),
data_type: str = "rgb",
convert_perspective_to_orthogonal: bool = False,
model_zoo_cfg: dict | None = None,
model_name: str = "resnet18",
model_device: str | None = None,
inference_kwargs: dict | None = None,
) -> torch.Tensor:
# obtain the images from the sensor
image_data = image(
env=env,
sensor_cfg=sensor_cfg,
data_type=data_type,
convert_perspective_to_orthogonal=convert_perspective_to_orthogonal,
normalize=False, # we pre-process based on model
)
# store the device of the image
image_device = image_data.device
# forward the images through the model
features = self._inference_fn(self._model, image_data, **(inference_kwargs or {}))
# move the features back to the image device
return features.detach().to(image_device)
"""
Helper functions.
"""

    def _prepare_theia_transformer_model(self, model_name: str, model_device: str) -> dict:
"""Prepare the Theia transformer model for inference.
Args:
model_name: The name of the Theia transformer model to prepare.
model_device: The device to store and infer the model on.
Returns:
A dictionary containing the model and inference functions.
"""
from transformers import AutoModel

        def _load_model() -> torch.nn.Module:
"""Load the Theia transformer model."""
model = AutoModel.from_pretrained(f"theaiinstitute/{model_name}", trust_remote_code=True).eval()
return model.to(model_device)

        def _inference(model, images: torch.Tensor) -> torch.Tensor:
"""Inference the Theia transformer model.
Args:
model: The Theia transformer model.
images: The preprocessed image tensor. Shape is (num_envs, height, width, channel).
Returns:
The extracted features tensor. Shape is (num_envs, feature_dim).
"""
# Move the image to the model device
image_proc = images.to(model_device)
# permute the image to (num_envs, channel, height, width)
image_proc = image_proc.permute(0, 3, 1, 2).float() / 255.0
# Normalize the image
mean = torch.tensor([0.485, 0.456, 0.406], device=model_device).view(1, 3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225], device=model_device).view(1, 3, 1, 1)
image_proc = (image_proc - mean) / std
# Taken from Transformers; inference converted to be GPU only
features = model.backbone.model(pixel_values=image_proc, interpolate_pos_encoding=True)
return features.last_hidden_state[:, 1:]
# return the model, preprocess and inference functions
return {"model": _load_model, "inference": _inference}

    def _prepare_resnet_model(self, model_name: str, model_device: str) -> dict:
"""Prepare the ResNet model for inference.
Args:
model_name: The name of the ResNet model to prepare.
model_device: The device to store and infer the model on.
Returns:
A dictionary containing the model and inference functions.
"""
from torchvision import models

        def _load_model() -> torch.nn.Module:
"""Load the ResNet model."""
# map the model name to the weights
resnet_weights = {
"resnet18": "ResNet18_Weights.IMAGENET1K_V1",
"resnet34": "ResNet34_Weights.IMAGENET1K_V1",
"resnet50": "ResNet50_Weights.IMAGENET1K_V1",
"resnet101": "ResNet101_Weights.IMAGENET1K_V1",
}
# load the model
model = getattr(models, model_name)(weights=resnet_weights[model_name]).eval()
return model.to(model_device)

        def _inference(model, images: torch.Tensor) -> torch.Tensor:
"""Inference the ResNet model.
Args:
model: The ResNet model.
images: The preprocessed image tensor. Shape is (num_envs, channel, height, width).
Returns:
The extracted features tensor. Shape is (num_envs, feature_dim).
"""
# move the image to the model device
image_proc = images.to(model_device)
# permute the image to (num_envs, channel, height, width)
image_proc = image_proc.permute(0, 3, 1, 2).float() / 255.0
# normalize the image
mean = torch.tensor([0.485, 0.456, 0.406], device=model_device).view(1, 3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225], device=model_device).view(1, 3, 1, 1)
image_proc = (image_proc - mean) / std
# forward the image through the model
return model(image_proc)
# return the model, preprocess and inference functions
return {"model": _load_model, "inference": _inference}
"""
Actions.
"""


def last_action(env: ManagerBasedEnv, action_name: str | None = None) -> torch.Tensor:
"""The last input action to the environment.
The name of the action term for which the action is required. If None, the
entire action tensor is returned.
"""
if action_name is None:
return env.action_manager.action
else:
return env.action_manager.get_term(action_name).raw_actions
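

# Example (hedged sketch): observing the previous actions. The term name "joint_pos" is an assumption
# and must match a term registered with the environment's action manager.
#
#   actions = ObsTerm(func=mdp.last_action)
#   arm_actions = ObsTerm(func=mdp.last_action, params={"action_name": "joint_pos"})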
"""
Commands.
"""


def generated_commands(env: ManagerBasedRLEnv, command_name: str) -> torch.Tensor:
"""The generated command from command term in the command manager with the given name."""
return env.command_manager.get_command(command_name)
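

# Example (hedged sketch): exposing a command term to the policy. The command name "base_velocity"
# is an assumption and must match a term in the environment's command manager.
#
#   velocity_commands = ObsTerm(func=mdp.generated_commands, params={"command_name": "base_velocity"})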