Source code for isaaclab.managers.observation_manager
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clause"""Observation manager for computing observation signals for a given world."""from__future__importannotationsimportinspectimportnumpyasnpimporttorchfromcollections.abcimportSequencefromprettytableimportPrettyTablefromtypingimportTYPE_CHECKINGfromisaaclab.utilsimportmodifiersfromisaaclab.utils.buffersimportCircularBufferfrom.manager_baseimportManagerBase,ManagerTermBasefrom.manager_term_cfgimportObservationGroupCfg,ObservationTermCfgifTYPE_CHECKING:fromisaaclab.envsimportManagerBasedEnv
class ObservationManager(ManagerBase):
    """Manager for computing observation signals for a given world.

    Observations are organized into groups based on their intended usage. This allows having different
    observation groups for different types of learning such as asymmetric actor-critic and student-teacher
    training. Each group contains observation terms which contain information about the observation function
    to call, the noise corruption model to use, and the sensor to retrieve data from.

    Each observation group should inherit from the :class:`ObservationGroupCfg` class. Within each group,
    each observation term should instantiate the :class:`ObservationTermCfg` class. Based on the
    configuration, the observations in a group can be concatenated into a single tensor or returned as a
    dictionary with keys corresponding to the term's name.

    If the observations in a group are concatenated, the shape of the concatenated tensor is computed based
    on the shapes of the individual observation terms. This information is stored in the
    :attr:`group_obs_dim` dictionary with keys as the group names and values as the shape of the observation
    tensor. When the terms in a group are not concatenated, the attribute stores a list of shapes for each
    term in the group.

    .. note::
        When the observation terms in a group do not have the same shape, the observation terms cannot be
        concatenated. In this case, please set the :attr:`ObservationGroupCfg.concatenate_terms` attribute
        in the group configuration to False.

    Observations can also have history. This means a running history is updated per sim step. History can
    be controlled per :class:`ObservationTermCfg` (see the :attr:`ObservationTermCfg.history_length` and
    :attr:`ObservationTermCfg.flatten_history_dim`). History can also be controlled via
    :class:`ObservationGroupCfg`, where the group configuration overwrites the per-term configuration if
    set. History follows an oldest-to-newest ordering.

    The observation manager can be used to compute observations for all the groups or for a specific group.
    The observations are computed by calling the registered functions for each term in the group. The
    functions are called in the order of the terms in the group. The functions are expected to return a
    tensor with shape (num_envs, ...).

    If a noise model or custom modifier is registered for a term, it is called on the output of the term's
    function to corrupt the observation. The corruption function is expected to return a tensor with the
    same shape as the observation. The observations are clipped and scaled as per the configuration
    settings.
    """
def __init__(self, cfg: object, env: ManagerBasedEnv):
    """Initialize observation manager.

    After the base class has parsed the term configuration, the per-group observation
    shape is derived from the per-term shapes. For groups that concatenate their terms,
    the terms are joined along the last dimension, so the group shape keeps the shared
    leading dimensions and sums only the last one.

    Args:
        cfg: The configuration object or dictionary (``dict[str, ObservationGroupCfg]``).
        env: The environment instance.

    Raises:
        ValueError: If the configuration is None.
        RuntimeError: If the shapes of the observation terms in a group are not compatible for
            concatenation and the :attr:`~ObservationGroupCfg.concatenate_terms` attribute is set
            to True.
    """
    # check that cfg is not None
    if cfg is None:
        raise ValueError("Observation manager configuration is None. Please provide a valid configuration.")

    # call the base class constructor (this will parse the terms config)
    super().__init__(cfg, env)

    # compute combined vector for obs group
    self._group_obs_dim: dict[str, tuple[int, ...] | list[tuple[int, ...]]] = dict()
    for group_name, group_term_dims in self._group_obs_term_dim.items():
        # groups that are not concatenated simply expose the list of term shapes
        if not self._group_obs_concatenate[group_name]:
            self._group_obs_dim[group_name] = group_term_dims
            continue

        # normalize to plain python ints (term shapes may carry numpy scalars)
        term_shapes = [tuple(int(dim) for dim in dims) for dims in group_term_dims]
        # terms are joined along the last dimension: every other dimension must agree
        leading_dims = term_shapes[0][:-1] if term_shapes else None
        compatible = bool(term_shapes) and all(
            len(shape) == len(term_shapes[0]) and shape[:-1] == leading_dims for shape in term_shapes
        )
        if not compatible:
            raise RuntimeError(
                f"Unable to concatenate observation terms in group '{group_name}'."
                f" The shapes of the terms are: {group_term_dims}."
                " Please ensure that the shapes are compatible for concatenation."
                " Otherwise, set 'concatenate_terms' to False in the group configuration."
            )

        if term_shapes[0]:
            # keep the shared leading dimensions and add up the last one
            group_dim = leading_dims + (sum(shape[-1] for shape in term_shapes),)
        else:
            # terms without any per-environment dimension: nothing to sum
            group_dim = ()
        self._group_obs_dim[group_name] = group_dim

    # Stores the latest observations.
    self._obs_buffer: dict[str, torch.Tensor | dict[str, torch.Tensor]] | None = None
