Source code for isaaclab.managers.recorder_manager
# Copyright (c) 2024-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clause"""Recorder manager for recording data produced from the given world."""from__future__importannotationsimportenumimportosimporttorchfromcollections.abcimportSequencefromprettytableimportPrettyTablefromtypingimportTYPE_CHECKINGfromisaaclab.utilsimportconfigclassfromisaaclab.utils.datasetsimportEpisodeData,HDF5DatasetFileHandlerfrom.manager_baseimportManagerBase,ManagerTermBasefrom.manager_term_cfgimportRecorderTermCfgifTYPE_CHECKING:fromisaaclab.envsimportManagerBasedEnvclassDatasetExportMode(enum.IntEnum):"""The mode to handle episode exports."""EXPORT_NONE=0# Export none of the episodesEXPORT_ALL=1# Export all episodes to a single dataset fileEXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES=2# Export succeeded and failed episodes in separate filesEXPORT_SUCCEEDED_ONLY=3# Export only succeeded episodes to a single dataset file@configclassclassRecorderManagerBaseCfg:"""Base class for configuring recorder manager terms."""dataset_file_handler_class_type:type=HDF5DatasetFileHandlerdataset_export_dir_path:str="/tmp/isaaclab/logs""""The directory path where the recorded datasets are exported."""dataset_filename:str="dataset""""Dataset file name without file extension."""dataset_export_mode:DatasetExportMode=DatasetExportMode.EXPORT_ALL"""The mode to handle episode exports."""export_in_record_pre_reset:bool=True"""Whether to export episodes in the record_pre_reset call."""classRecorderTerm(ManagerTermBase):"""Base class for recorder terms. The recorder term is responsible for recording data at various stages of the environment's lifecycle. A recorder term is comprised of four user-defined callbacks to record data in the corresponding stages: * Pre-reset recording: This callback is invoked at the beginning of `env.reset()` before the reset is effective. * Post-reset recording: This callback is invoked at the end of `env.reset()`. 
* Pre-step recording: This callback is invoked at the beginning of `env.step()`, after the step action is processed and before the action is applied by the action manager. * Post-step recording: This callback is invoked at the end of `env.step()` when all the managers are processed. """def__init__(self,cfg:RecorderTermCfg,env:ManagerBasedEnv):"""Initialize the recorder term. Args: cfg: The configuration object. env: The environment instance. """# call the base class constructorsuper().__init__(cfg,env)""" User-defined callbacks. """defrecord_pre_reset(self,env_ids:Sequence[int]|None)->tuple[str|None,torch.Tensor|dict|None]:"""Record data at the beginning of env.reset() before reset is effective. Args: env_ids: The environment ids. All environments should be considered when set to None. Returns: A tuple of key and value to be recorded. The key can contain nested keys separated by '/'. For example, "obs/joint_pos" would add the given value under ['obs']['policy'] in the underlying dictionary in the recorded episode data. The value can be a tensor or a nested dictionary of tensors. The shape of a tensor in the value is (env_ids, ...). """returnNone,Nonedefrecord_post_reset(self,env_ids:Sequence[int]|None)->tuple[str|None,torch.Tensor|dict|None]:"""Record data at the end of env.reset(). Args: env_ids: The environment ids. All environments should be considered when set to None. Returns: A tuple of key and value to be recorded. Please refer to the `record_pre_reset` function for more details. """returnNone,Nonedefrecord_pre_step(self)->tuple[str|None,torch.Tensor|dict|None]:"""Record data in the beginning of env.step() after action is cached/processed in the ActionManager. Returns: A tuple of key and value to be recorded. Please refer to the `record_pre_reset` function for more details. """returnNone,Nonedefrecord_post_step(self)->tuple[str|None,torch.Tensor|dict|None]:"""Record data at the end of env.step() when all the managers are processed. 
Returns: A tuple of key and value to be recorded. Please refer to the `record_pre_reset` function for more details. """returnNone,None
class RecorderManager(ManagerBase):
    """Manager for recording data from recorder terms."""

    def __init__(self, cfg: object, env: ManagerBasedEnv):
        """Initialize the recorder manager.

        Args:
            cfg: The configuration object or dictionary (``dict[str, RecorderTermCfg]``).
            env: The environment instance.
        """
        self._term_names: list[str] = []
        self._terms: dict[str, RecorderTerm] = {}

        # An absent or empty configuration means there is nothing to record.
        if not cfg:
            return

        super().__init__(cfg, env)

        # Bail out early when no recorder terms were resolved from the configuration.
        if not self.active_terms:
            return

        if not isinstance(cfg, RecorderManagerBaseCfg):
            raise TypeError("Configuration for the recorder manager is not of type RecorderManagerBaseCfg.")

        # Per-environment episode buffers, indexed by environment id.
        self._episodes: dict[int, EpisodeData] = {index: EpisodeData() for index in range(env.num_envs)}

        env_name = getattr(env.cfg, "env_name", None)

        # Handler for the main dataset file (all or only succeeded episodes, depending on mode).
        self._dataset_file_handler = None
        if cfg.dataset_export_mode != DatasetExportMode.EXPORT_NONE:
            self._dataset_file_handler = cfg.dataset_file_handler_class_type()
            self._dataset_file_handler.create(
                os.path.join(cfg.dataset_export_dir_path, cfg.dataset_filename), env_name=env_name
            )

        # Separate handler for failed episodes, used only in the split-files export mode.
        self._failed_episode_dataset_file_handler = None
        if cfg.dataset_export_mode == DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES:
            self._failed_episode_dataset_file_handler = cfg.dataset_file_handler_class_type()
            self._failed_episode_dataset_file_handler.create(
                os.path.join(cfg.dataset_export_dir_path, f"{cfg.dataset_filename}_failed"), env_name=env_name
            )

        # Per-environment export counters, filled lazily by export_episodes().
        self._exported_successful_episode_count = {}
        self._exported_failed_episode_count = {}
def__str__(self)->str:"""Returns: A string representation for recorder manager."""msg=f"<RecorderManager> contains {len(self._term_names)} active terms.\n"# create table for term informationtable=PrettyTable()table.title="Active Recorder Terms"table.field_names=["Index","Name"]# set alignment of table columnstable.align["Name"]="l"# add info on each termforindex,nameinenumerate(self._term_names):table.add_row([index,name])# convert table to stringmsg+=table.get_string()msg+="\n"returnmsgdef__del__(self):"""Destructor for recorder."""# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnifself._dataset_file_handlerisnotNone:self._dataset_file_handler.close()ifself._failed_episode_dataset_file_handlerisnotNone:self._failed_episode_dataset_file_handler.close()""" Properties. """@propertydefactive_terms(self)->list[str]:"""Name of active recorder terms."""returnself._term_names@propertydefexported_successful_episode_count(self,env_id=None)->int:"""Number of successful episodes. Args: env_id: The environment id. Defaults to None, in which case all environments are considered. Returns: The number of successful episodes. """ifnothasattr(self,"_exported_successful_episode_count"):return0ifenv_idisnotNone:returnself._exported_successful_episode_count.get(env_id,0)returnsum(self._exported_successful_episode_count.values())@propertydefexported_failed_episode_count(self,env_id=None)->int:"""Number of failed episodes. Args: env_id: The environment id. Defaults to None, in which case all environments are considered. Returns: The number of failed episodes. """ifnothasattr(self,"_exported_failed_episode_count"):return0ifenv_idisnotNone:returnself._exported_failed_episode_count.get(env_id,0)returnsum(self._exported_failed_episode_count.values())""" Operations. """
[docs]defreset(self,env_ids:Sequence[int]|None=None)->dict[str,torch.Tensor]:"""Resets the recorder data. Args: env_ids: The environment ids. Defaults to None, in which case all environments are considered. Returns: An empty dictionary. """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:return{}# resolve environment idsifenv_idsisNone:env_ids=list(range(self._env.num_envs))ifisinstance(env_ids,torch.Tensor):env_ids=env_ids.tolist()forterminself._terms.values():term.reset(env_ids=env_ids)forenv_idinenv_ids:self._episodes[env_id]=EpisodeData()# nothing to log herereturn{}
[docs]defget_episode(self,env_id:int)->EpisodeData:"""Returns the episode data for the given environment id. Args: env_id: The environment id. Returns: The episode data for the given environment id. """returnself._episodes.get(env_id,EpisodeData())
[docs]defadd_to_episodes(self,key:str,value:torch.Tensor|dict,env_ids:Sequence[int]|None=None):"""Adds the given key-value pair to the episodes for the given environment ids. Args: key: The key of the given value to be added to the episodes. The key can contain nested keys separated by '/'. For example, "obs/joint_pos" would add the given value under ['obs']['policy'] in the underlying dictionary in the episode data. value: The value to be added to the episodes. The value can be a tensor or a nested dictionary of tensors. The shape of a tensor in the value is (env_ids, ...). env_ids: The environment ids. Defaults to None, in which case all environments are considered. """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:return# resolve environment idsifkeyisNone:returnifenv_idsisNone:env_ids=list(range(self._env.num_envs))ifisinstance(env_ids,torch.Tensor):env_ids=env_ids.tolist()ifisinstance(value,dict):forsub_key,sub_valueinvalue.items():self.add_to_episodes(f"{key}/{sub_key}",sub_value,env_ids)returnforvalue_index,env_idinenumerate(env_ids):ifenv_idnotinself._episodes:self._episodes[env_id]=EpisodeData()self._episodes[env_id].env_id=env_idself._episodes[env_id].add(key,value[value_index])
[docs]defset_success_to_episodes(self,env_ids:Sequence[int]|None,success_values:torch.Tensor):"""Sets the task success values to the episodes for the given environment ids. Args: env_ids: The environment ids. Defaults to None, in which case all environments are considered. success_values: The task success values to be set to the episodes. The shape of the tensor is (env_ids, 1). """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:return# resolve environment idsifenv_idsisNone:env_ids=list(range(self._env.num_envs))ifisinstance(env_ids,torch.Tensor):env_ids=env_ids.tolist()forvalue_index,env_idinenumerate(env_ids):self._episodes[env_id].success=success_values[value_index].item()
[docs]defrecord_pre_step(self)->None:"""Trigger recorder terms for pre-step functions."""# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnforterminself._terms.values():key,value=term.record_pre_step()self.add_to_episodes(key,value)
[docs]defrecord_post_step(self)->None:"""Trigger recorder terms for post-step functions."""# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnforterminself._terms.values():key,value=term.record_post_step()self.add_to_episodes(key,value)
[docs]defrecord_pre_reset(self,env_ids:Sequence[int]|None,force_export_or_skip=None)->None:"""Trigger recorder terms for pre-reset functions. Args: env_ids: The environment ids in which a reset is triggered. """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnifenv_idsisNone:env_ids=list(range(self._env.num_envs))ifisinstance(env_ids,torch.Tensor):env_ids=env_ids.tolist()forterminself._terms.values():key,value=term.record_pre_reset(env_ids)self.add_to_episodes(key,value,env_ids)# Set task success values for the relevant episodessuccess_results=torch.zeros(len(env_ids),dtype=bool,device=self._env.device)# Check success indicator from termination termsifhasattr(self._env,"termination_manager"):if"success"inself._env.termination_manager.active_terms:success_results|=self._env.termination_manager.get_term("success")[env_ids]self.set_success_to_episodes(env_ids,success_results)ifforce_export_or_skipor(force_export_or_skipisNoneandself.cfg.export_in_record_pre_reset):self.export_episodes(env_ids)
[docs]defrecord_post_reset(self,env_ids:Sequence[int]|None)->None:"""Trigger recorder terms for post-reset functions. Args: env_ids: The environment ids in which a reset is triggered. """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnforterminself._terms.values():key,value=term.record_post_reset(env_ids)self.add_to_episodes(key,value,env_ids)
[docs]defexport_episodes(self,env_ids:Sequence[int]|None=None)->None:"""Concludes and exports the episodes for the given environment ids. Args: env_ids: The environment ids. Defaults to None, in which case all environments are considered. """# Do nothing if no active recorder terms are providediflen(self.active_terms)==0:returnifenv_idsisNone:env_ids=list(range(self._env.num_envs))ifisinstance(env_ids,torch.Tensor):env_ids=env_ids.tolist()# Export episode data through dataset exporterneed_to_flush=Falseforenv_idinenv_ids:ifenv_idinself._episodesandnotself._episodes[env_id].is_empty():episode_succeeded=self._episodes[env_id].successtarget_dataset_file_handler=Noneif(self.cfg.dataset_export_mode==DatasetExportMode.EXPORT_ALL)or(self.cfg.dataset_export_mode==DatasetExportMode.EXPORT_SUCCEEDED_ONLYandepisode_succeeded):target_dataset_file_handler=self._dataset_file_handlerelifself.cfg.dataset_export_mode==DatasetExportMode.EXPORT_SUCCEEDED_FAILED_IN_SEPARATE_FILES:ifepisode_succeeded:target_dataset_file_handler=self._dataset_file_handlerelse:target_dataset_file_handler=self._failed_episode_dataset_file_handleriftarget_dataset_file_handlerisnotNone:target_dataset_file_handler.write_episode(self._episodes[env_id])need_to_flush=True# Update episode countifepisode_succeeded:self._exported_successful_episode_count[env_id]=(self._exported_successful_episode_count.get(env_id,0)+1)else:self._exported_failed_episode_count[env_id]=self._exported_failed_episode_count.get(env_id,0)+1# Reset the episode buffer for the given environment after exportself._episodes[env_id]=EpisodeData()ifneed_to_flush:ifself._dataset_file_handlerisnotNone:self._dataset_file_handler.flush()ifself._failed_episode_dataset_file_handlerisnotNone:self._failed_episode_dataset_file_handler.flush()
""" Helper functions. """def_prepare_terms(self):"""Prepares a list of recorder terms."""# check if config is dict alreadyifisinstance(self.cfg,dict):cfg_items=self.cfg.items()else:cfg_items=self.cfg.__dict__.items()forterm_name,term_cfgincfg_items:# skip non-term settingsifterm_namein["dataset_file_handler_class_type","dataset_filename","dataset_export_dir_path","dataset_export_mode","export_in_record_pre_reset",]:continue# check if term config is Noneifterm_cfgisNone:continue# check valid typeifnotisinstance(term_cfg,RecorderTermCfg):raiseTypeError(f"Configuration for the term '{term_name}' is not of type RecorderTermCfg."f" Received: '{type(term_cfg)}'.")# create the recorder termterm=term_cfg.class_type(term_cfg,self._env)# sanity check if term is valid typeifnotisinstance(term,RecorderTerm):raiseTypeError(f"Returned object for the term '{term_name}' is not of type RecorderTerm.")# add term name and parametersself._term_names.append(term_name)self._terms[term_name]=term