Source code for omni.isaac.lab.managers.action_manager

# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Action manager for processing actions sent to the environment."""

from __future__ import annotations

import inspect
import torch
import weakref
from abc import abstractmethod
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING

import omni.kit.app

from omni.isaac.lab.assets import AssetBase

from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import ActionTermCfg

if TYPE_CHECKING:
    from omni.isaac.lab.envs import ManagerBasedEnv


[docs]class ActionTerm(ManagerTermBase): """Base class for action terms. The action term is responsible for processing the raw actions sent to the environment and applying them to the asset managed by the term. The action term is comprised of two operations: * Processing of actions: This operation is performed once per **environment step** and is responsible for pre-processing the raw actions sent to the environment. * Applying actions: This operation is performed once per **simulation step** and is responsible for applying the processed actions to the asset managed by the term. """
[docs] def __init__(self, cfg: ActionTermCfg, env: ManagerBasedEnv): """Initialize the action term. Args: cfg: The configuration object. env: The environment instance. """ # call the base class constructor super().__init__(cfg, env) # parse config to obtain asset to which the term is applied self._asset: AssetBase = self._env.scene[self.cfg.asset_name] # add handle for debug visualization (this is set to a valid handle inside set_debug_vis) self._debug_vis_handle = None # set initial state of debug visualization self.set_debug_vis(self.cfg.debug_vis)
def __del__(self): """Unsubscribe from the callbacks.""" if self._debug_vis_handle: self._debug_vis_handle.unsubscribe() self._debug_vis_handle = None """ Properties. """ @property @abstractmethod def action_dim(self) -> int: """Dimension of the action term.""" raise NotImplementedError @property @abstractmethod def raw_actions(self) -> torch.Tensor: """The input/raw actions sent to the term.""" raise NotImplementedError @property @abstractmethod def processed_actions(self) -> torch.Tensor: """The actions computed by the term after applying any processing.""" raise NotImplementedError @property def has_debug_vis_implementation(self) -> bool: """Whether the action term has a debug visualization implemented.""" # check if function raises NotImplementedError source_code = inspect.getsource(self._set_debug_vis_impl) return "NotImplementedError" not in source_code """ Operations. """
[docs] def set_debug_vis(self, debug_vis: bool) -> bool: """Sets whether to visualize the action term data. Args: debug_vis: Whether to visualize the action term data. Returns: Whether the debug visualization was successfully set. False if the action term does not support debug visualization. """ # check if debug visualization is supported if not self.has_debug_vis_implementation: return False # toggle debug visualization objects self._set_debug_vis_impl(debug_vis) # toggle debug visualization handles if debug_vis: # create a subscriber for the post update event if it doesn't exist if self._debug_vis_handle is None: app_interface = omni.kit.app.get_app_interface() self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop( lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event) ) else: # remove the subscriber if it exists if self._debug_vis_handle is not None: self._debug_vis_handle.unsubscribe() self._debug_vis_handle = None # return success return True
[docs] @abstractmethod def process_actions(self, actions: torch.Tensor): """Processes the actions sent to the environment. Note: This function is called once per environment step by the manager. Args: actions: The actions to process. """ raise NotImplementedError
[docs] @abstractmethod def apply_actions(self): """Applies the actions to the asset managed by the term. Note: This is called at every simulation step by the manager. """ raise NotImplementedError
def _set_debug_vis_impl(self, debug_vis: bool): """Set debug visualization into visualization objects. This function is responsible for creating the visualization objects if they don't exist and input ``debug_vis`` is True. If the visualization objects exist, the function should set their visibility into the stage. """ raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.") def _debug_vis_callback(self, event): """Callback for debug visualization. This function calls the visualization objects and sets the data to visualize into them. """ raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")
[docs]class ActionManager(ManagerBase): """Manager for processing and applying actions for a given world. The action manager handles the interpretation and application of user-defined actions on a given world. It is comprised of different action terms that decide the dimension of the expected actions. The action manager performs operations at two stages: * processing of actions: It splits the input actions to each term and performs any pre-processing needed. This should be called once at every environment step. * apply actions: This operation typically sets the processed actions into the assets in the scene (such as robots). It should be called before every simulation step. """
[docs] def __init__(self, cfg: object, env: ManagerBasedEnv): """Initialize the action manager. Args: cfg: The configuration object or dictionary (``dict[str, ActionTermCfg]``). env: The environment instance. Raises: ValueError: If the configuration is None. """ # check if config is None if cfg is None: raise ValueError("Action manager configuration is None. Please provide a valid configuration.") # call the base class constructor (this prepares the terms) super().__init__(cfg, env) # create buffers to store actions self._action = torch.zeros((self.num_envs, self.total_action_dim), device=self.device) self._prev_action = torch.zeros_like(self._action) # check if any term has debug visualization implemented self.cfg.debug_vis = False for term in self._terms.values(): self.cfg.debug_vis |= term.cfg.debug_vis
def __str__(self) -> str: """Returns: A string representation for action manager.""" msg = f"<ActionManager> contains {len(self._term_names)} active terms.\n" # create table for term information table = PrettyTable() table.title = f"Active Action Terms (shape: {self.total_action_dim})" table.field_names = ["Index", "Name", "Dimension"] # set alignment of table columns table.align["Name"] = "l" table.align["Dimension"] = "r" # add info on each term for index, (name, term) in enumerate(self._terms.items()): table.add_row([index, name, term.action_dim]) # convert table to string msg += table.get_string() msg += "\n" return msg """ Properties. """ @property def total_action_dim(self) -> int: """Total dimension of actions.""" return sum(self.action_term_dim) @property def active_terms(self) -> list[str]: """Name of active action terms.""" return self._term_names @property def action_term_dim(self) -> list[int]: """Shape of each action term.""" return [term.action_dim for term in self._terms.values()] @property def action(self) -> torch.Tensor: """The actions sent to the environment. Shape is (num_envs, total_action_dim).""" return self._action @property def prev_action(self) -> torch.Tensor: """The previous actions sent to the environment. Shape is (num_envs, total_action_dim).""" return self._prev_action @property def has_debug_vis_implementation(self) -> bool: """Whether the command terms have debug visualization implemented.""" # check if function raises NotImplementedError has_debug_vis = False for term in self._terms.values(): has_debug_vis |= term.has_debug_vis_implementation return has_debug_vis """ Operations. """
[docs] def get_active_iterable_terms(self, env_idx: int) -> Sequence[tuple[str, Sequence[float]]]: """Returns the active terms as iterable sequence of tuples. The first element of the tuple is the name of the term and the second element is the raw value(s) of the term. Args: env_idx: The specific environment to pull the active terms from. Returns: The active terms. """ terms = [] idx = 0 for name, term in self._terms.items(): term_actions = self._action[env_idx, idx : idx + term.action_dim].cpu() terms.append((name, term_actions.tolist())) idx += term.action_dim return terms
[docs] def set_debug_vis(self, debug_vis: bool): """Sets whether to visualize the action data. Args: debug_vis: Whether to visualize the action data. Returns: Whether the debug visualization was successfully set. False if the action does not support debug visualization. """ for term in self._terms.values(): term.set_debug_vis(debug_vis)
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]: """Resets the action history. Args: env_ids: The environment ids. Defaults to None, in which case all environments are considered. Returns: An empty dictionary. """ # resolve environment ids if env_ids is None: env_ids = slice(None) # reset the action history self._prev_action[env_ids] = 0.0 self._action[env_ids] = 0.0 # reset all action terms for term in self._terms.values(): term.reset(env_ids=env_ids) # nothing to log here return {}
[docs] def process_action(self, action: torch.Tensor): """Processes the actions sent to the environment. Note: This function should be called once per environment step. Args: action: The actions to process. """ # check if action dimension is valid if self.total_action_dim != action.shape[1]: raise ValueError(f"Invalid action shape, expected: {self.total_action_dim}, received: {action.shape[1]}.") # store the input actions self._prev_action[:] = self._action self._action[:] = action.to(self.device) # split the actions and apply to each tensor idx = 0 for term in self._terms.values(): term_actions = action[:, idx : idx + term.action_dim] term.process_actions(term_actions) idx += term.action_dim
[docs] def apply_action(self) -> None: """Applies the actions to the environment/simulation. Note: This should be called at every simulation step. """ for term in self._terms.values(): term.apply_actions()
[docs] def get_term(self, name: str) -> ActionTerm: """Returns the action term with the specified name. Args: name: The name of the action term. Returns: The action term with the specified name. """ return self._terms[name]
""" Helper functions. """ def _prepare_terms(self): # create buffers to parse and store terms self._term_names: list[str] = list() self._terms: dict[str, ActionTerm] = dict() # check if config is dict already if isinstance(self.cfg, dict): cfg_items = self.cfg.items() else: cfg_items = self.cfg.__dict__.items() # parse action terms from the config for term_name, term_cfg in cfg_items: # check if term config is None if term_cfg is None: continue # check valid type if not isinstance(term_cfg, ActionTermCfg): raise TypeError( f"Configuration for the term '{term_name}' is not of type ActionTermCfg." f" Received: '{type(term_cfg)}'." ) # create the action term term = term_cfg.class_type(term_cfg, self._env) # sanity check if term is valid type if not isinstance(term, ActionTerm): raise TypeError(f"Returned object for the term '{term_name}' is not of type ActionType.") # add term name and parameters self._term_names.append(term_name) self._terms[term_name] = term