# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING
import torch
import isaaclab.utils.string as string_utils
from isaaclab.managers.action_manager import ActionTerm
from isaaclab_contrib.assets import Multirotor
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedEnv
from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor
from . import thrust_actions_cfg
# import logger
logger = logging.getLogger(__name__)
[docs]class ThrustAction(ActionTerm):
"""Thrust action term that applies the processed actions as thrust commands. Actions are processed by applying an
affine transformation (scaling and offset) and clipping."""
cfg: thrust_actions_cfg.ThrustActionCfg
"""The configuration of the action term."""
_asset: Multirotor
"""The articulation asset on which the action term is applied."""
_scale: torch.Tensor | float
"""The scaling factor applied to the input action."""
_offset: torch.Tensor | float
"""The offset applied to the input action."""
_clip: torch.Tensor
"""The clip applied to the input action."""
[docs] def __init__(self, cfg: thrust_actions_cfg.ThrustActionCfg, env: ManagerBasedEnv) -> None:
# initialize the action term
super().__init__(cfg, env)
thruster_names_expr = self._asset.actuators["thrusters"].cfg.thruster_names_expr
# resolve the thrusters over which the action term is applied
self._thruster_ids, self._thruster_names = self._asset.find_bodies(
thruster_names_expr, preserve_order=self.cfg.preserve_order
)
self._num_thrusters = len(self._thruster_ids)
# log the resolved thruster names for debugging
logger.info(
f"Resolved thruster names for the action term {self.__class__.__name__}:"
f" {self._thruster_names} [{self._thruster_ids}]"
)
# Avoid indexing across all thrusters for efficiency
if self._num_thrusters == self._asset.num_thrusters and not self.cfg.preserve_order:
self._thruster_ids = slice(None)
# create tensors for raw and processed actions
self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device)
self._processed_actions = torch.zeros_like(self.raw_actions)
# parse scale
if isinstance(cfg.scale, (float, int)):
self._scale = float(cfg.scale)
elif isinstance(cfg.scale, dict):
self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._thruster_names)
self._scale[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported scale type: {type(cfg.scale)}. Supported types are float and dict.")
# parse offset
if isinstance(cfg.offset, (float, int)):
self._offset = float(cfg.offset)
elif isinstance(cfg.offset, dict):
self._offset = torch.zeros_like(self._raw_actions)
# resolve the dictionary config
index_list, _, value_list = string_utils.resolve_matching_names_values(
self.cfg.offset, self._thruster_names
)
self._offset[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported offset type: {type(cfg.offset)}. Supported types are float and dict.")
# parse clip
if cfg.clip is not None:
if isinstance(cfg.clip, dict):
self._clip = torch.tensor([[-float("inf"), float("inf")]], device=self.device).repeat(
self.num_envs, self.action_dim, 1
)
index_list, _, value_list = string_utils.resolve_matching_names_values(
self.cfg.clip, self._thruster_names
)
self._clip[:, index_list] = torch.tensor(value_list, device=self.device)
else:
raise ValueError(f"Unsupported clip type: {type(cfg.clip)}. Supported types are dict.")
# Handle use_default_offset
if cfg.use_default_offset:
# Use default thruster RPS as offset
self._offset = self._asset.data.default_thruster_rps[:, self._thruster_ids].clone()
@property
def action_dim(self) -> int:
return self._num_thrusters
@property
def raw_actions(self) -> torch.Tensor:
return self._raw_actions
@property
def processed_actions(self) -> torch.Tensor:
return self._processed_actions
@property
def IO_descriptor(self) -> GenericActionIODescriptor:
"""The IO descriptor of the action term."""
super().IO_descriptor
self._IO_descriptor.shape = (self.action_dim,)
self._IO_descriptor.dtype = str(self.raw_actions.dtype)
self._IO_descriptor.action_type = "ThrustAction"
self._IO_descriptor.thruster_names = self._thruster_names
self._IO_descriptor.scale = self._scale
if isinstance(self._offset, torch.Tensor):
self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.offset = self._offset
if self.cfg.clip is not None:
if isinstance(self._clip, torch.Tensor):
self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist()
else:
self._IO_descriptor.clip = self._clip
else:
self._IO_descriptor.clip = None
return self._IO_descriptor
[docs] def process_actions(self, actions: torch.Tensor):
"""Process actions by applying scaling, offset, and clipping."""
# store the raw actions
self._raw_actions[:] = actions
# apply the affine transformations
self._processed_actions = self._raw_actions * self._scale + self._offset
# clip actions
if self.cfg.clip is not None:
self._processed_actions = torch.clamp(
self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1]
)
[docs] def apply_actions(self):
"""Apply the processed actions as thrust commands."""
# Set thrust targets using thruster IDs
self._asset.set_thrust_target(self.processed_actions, thruster_ids=self._thruster_ids)
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> None:
"""Reset the action term."""
self._raw_actions[env_ids] = 0.0