Source code for isaaclab_contrib.mdp.actions.thrust_actions

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING

import torch

import isaaclab.utils.string as string_utils
from isaaclab.managers.action_manager import ActionTerm

if TYPE_CHECKING:
    from isaaclab.envs import ManagerBasedEnv
    from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor

    from isaaclab_contrib.assets import Multirotor

    from . import thrust_actions_cfg

# import logger
logger = logging.getLogger(__name__)


[docs] class ThrustAction(ActionTerm): """Thrust action term that applies the processed actions as thrust commands. This action term is designed specifically for controlling multirotor vehicles by mapping action inputs to thruster commands. It provides flexible preprocessing of actions through: - **Scaling**: Multiply actions by a scale factor to adjust command magnitudes - **Offset**: Add an offset to center actions around a baseline (e.g., hover thrust) - **Clipping**: Constrain actions to valid ranges to prevent unsafe commands The action term integrates with Isaac Lab's :class:`~isaaclab.managers.ActionManager` framework and is specifically designed to work with :class:`~isaaclab_contrib.assets.Multirotor` assets. Key Features: - Supports per-thruster or uniform scaling and offsets - Optional automatic offset computation based on hover thrust - Action clipping for safety and constraint enforcement - Regex-based thruster selection for flexible control schemes Example: .. code-block:: python from isaaclab.envs import ManagerBasedRLEnvCfg from isaaclab_contrib.mdp.actions import ThrustActionCfg @configclass class MyEnvCfg(ManagerBasedRLEnvCfg): # ... other configuration ... @configclass class ActionsCfg: # Direct thrust control (normalized actions) thrust = ThrustActionCfg( asset_name="robot", scale=5.0, # Convert [-1, 1] to [-5, 5] N use_default_offset=True, # Add hover thrust as offset clip={".*": (-2.0, 8.0)}, # Clip to safe thrust range ) """ cfg: thrust_actions_cfg.ThrustActionCfg """The configuration of the action term.""" _asset: Multirotor """The articulation asset on which the action term is applied.""" _scale: torch.Tensor | float """The scaling factor applied to the input action.""" _offset: torch.Tensor | float """The offset applied to the input action.""" _clip: torch.Tensor """The clip applied to the input action."""
[docs] def __init__(self, cfg: thrust_actions_cfg.ThrustActionCfg, env: ManagerBasedEnv) -> None: # initialize the action term super().__init__(cfg, env) thruster_names_expr = self._asset.actuators["thrusters"].cfg.thruster_names_expr # resolve the thrusters over which the action term is applied self._thruster_ids, self._thruster_names = self._asset.find_bodies( thruster_names_expr, preserve_order=self.cfg.preserve_order ) self._num_thrusters = len(self._thruster_ids) # log the resolved thruster names for debugging logger.info( f"Resolved thruster names for the action term {self.__class__.__name__}:" f" {self._thruster_names} [{self._thruster_ids}]" ) # Avoid indexing across all thrusters for efficiency if self._num_thrusters == self._asset.num_thrusters and not self.cfg.preserve_order: self._thruster_ids = slice(None) # create tensors for raw and processed actions self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device) self._processed_actions = torch.zeros_like(self.raw_actions) # parse scale if isinstance(cfg.scale, (float, int)): self._scale = float(cfg.scale) elif isinstance(cfg.scale, dict): self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._thruster_names) self._scale[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported scale type: {type(cfg.scale)}. Supported types are float and dict.") # parse offset if isinstance(cfg.offset, (float, int)): self._offset = float(cfg.offset) elif isinstance(cfg.offset, dict): self._offset = torch.zeros_like(self._raw_actions) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.offset, self._thruster_names ) self._offset[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported offset type: {type(cfg.offset)}. Supported types are float and dict.") # parse clip if cfg.clip is not None: if isinstance(cfg.clip, dict): self._clip = torch.tensor([[-float("inf"), float("inf")]], device=self.device).repeat( self.num_envs, self.action_dim, 1 ) index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.clip, self._thruster_names ) self._clip[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported clip type: {type(cfg.clip)}. Supported types are dict.") # Handle use_default_offset if cfg.use_default_offset: # Use default thruster RPS as offset self._offset = self._asset.data.default_thruster_rps[:, self._thruster_ids].clone()
""" Properties """ @property def action_dim(self) -> int: return self._num_thrusters @property def raw_actions(self) -> torch.Tensor: return self._raw_actions @property def processed_actions(self) -> torch.Tensor: return self._processed_actions @property def IO_descriptor(self) -> GenericActionIODescriptor: """The IO descriptor of the action term.""" super().IO_descriptor self._IO_descriptor.shape = (self.action_dim,) self._IO_descriptor.dtype = str(self.raw_actions.dtype) self._IO_descriptor.action_type = "ThrustAction" self._IO_descriptor.thruster_names = self._thruster_names self._IO_descriptor.scale = self._scale if isinstance(self._offset, torch.Tensor): self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.offset = self._offset if self.cfg.clip is not None: if isinstance(self._clip, torch.Tensor): self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.clip = self._clip else: self._IO_descriptor.clip = None return self._IO_descriptor """ Methods """
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> None: """Reset the action term. This method resets the raw actions to zero for the specified environments. The processed actions will be recomputed during the next :meth:`process_actions` call. Args: env_ids: Environment indices to reset. Defaults to None (all environments). """ self._raw_actions[env_ids] = 0.0
[docs] def process_actions(self, actions: torch.Tensor): r"""Process actions by applying scaling, offset, and clipping. This method transforms raw policy actions into thrust commands through an affine transformation followed by optional clipping. The transformation is: .. math:: \text{processed} = \text{raw} \times \text{scale} + \text{offset} If clipping is configured, the processed actions are then clamped: .. math:: \text{processed} = \text{clamp}(\text{processed}, \text{min}, \text{max}) Args: actions: Raw action tensor from the policy. Shape is ``(num_envs, action_dim)``. Typically in the range [-1, 1] for normalized policies. Note: The processed actions are stored internally and applied during the next :meth:`apply_actions` call. """ # store the raw actions self._raw_actions[:] = actions # apply the affine transformations self._processed_actions = self._raw_actions * self._scale + self._offset # clip actions if self.cfg.clip is not None: self._processed_actions = torch.clamp( self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1] )
[docs] def apply_actions(self): """Apply the processed actions as thrust commands. This method sets the processed actions as thrust targets on the multirotor asset. The thrust targets are then used by the thruster actuator models to compute actual thrust forces during the simulation step. The method calls :meth:`~isaaclab_contrib.assets.Multirotor.set_thrust_target` on the multirotor asset with the appropriate thruster IDs. """ # Set thrust targets using thruster IDs self._asset.set_thrust_target(self.processed_actions, thruster_ids=self._thruster_ids)
class NavigationAction(ThrustAction): """Navigation action term that converts high-level navigation commands to thrust commands using a geometric tracking controller. This action term extends `ThrustAction` by adding a controller layer that computes wrench (force and torque) commands from navigation setpoints, then allocates those wrenches to individual thruster commands using the multirotor's allocation matrix. The controller type is automatically determined based on the `controller_cfg` type: - LeeVelControllerCfg: Velocity tracking controller - LeePosControllerCfg: Position tracking controller - LeeAccControllerCfg: Acceleration tracking controller The control pipeline: 1. Process raw actions (scale, offset, clip) using parent `ThrustAction` 2. Transform processed actions into setpoints constrained within camera FOV 3. Compute 6-DOF wrench command using the selected Lee controller 4. Solve thrust allocation: thrust_cmd = pinv(allocation_matrix) @ wrench_cmd 5. Apply thrust commands to thrusters Attributes: cfg: Configuration for the navigation action term, including controller config. _lc: Lee controller instance (LeeVelController, LeePosController, or LeeAccController). Action Space: The action dimension is always 3D: (forward_magnitude, pitch_angle, yaw_rate) Actions are clipped in range [-1, 1] and are transformed to controller commands: - Forward position/velocity/acceleration: [0, max_magnitude] via (action[0] + 1) * cos(pitch) * max_magnitude / 2 - Lateral position/velocity/acceleration: Always 0.0 (constrained to camera FOV) - Vertical position/velocity/acceleration: [0, max_magnitude] via (action[0] + 1) * sin(pitch) * max_magnitude / 2 - Yaw command: [-max_yaw_command, max_yaw_command] via action[2] * max_yaw_command (yaw command is yawrate [rad/s] for velocity and acceleration control and relative yaw change [rad] for position control) Where: - pitch angle is computed as: action[1] * max_inclination_angle Parameters (from cfg): max_magnitude: Maximum translational magnitude for position/velocity/acceleration commands. max_yaw_command: Maximum yaw command in rad/s for velocity and acceleration control and relative yaw change [rad] for position control. max_inclination_angle: Maximum pitch angle in rad. Notes: - The controller's internal states (e.g., integral terms) are reset when `reset()` is called. - Lateral term is constrained to 0.0 to keep commands within camera FOV. - The x and z components are derived from magnitude and inclination angle. - Requires the multirotor asset to have a valid `allocation_matrix` attribute. Example: ```python cfg = NavigationActionCfg( controller_cfg=LeeVelControllerCfg(...), asset_name="robot", max_magnitude=2.0, max_yaw_command=1.047, max_inclination_angle=0.785, # pi/4 ) nav_action = NavigationAction(cfg, env) ``` """ cfg: thrust_actions_cfg.NavigationActionCfg """The configuration of the action term.""" def __init__(self, cfg: thrust_actions_cfg.NavigationActionCfg, env: ManagerBasedEnv) -> None: # Initialize parent class (this handles all the thruster setup) super().__init__(cfg, env) # Initialize controller using class_type from config self._lc = self.cfg.controller_cfg.class_type( cfg=self.cfg.controller_cfg, asset=self._asset, num_envs=self.num_envs, device=self.device ) # Log warning if not using velocity controller from isaaclab_contrib.controllers import LeeVelControllerCfg if not isinstance(self.cfg.controller_cfg, LeeVelControllerCfg): logger.warning( "Navigation task tuned for velocity control. " "Consider using velocity controller for better performance or retune reward function." ) # Cache allocation matrix and its pseudo-inverse (static for this asset/config) self._allocation_matrix = self._asset.allocation_matrix self._allocation_pinv = torch.linalg.pinv(self._allocation_matrix) # Add buffer to store velocity commands for observations) self._commands = torch.zeros(self.num_envs, 4, device=self.device) self._prev_commands = torch.zeros(self.num_envs, 4, device=self.device) @property def action_dim(self) -> int: return 3 @property def prev_commands(self) -> torch.Tensor: return self._prev_commands @property def IO_descriptor(self) -> GenericActionIODescriptor: """The IO descriptor of the action term.""" # Get parent IO descriptor descriptor = super().IO_descriptor # Override action type for navigation descriptor.action_type = "NavigationAction" return descriptor def process_actions(self, actions: torch.Tensor): """Process actions by applying scaling, offset, and clipping.""" # Call parent to handle basic processing super().process_actions(actions) self._has_actions_updated = False def apply_actions(self): """Apply the processed actions as velocity commands.""" # process the actions to be in the correct range clamped_action = torch.clamp(self.processed_actions, min=-1.0, max=1.0) processed_actions = torch.zeros(self.num_envs, 4, device=self.device) clamped_action[:, 0] += 1.0 # only allow positive thrust commands [0, 2] processed_actions[:, 0] = ( clamped_action[:, 0] * torch.cos(self.cfg.max_inclination_angle * clamped_action[:, 1]) * self.cfg.max_magnitude / 2.0 ) processed_actions[:, 1] = 0.0 # set lateral thrust command to 0 processed_actions[:, 2] = ( clamped_action[:, 0] * torch.sin(self.cfg.max_inclination_angle * clamped_action[:, 1]) * self.cfg.max_magnitude / 2.0 ) processed_actions[:, 3] = clamped_action[:, 2] * self.cfg.max_yaw_command # Store velocity commands for observations if not self._has_actions_updated: self._prev_commands[:] = self._commands self._commands[:] = processed_actions self._has_actions_updated = True # Compute wrench command using controller wrench_command = self._lc.compute(processed_actions) # Convert wrench to thrust commands using allocation matrix thrust_commands = wrench_command @ self._allocation_pinv.T # Apply thrust commands using thruster IDs self._asset.set_thrust_target(thrust_commands, thruster_ids=self._thruster_ids) def reset(self, env_ids: Sequence[int] | None = None) -> None: # Call parent reset super().reset(env_ids) # Reset controller internal states self._lc.reset_idx(env_ids) if env_ids is None: env_ids = slice(None) self._commands[env_ids] = 0.0 self._prev_commands[env_ids] = 0.0