def __str__(self) -> str:
    """Returns: A human-readable summary with one table of terms per observation group."""
    # header line followed by one rendered table per group
    parts = [f"<ObservationManager> contains {len(self._group_obs_term_names)} groups.\n"]

    for group_name, group_dim in self._group_obs_dim.items():
        # the combined shape is only meaningful when the group is concatenated
        title = f"Active Observation Terms in Group: '{group_name}'"
        if self._group_obs_concatenate[group_name]:
            title += f" (shape: {group_dim})"

        table = PrettyTable()
        table.title = title
        table.field_names = ["Index", "Name", "Shape"]
        # left-align the term names
        table.align["Name"] = "l"

        # one row per term: position in the group, name and shape
        term_names = self._group_obs_term_names[group_name]
        term_shapes = self._group_obs_term_dim[group_name]
        for row_index, (term_name, term_shape) in enumerate(zip(term_names, term_shapes)):
            table.add_row([row_index, term_name, tuple(term_shape)])

        parts.append(table.get_string())
        parts.append("\n")

    return "".join(parts)
def get_active_iterable_terms(self, env_idx: int) -> Sequence[tuple[str, Sequence[float]]]:
    """Returns the active terms as iterable sequence of tuples.

    The first element of the tuple is the name of the term (prefixed with the group name,
    i.e. ``"<group>-<term>"``) and the second element is the raw value(s) of the term.

    If no observations have been cached yet, :meth:`compute` is called first.

    Args:
        env_idx: The specific environment to pull the active terms from.

    Returns:
        The active terms.
    """
    terms = []

    # make sure there is a cached observation to read from
    if self._obs_buffer is None:
        self.compute()
    obs_buffer: dict[str, torch.Tensor | dict[str, torch.Tensor]] = self._obs_buffer

    for group_name in self._group_obs_dim:
        group_data = obs_buffer[group_name]

        # non-concatenated groups already store one tensor per term
        if not self.group_obs_concatenate[group_name]:
            for name, term in group_data.items():
                terms.append((group_name + "-" + name, term[env_idx].cpu().tolist()))
            continue

        # concatenated groups are joined along the last dimension, so each term occupies
        # a contiguous range of that dimension whose width is the term's last dimension
        env_data = group_data[env_idx]
        start = 0
        for name, shape in zip(
            self._group_obs_term_names[group_name],
            self._group_obs_term_dim[group_name],
        ):
            width = int(shape[-1])
            term = env_data[..., start : start + width]
            terms.append((group_name + "-" + name, term.cpu().tolist()))
            start += width

    return terms
"""
Properties.
"""

@property
def active_terms(self) -> dict[str, list[str]]:
    """Names of the active observation terms, per group.

    Maps each group name to the list of observation term names registered in that group.
    """
    return self._group_obs_term_names

@property
def group_obs_dim(self) -> dict[str, tuple[int, ...] | list[tuple[int, ...]]]:
    """Shape of the computed observations, per group.

    Maps each group name to the shape of its observation. For a group whose terms are
    concatenated, the value is a single tuple: the shape of the concatenated tensor.
    Otherwise, the value is a list of tuples with one shape per term in the group.
    """
    return self._group_obs_dim

@property
def group_obs_term_dim(self) -> dict[str, list[tuple[int, ...]]]:
    """Shape of each individual observation term, per group.

    Maps each group name to a list of tuples holding the shapes of the group's terms.
    The list follows the order of the terms in the group, which is the same order as
    the names reported by :attr:`active_terms`.
    """
    return self._group_obs_term_dim

@property
def group_obs_concatenate(self) -> dict[str, bool]:
    """Whether the terms of each group are concatenated.

    Maps each group name to a boolean. If True, the group's terms are concatenated along
    the last dimension into a single tensor. The values come from the
    :attr:`~ObservationGroupCfg.concatenate_terms` attribute of the group configuration.
    """
    return self._group_obs_concatenate

