Source code for isaaclab_contrib.mdp.actions.thrust_actions

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING

import torch

import isaaclab.utils.string as string_utils
from isaaclab.managers.action_manager import ActionTerm

from isaaclab_contrib.assets import Multirotor

if TYPE_CHECKING:
    from isaaclab.envs import ManagerBasedEnv
    from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor

    from . import thrust_actions_cfg

# import logger
logger = logging.getLogger(__name__)


[docs]class ThrustAction(ActionTerm): """Thrust action term that applies the processed actions as thrust commands. Actions are processed by applying an affine transformation (scaling and offset) and clipping.""" cfg: thrust_actions_cfg.ThrustActionCfg """The configuration of the action term.""" _asset: Multirotor """The articulation asset on which the action term is applied.""" _scale: torch.Tensor | float """The scaling factor applied to the input action.""" _offset: torch.Tensor | float """The offset applied to the input action.""" _clip: torch.Tensor """The clip applied to the input action."""
[docs] def __init__(self, cfg: thrust_actions_cfg.ThrustActionCfg, env: ManagerBasedEnv) -> None: # initialize the action term super().__init__(cfg, env) thruster_names_expr = self._asset.actuators["thrusters"].cfg.thruster_names_expr # resolve the thrusters over which the action term is applied self._thruster_ids, self._thruster_names = self._asset.find_bodies( thruster_names_expr, preserve_order=self.cfg.preserve_order ) self._num_thrusters = len(self._thruster_ids) # log the resolved thruster names for debugging logger.info( f"Resolved thruster names for the action term {self.__class__.__name__}:" f" {self._thruster_names} [{self._thruster_ids}]" ) # Avoid indexing across all thrusters for efficiency if self._num_thrusters == self._asset.num_thrusters and not self.cfg.preserve_order: self._thruster_ids = slice(None) # create tensors for raw and processed actions self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device) self._processed_actions = torch.zeros_like(self.raw_actions) # parse scale if isinstance(cfg.scale, (float, int)): self._scale = float(cfg.scale) elif isinstance(cfg.scale, dict): self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._thruster_names) self._scale[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported scale type: {type(cfg.scale)}. Supported types are float and dict.") # parse offset if isinstance(cfg.offset, (float, int)): self._offset = float(cfg.offset) elif isinstance(cfg.offset, dict): self._offset = torch.zeros_like(self._raw_actions) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.offset, self._thruster_names ) self._offset[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported offset type: {type(cfg.offset)}. Supported types are float and dict.") # parse clip if cfg.clip is not None: if isinstance(cfg.clip, dict): self._clip = torch.tensor([[-float("inf"), float("inf")]], device=self.device).repeat( self.num_envs, self.action_dim, 1 ) index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.clip, self._thruster_names ) self._clip[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported clip type: {type(cfg.clip)}. Supported types are dict.") # Handle use_default_offset if cfg.use_default_offset: # Use default thruster RPS as offset self._offset = self._asset.data.default_thruster_rps[:, self._thruster_ids].clone()
@property def action_dim(self) -> int: return self._num_thrusters @property def raw_actions(self) -> torch.Tensor: return self._raw_actions @property def processed_actions(self) -> torch.Tensor: return self._processed_actions @property def IO_descriptor(self) -> GenericActionIODescriptor: """The IO descriptor of the action term.""" super().IO_descriptor self._IO_descriptor.shape = (self.action_dim,) self._IO_descriptor.dtype = str(self.raw_actions.dtype) self._IO_descriptor.action_type = "ThrustAction" self._IO_descriptor.thruster_names = self._thruster_names self._IO_descriptor.scale = self._scale if isinstance(self._offset, torch.Tensor): self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.offset = self._offset if self.cfg.clip is not None: if isinstance(self._clip, torch.Tensor): self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.clip = self._clip else: self._IO_descriptor.clip = None return self._IO_descriptor
[docs] def process_actions(self, actions: torch.Tensor): """Process actions by applying scaling, offset, and clipping.""" # store the raw actions self._raw_actions[:] = actions # apply the affine transformations self._processed_actions = self._raw_actions * self._scale + self._offset # clip actions if self.cfg.clip is not None: self._processed_actions = torch.clamp( self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1] )
[docs] def apply_actions(self): """Apply the processed actions as thrust commands.""" # Set thrust targets using thruster IDs self._asset.set_thrust_target(self.processed_actions, thruster_ids=self._thruster_ids)
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> None: """Reset the action term.""" self._raw_actions[env_ids] = 0.0