# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Action manager for processing actions sent to the environment."""

from __future__ import annotations

import inspect
import torch
import weakref
from abc import abstractmethod
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING

import omni.kit.app

from isaaclab.assets import AssetBase

from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import ActionTermCfg

if TYPE_CHECKING:
    from isaaclab.envs import ManagerBasedEnv
class ActionTerm(ManagerTermBase):
    """Base class for action terms.

    The action term is responsible for processing the raw actions sent to the environment
    and applying them to the asset managed by the term. The action term is comprised of two
    operations:

    * Processing of actions: This operation is performed once per **environment step** and
      is responsible for pre-processing the raw actions sent to the environment.
    * Applying actions: This operation is performed once per **simulation step** and is
      responsible for applying the processed actions to the asset managed by the term.
    """

    def __init__(self, cfg: ActionTermCfg, env: ManagerBasedEnv):
        """Initialize the action term.

        Args:
            cfg: The configuration object.
            env: The environment instance.
        """
        # call the base class constructor
        super().__init__(cfg, env)
        # parse config to obtain asset to which the term is applied
        self._asset: AssetBase = self._env.scene[self.cfg.asset_name]

        # add handle for debug visualization (this is set to a valid handle inside set_debug_vis)
        self._debug_vis_handle = None
        # set initial state of debug visualization
        self.set_debug_vis(self.cfg.debug_vis)

    def __del__(self):
        """Unsubscribe from the debug-visualization callback, if one is registered."""
        # The finalizer also runs for partially constructed instances: if ``__init__``
        # raised before ``_debug_vis_handle`` was assigned, a plain attribute access
        # here would raise ``AttributeError`` during garbage collection.
        handle = getattr(self, "_debug_vis_handle", None)
        if handle:
            handle.unsubscribe()
            self._debug_vis_handle = None

    """
    Properties.
    """

    @property
    @abstractmethod
    def action_dim(self) -> int:
        """Dimension of the action term."""
        raise NotImplementedError

    @property
    @abstractmethod
    def raw_actions(self) -> torch.Tensor:
        """The input/raw actions sent to the term."""
        raise NotImplementedError

    @property
    @abstractmethod
    def processed_actions(self) -> torch.Tensor:
        """The actions computed by the term after applying any processing."""
        raise NotImplementedError

    @property
    def has_debug_vis_implementation(self) -> bool:
        """Whether the action term has a debug visualization implemented.

        Note:
            This is a textual check: the term is considered to have an implementation
            when the source code of its :meth:`_set_debug_vis_impl` does not contain the
            string ``"NotImplementedError"``.
        """
        # check if function raises NotImplementedError
        source_code = inspect.getsource(self._set_debug_vis_impl)
        return "NotImplementedError" not in source_code

    """
    Operations.
    """

    def set_debug_vis(self, debug_vis: bool) -> bool:
        """Sets whether to visualize the action term data.

        Args:
            debug_vis: Whether to visualize the action term data.

        Returns:
            Whether the debug visualization was successfully set. False if the action term
            does not support debug visualization.
        """
        # check if debug visualization is supported
        if not self.has_debug_vis_implementation:
            return False

        # toggle debug visualization objects
        self._set_debug_vis_impl(debug_vis)
        # toggle debug visualization handles
        if debug_vis:
            # create a subscriber for the post update event if it doesn't exist
            if self._debug_vis_handle is None:
                app_interface = omni.kit.app.get_app_interface()
                # a weak proxy is bound into the callback so that the subscription
                # does not hold a strong reference to this term
                self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
                    lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event)
                )
        else:
            # remove the subscriber if it exists
            if self._debug_vis_handle is not None:
                self._debug_vis_handle.unsubscribe()
                self._debug_vis_handle = None
        # return success
        return True

    @abstractmethod
    def process_actions(self, actions: torch.Tensor):
        """Processes the actions sent to the environment.

        Note:
            This function is called once per environment step by the manager.

        Args:
            actions: The actions to process.
        """
        raise NotImplementedError

    @abstractmethod
    def apply_actions(self):
        """Applies the actions to the asset managed by the term.

        Note:
            This is called at every simulation step by the manager.
        """
        raise NotImplementedError

    def _set_debug_vis_impl(self, debug_vis: bool):
        """Set debug visualization into visualization objects.

        This function is responsible for creating the visualization objects if they don't
        exist and input ``debug_vis`` is True. If the visualization objects exist, the
        function should set their visibility into the stage.

        Args:
            debug_vis: Whether to visualize the action term data.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")

    def _debug_vis_callback(self, event):
        """Callback for debug visualization.

        This function calls the visualization objects and sets the data to visualize into them.

        Args:
            event: The event object passed by the subscribed event stream.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")