"""
Operations.
"""
def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, float]:
    """Reset the stateful parts of the observation pipeline.

    For every group, the ``reset`` method of each class-based term is called and the
    history buffer of each term that has one is reset. Afterwards, all class-based
    modifiers are reset.

    Args:
        env_ids: The environment indices forwarded to the individual ``reset`` calls.
            Defaults to None.

    Returns:
        An empty dictionary, since the manager has nothing to log.
    """
    for group_name, class_term_cfgs in self._group_obs_class_term_cfgs.items():
        # terms implemented as classes
        for class_term_cfg in class_term_cfgs:
            class_term_cfg.func.reset(env_ids=env_ids)

        # terms that keep a history
        for term_name in self._group_obs_term_names[group_name]:
            if term_name not in self._group_obs_term_history_buffer[group_name]:
                continue
            self._group_obs_term_history_buffer[group_name][term_name].reset(batch_ids=env_ids)

    # modifiers implemented as classes
    for class_modifier in self._group_obs_class_modifiers:
        class_modifier.reset(env_ids=env_ids)

    # nothing to log here
    return {}
def compute(self) -> dict[str, torch.Tensor | dict[str, torch.Tensor]]:
    """Compute the observations of every group.

    Each group handled by the manager is evaluated through :meth:`compute_group`; see that
    method for how the observations of a single group are processed. The result is also
    cached on the manager as the latest observations.

    Returns:
        A dictionary with keys as the group names and values as the computed observations.
        The observations are either concatenated into a single tensor or returned as a
        dictionary with keys corresponding to the term's name.
    """
    # evaluate the groups in their registration order
    group_observations = {
        group_name: self.compute_group(group_name) for group_name in self._group_obs_term_names
    }
    # Cache the observations.
    self._obs_buffer = group_observations
    return group_observations
def compute_group(self, group_name: str) -> torch.Tensor | dict[str, torch.Tensor]:
    """Computes the observations for a given group.

    The observations for a given group are computed by calling the registered functions for
    each term in the group, in the order of the terms in the group. The functions are
    expected to return a tensor with shape (num_envs, ...).

    The following steps are performed for each observation term:

    1. Compute observation term by calling the function
    2. Apply custom modifiers in the order specified in :attr:`ObservationTermCfg.modifiers`
    3. Apply corruption/noise model based on :attr:`ObservationTermCfg.noise`
    4. Apply clipping based on :attr:`ObservationTermCfg.clip`
    5. Apply scaling based on :attr:`ObservationTermCfg.scale`

    Noise is applied before clipping and scaling so that it affects the data as it exists in
    the real world. If the noise were applied after clipping or scaling, it could be
    artificially constrained or amplified, which might misrepresent how noise naturally
    occurs in the data.

    If a term has history enabled, the processed observation is appended to the term's
    history buffer and the buffer content is used as the term's observation instead.

    Args:
        group_name: The name of the group for which to compute the observations.

    Returns:
        Depending on the group's configuration, the tensors for individual observation terms
        are concatenated along the last dimension into a single tensor. Otherwise, they are
        returned as a dictionary with keys corresponding to the term's name.

    Raises:
        ValueError: If input ``group_name`` is not a valid group handled by the manager.
    """
    # reject unknown groups early
    if group_name not in self._group_obs_term_names:
        raise ValueError(
            f"Unable to find the group '{group_name}' in the observation manager."
            f" Available groups are: {list(self._group_obs_term_names)}"
        )

    term_names = self._group_obs_term_names[group_name]
    term_cfgs = self._group_obs_term_cfgs[group_name]
    # one slot per term, in the order of the group
    term_outputs = {name: None for name in term_names}

    for term_name, term_cfg in zip(term_names, term_cfgs):
        # step 1: raw value (cloned so the in-place operations below do not touch the source)
        value: torch.Tensor = term_cfg.func(self._env, **term_cfg.params).clone()

        # step 2: custom modifiers
        if term_cfg.modifiers is not None:
            for modifier in term_cfg.modifiers:
                value = modifier.func(value, **modifier.params)
        # step 3: noise
        if term_cfg.noise:
            value = term_cfg.noise.func(value, term_cfg.noise)
        # step 4: clipping (in-place)
        if term_cfg.clip:
            value = value.clip_(min=term_cfg.clip[0], max=term_cfg.clip[1])
        # step 5: scaling (in-place)
        if term_cfg.scale is not None:
            value = value.mul_(term_cfg.scale)

        # terms without history are stored as they are
        if not term_cfg.history_length > 0:
            term_outputs[term_name] = value
            continue

        # terms with history: push the new sample and expose the whole buffer
        history = self._group_obs_term_history_buffer[group_name][term_name]
        history.append(value)
        if term_cfg.flatten_history_dim:
            term_outputs[term_name] = history.buffer.reshape(self._env.num_envs, -1)
        else:
            term_outputs[term_name] = history.buffer

    # either join the terms along the last dimension or hand back the per-term dictionary
    if self._group_obs_concatenate[group_name]:
        return torch.cat(list(term_outputs.values()), dim=-1)
    return term_outputs