class ActionManager(ManagerBase):
    """Manager for processing and applying actions for a given world.

    The action manager handles the interpretation and application of user-defined
    actions on a given world. It is comprised of different action terms that decide
    the dimension of the expected actions.

    The action manager performs operations at two stages:

    * processing of actions: It splits the input actions to each term and performs any
      pre-processing needed. This should be called once at every environment step.
    * apply actions: This operation typically sets the processed actions into the assets in the
      scene (such as robots). It should be called before every simulation step.
    """

    def __init__(self, cfg: object, env: ManagerBasedEnv):
        """Initialize the action manager.

        Args:
            cfg: The configuration object or dictionary (``dict[str, ActionTermCfg]``).
            env: The environment instance.

        Raises:
            ValueError: If the configuration is None.
        """
        # check if config is None
        if cfg is None:
            raise ValueError("Action manager configuration is None. Please provide a valid configuration.")

        # call the base class constructor (this prepares the terms)
        super().__init__(cfg, env)
        # create buffers to store actions
        self._action = torch.zeros((self.num_envs, self.total_action_dim), device=self.device)
        self._prev_action = torch.zeros_like(self._action)

        # check if any term has debug visualization enabled in its configuration
        debug_vis = any(term.cfg.debug_vis for term in self._terms.values())
        # A dictionary configuration has no attribute to set. Adding a key instead is
        # avoided because ``_prepare_terms`` treats every non-None entry of the
        # configuration as an action term configuration.
        if not isinstance(self.cfg, dict):
            self.cfg.debug_vis = debug_vis

    def __str__(self) -> str:
        """Returns: A string representation for action manager."""
        msg = f"<ActionManager> contains {len(self._term_names)} active terms.\n"

        # create table for term information
        table = PrettyTable()
        table.title = f"Active Action Terms (shape: {self.total_action_dim})"
        table.field_names = ["Index", "Name", "Dimension"]
        # set alignment of table columns
        table.align["Name"] = "l"
        table.align["Dimension"] = "r"
        # add info on each term
        for index, (name, term) in enumerate(self._terms.items()):
            table.add_row([index, name, term.action_dim])
        # convert table to string
        msg += table.get_string()
        msg += "\n"

        return msg

    """
    Properties.
    """

    @property
    def total_action_dim(self) -> int:
        """Total dimension of actions."""
        return sum(self.action_term_dim)

    @property
    def active_terms(self) -> list[str]:
        """Name of active action terms."""
        return self._term_names

    @property
    def action_term_dim(self) -> list[int]:
        """Shape of each action term."""
        return [term.action_dim for term in self._terms.values()]

    @property
    def action(self) -> torch.Tensor:
        """The actions sent to the environment. Shape is (num_envs, total_action_dim)."""
        return self._action

    @property
    def prev_action(self) -> torch.Tensor:
        """The previous actions sent to the environment. Shape is (num_envs, total_action_dim)."""
        return self._prev_action

    @property
    def has_debug_vis_implementation(self) -> bool:
        """Whether any of the action terms has debug visualization implemented."""
        return any(term.has_debug_vis_implementation for term in self._terms.values())

    """
    Operations.
    """

    def get_active_iterable_terms(self, env_idx: int) -> Sequence[tuple[str, Sequence[float]]]:
        """Returns the active terms as iterable sequence of tuples.

        The first element of the tuple is the name of the term and the second element is the raw value(s) of the term.

        Args:
            env_idx: The specific environment to pull the active terms from.

        Returns:
            The active terms.
        """
        terms = []
        # running column offset of the current term inside the flat action buffer
        idx = 0
        for name, term in self._terms.items():
            term_actions = self._action[env_idx, idx : idx + term.action_dim].cpu()
            terms.append((name, term_actions.tolist()))
            idx += term.action_dim
        return terms

    def set_debug_vis(self, debug_vis: bool) -> bool:
        """Sets whether to visualize the action data.

        Args:
            debug_vis: Whether to visualize the action data.

        Returns:
            Whether the debug visualization was successfully set for at least one term.
            False if none of the action terms supports debug visualization.
        """
        success = False
        # every term must be toggled, so the loop must not short-circuit
        for term in self._terms.values():
            success |= bool(term.set_debug_vis(debug_vis))
        return success

    def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]:
        """Resets the action history.

        Args:
            env_ids: The environment ids. Defaults to None, in which case
                all environments are considered.

        Returns:
            An empty dictionary.
        """
        # resolve environment ids
        if env_ids is None:
            env_ids = slice(None)
        # reset the action history
        self._prev_action[env_ids] = 0.0
        self._action[env_ids] = 0.0
        # reset all action terms
        for term in self._terms.values():
            term.reset(env_ids=env_ids)
        # nothing to log here
        return {}

    def process_action(self, action: torch.Tensor):
        """Processes the actions sent to the environment.

        Note:
            This function should be called once per environment step.

        Args:
            action: The actions to process. Its second dimension must be equal to
                :attr:`total_action_dim`.

        Raises:
            ValueError: If the second dimension of ``action`` does not match
                :attr:`total_action_dim`.
        """
        # check if action dimension is valid
        if self.total_action_dim != action.shape[1]:
            raise ValueError(f"Invalid action shape, expected: {self.total_action_dim}, received: {action.shape[1]}.")
        # move the input to the manager's device once, so that the stored buffer and the
        # slices handed to the terms all live on the same device
        action = action.to(self.device)
        # store the input actions
        self._prev_action[:] = self._action
        self._action[:] = action

        # split the actions and apply to each tensor
        idx = 0
        for term in self._terms.values():
            term_actions = action[:, idx : idx + term.action_dim]
            term.process_actions(term_actions)
            idx += term.action_dim

    def apply_action(self) -> None:
        """Applies the actions to the environment/simulation.

        Note:
            This should be called at every simulation step.
        """
        for term in self._terms.values():
            term.apply_actions()

    def get_term(self, name: str) -> ActionTerm:
        """Returns the action term with the specified name.

        Args:
            name: The name of the action term.

        Returns:
            The action term with the specified name.

        Raises:
            KeyError: If no term with the given name exists.
        """
        return self._terms[name]

    """
    Helper functions.
    """

    def _prepare_terms(self):
        """Parses the configuration and creates the action terms.

        Entries whose configuration is None are skipped. The remaining entries are
        instantiated through ``class_type`` of their configuration and stored in the
        order in which they appear in the configuration.

        Raises:
            TypeError: If an entry is not of type :class:`ActionTermCfg`, or if the
                object created from it is not of type :class:`ActionTerm`.
        """
        # create buffers to parse and store terms
        self._term_names: list[str] = []
        self._terms: dict[str, ActionTerm] = {}

        # check if config is dict already
        if isinstance(self.cfg, dict):
            cfg_items = self.cfg.items()
        else:
            cfg_items = self.cfg.__dict__.items()
        # parse action terms from the config
        for term_name, term_cfg in cfg_items:
            # check if term config is None
            if term_cfg is None:
                continue
            # check valid type
            if not isinstance(term_cfg, ActionTermCfg):
                raise TypeError(
                    f"Configuration for the term '{term_name}' is not of type ActionTermCfg."
                    f" Received: '{type(term_cfg)}'."
                )
            # create the action term
            term = term_cfg.class_type(term_cfg, self._env)
            # sanity check if term is valid type
            if not isinstance(term, ActionTerm):
                raise TypeError(f"Returned object for the term '{term_name}' is not of type ActionTerm.")
            # add term name and parameters
            self._term_names.append(term_name)
            self._terms[term_name] = term