"""
Helper functions.
"""

def _prepare_terms(self):
    """Prepares a list of observation terms functions.

    Parses the group and term configurations and fills the per-group bookkeeping
    (term names, term shapes, term configurations, class-based terms, concatenation flag
    and history buffers) as well as the list of class-based modifiers.

    Each term function is called once to determine the shape of its output. The ``scale``
    setting and the modifiers are validated against that raw output shape, since both are
    applied to the term's output before it is stored in a history buffer.

    Raises:
        RuntimeError: If the simulation is not playing.
        TypeError: If a group is not an :class:`ObservationGroupCfg`, a term is not an
            :class:`ObservationTermCfg`, the scale has an unsupported type, or a modifier
            is not a valid :class:`modifiers.ModifierCfg` / :class:`modifiers.ModifierBase`.
        ValueError: If a scale tuple does not match the term's dimension or a modifier's
            parameters do not match its signature.
        AttributeError: If a modifier is not callable.
    """
    # create buffers to store information for each observation group
    # TODO: Make this more convenient by using data structures.
    self._group_obs_term_names: dict[str, list[str]] = dict()
    self._group_obs_term_dim: dict[str, list[tuple[int, ...]]] = dict()
    self._group_obs_term_cfgs: dict[str, list[ObservationTermCfg]] = dict()
    self._group_obs_class_term_cfgs: dict[str, list[ObservationTermCfg]] = dict()
    self._group_obs_concatenate: dict[str, bool] = dict()
    self._group_obs_term_history_buffer: dict[str, dict] = dict()
    # create a list to store modifiers that are classes
    # we store it as a separate list to only call reset on them and prevent unnecessary calls
    self._group_obs_class_modifiers: list[modifiers.ModifierBase] = list()

    # make sure the simulation is playing since we compute obs dims which needs asset quantities
    if not self._env.sim.is_playing():
        raise RuntimeError(
            "Simulation is not playing. Observation manager requires the simulation to be playing"
            " to compute observation dimensions. Please start the simulation before using the"
            " observation manager."
        )

    # check if config is dict already
    if isinstance(self.cfg, dict):
        group_cfg_items = self.cfg.items()
    else:
        group_cfg_items = self.cfg.__dict__.items()

    # iterate over all the groups
    for group_name, group_cfg in group_cfg_items:
        # check for non config
        if group_cfg is None:
            continue
        # check that the entry is an observation group
        if not isinstance(group_cfg, ObservationGroupCfg):
            raise TypeError(
                f"Observation group '{group_name}' is not of type 'ObservationGroupCfg'."
                f" Received: '{type(group_cfg)}'."
            )
        # initialize list for the group settings
        self._group_obs_term_names[group_name] = list()
        self._group_obs_term_dim[group_name] = list()
        self._group_obs_term_cfgs[group_name] = list()
        self._group_obs_class_term_cfgs[group_name] = list()
        group_entry_history_buffer: dict[str, CircularBuffer] = dict()
        # read common config for the group
        self._group_obs_concatenate[group_name] = group_cfg.concatenate_terms

        # check if config is dict already
        # note: a separate name is used so the outer iterable is not rebound inside its own loop
        if isinstance(group_cfg, dict):
            term_cfg_items = group_cfg.items()
        else:
            term_cfg_items = group_cfg.__dict__.items()

        # iterate over all the terms in each group
        for term_name, term_cfg in term_cfg_items:
            # skip non-obs settings
            if term_name in ["enable_corruption", "concatenate_terms", "history_length", "flatten_history_dim"]:
                continue
            # check for non config
            if term_cfg is None:
                continue
            if not isinstance(term_cfg, ObservationTermCfg):
                raise TypeError(
                    f"Configuration for the term '{term_name}' is not of type ObservationTermCfg."
                    f" Received: '{type(term_cfg)}'."
                )
            # resolve common terms in the config
            self._resolve_common_term_cfg(f"{group_name}/{term_name}", term_cfg, min_argc=1)

            # check noise settings
            if not group_cfg.enable_corruption:
                term_cfg.noise = None
            # check group history params and override terms
            if group_cfg.history_length is not None:
                term_cfg.history_length = group_cfg.history_length
                term_cfg.flatten_history_dim = group_cfg.flatten_history_dim
            # add term config to list
            self._group_obs_term_names[group_name].append(term_name)
            self._group_obs_term_cfgs[group_name].append(term_cfg)

            # call function the first time to fill up dimensions
            # `obs_dims` is the shape of the raw term output, including the leading batch dimension
            obs_dims = tuple(term_cfg.func(self._env, **term_cfg.params).shape)

            # create history buffers and calculate the shape of the term as exposed by the manager
            term_dims = obs_dims[1:]
            if term_cfg.history_length > 0:
                group_entry_history_buffer[term_name] = CircularBuffer(
                    max_len=term_cfg.history_length, batch_size=self._env.num_envs, device=self._env.device
                )
                # the history axis is inserted right after the batch dimension
                term_dims = (term_cfg.history_length, *term_dims)
                if term_cfg.flatten_history_dim:
                    # store a plain python int rather than a numpy scalar
                    term_dims = (int(np.prod(term_dims)),)
            self._group_obs_term_dim[group_name].append(term_dims)

            # if scale is set, check if single float or tuple
            # the scale is applied before the history buffer, so it is checked against the raw shape
            if term_cfg.scale is not None:
                if not isinstance(term_cfg.scale, (float, int, tuple)):
                    raise TypeError(
                        f"Scale for observation term '{term_name}' in group '{group_name}'"
                        f" is not of type float, int or tuple. Received: '{type(term_cfg.scale)}'."
                    )
                if isinstance(term_cfg.scale, tuple) and len(term_cfg.scale) != obs_dims[1]:
                    raise ValueError(
                        f"Scale for observation term '{term_name}' in group '{group_name}'"
                        f" does not match the dimensions of the observation. Expected: {obs_dims[1]}"
                        f" but received: {len(term_cfg.scale)}."
                    )
                # cast the scale into torch tensor
                term_cfg.scale = torch.tensor(term_cfg.scale, dtype=torch.float, device=self._env.device)

            # prepare modifiers for each observation
            if term_cfg.modifiers is not None:
                # initialize list of modifiers for term
                for mod_cfg in term_cfg.modifiers:
                    # check if class modifier and initialize with observation size when adding
                    if isinstance(mod_cfg, modifiers.ModifierCfg):
                        # to list of modifiers
                        if inspect.isclass(mod_cfg.func):
                            if not issubclass(mod_cfg.func, modifiers.ModifierBase):
                                raise TypeError(
                                    f"Modifier function '{mod_cfg.func}' for observation term '{term_name}'"
                                    f" is not a subclass of 'ModifierBase'. Received: '{type(mod_cfg.func)}'."
                                )
                            # modifiers run on the raw term output, hence the raw shape
                            mod_cfg.func = mod_cfg.func(cfg=mod_cfg, data_dim=obs_dims, device=self._env.device)

                            # add to list of class modifiers
                            self._group_obs_class_modifiers.append(mod_cfg.func)
                    else:
                        raise TypeError(
                            f"Modifier configuration '{mod_cfg}' of observation term '{term_name}' is not of"
                            f" required type ModifierCfg, Received: '{type(mod_cfg)}'"
                        )

                    # check if function is callable
                    if not callable(mod_cfg.func):
                        raise AttributeError(
                            f"Modifier '{mod_cfg}' of observation term '{term_name}' is not callable."
                            f" Received: {mod_cfg.func}"
                        )

                    # check if term's arguments are matched by params
                    term_params = list(mod_cfg.params.keys())
                    args = inspect.signature(mod_cfg.func).parameters
                    args_with_defaults = [arg for arg in args if args[arg].default is not inspect.Parameter.empty]
                    args_without_defaults = [arg for arg in args if args[arg].default is inspect.Parameter.empty]
                    args = args_without_defaults + args_with_defaults
                    # ignore the first argument, which receives the observation data
                    # Think: Check for cases when kwargs are set inside the function?
                    if len(args) > 1:
                        if set(args[1:]) != set(term_params + args_with_defaults):
                            raise ValueError(
                                f"Modifier '{mod_cfg}' of observation term '{term_name}' expects"
                                f" mandatory parameters: {args_without_defaults[1:]}"
                                f" and optional parameters: {args_with_defaults}, but received: {term_params}."
                            )

            # add term in a separate list if term is a class
            if isinstance(term_cfg.func, ManagerTermBase):
                self._group_obs_class_term_cfgs[group_name].append(term_cfg)
                # call reset (in-case above call to get obs dims changed the state)
                term_cfg.func.reset()

        # add history buffers for each group
        self._group_obs_term_history_buffer[group_name] = group_entry_history_buffer