Source code for omni.isaac.lab.managers.observation_manager

# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Observation manager for computing observation signals for a given world."""

from __future__ import annotations

import inspect
import numpy as np
import torch
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING

from omni.isaac.lab.utils import modifiers
from omni.isaac.lab.utils.buffers import CircularBuffer

from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import ObservationGroupCfg, ObservationTermCfg

if TYPE_CHECKING:
    from omni.isaac.lab.envs import ManagerBasedEnv


[docs]class ObservationManager(ManagerBase): """Manager for computing observation signals for a given world. Observations are organized into groups based on their intended usage. This allows having different observation groups for different types of learning such as asymmetric actor-critic and student-teacher training. Each group contains observation terms which contain information about the observation function to call, the noise corruption model to use, and the sensor to retrieve data from. Each observation group should inherit from the :class:`ObservationGroupCfg` class. Within each group, each observation term should instantiate the :class:`ObservationTermCfg` class. Based on the configuration, the observations in a group can be concatenated into a single tensor or returned as a dictionary with keys corresponding to the term's name. If the observations in a group are concatenated, the shape of the concatenated tensor is computed based on the shapes of the individual observation terms. This information is stored in the :attr:`group_obs_dim` dictionary with keys as the group names and values as the shape of the observation tensor. When the terms in a group are not concatenated, the attribute stores a list of shapes for each term in the group. .. note:: When the observation terms in a group do not have the same shape, the observation terms cannot be concatenated. In this case, please set the :attr:`ObservationGroupCfg.concatenate_terms` attribute in the group configuration to False. Observations can also have history. This means a running history is updated per sim step. History can be controlled per :class:`ObservationTermCfg` (See the :attr:`ObservationTermCfg.history_length` and :attr:`ObservationTermCfg.flatten_history_dim`). History can also be controlled via :class:`ObservationGroupCfg` where group configuration overwrites per term configuration if set. History follows an oldest to newest ordering. The observation manager can be used to compute observations for all the groups or for a specific group. The observations are computed by calling the registered functions for each term in the group. The functions are called in the order of the terms in the group. The functions are expected to return a tensor with shape (num_envs, ...). If a noise model or custom modifier is registered for a term, the function is called to corrupt the observation. The corruption function is expected to return a tensor with the same shape as the observation. The observations are clipped and scaled as per the configuration settings. """
[docs] def __init__(self, cfg: object, env: ManagerBasedEnv): """Initialize observation manager. Args: cfg: The configuration object or dictionary (``dict[str, ObservationGroupCfg]``). env: The environment instance. Raises: ValueError: If the configuration is None. RuntimeError: If the shapes of the observation terms in a group are not compatible for concatenation and the :attr:`~ObservationGroupCfg.concatenate_terms` attribute is set to True. """ # check that cfg is not None if cfg is None: raise ValueError("Observation manager configuration is None. Please provide a valid configuration.") # call the base class constructor (this will parse the terms config) super().__init__(cfg, env) # compute combined vector for obs group self._group_obs_dim: dict[str, tuple[int, ...] | list[tuple[int, ...]]] = dict() for group_name, group_term_dims in self._group_obs_term_dim.items(): # if terms are concatenated, compute the combined shape into a single tuple # otherwise, keep the list of shapes as is if self._group_obs_concatenate[group_name]: try: term_dims = [torch.tensor(dims, device="cpu") for dims in group_term_dims] self._group_obs_dim[group_name] = tuple(torch.sum(torch.stack(term_dims, dim=0), dim=0).tolist()) except RuntimeError: raise RuntimeError( f"Unable to concatenate observation terms in group '{group_name}'." f" The shapes of the terms are: {group_term_dims}." " Please ensure that the shapes are compatible for concatenation." " Otherwise, set 'concatenate_terms' to False in the group configuration." ) else: self._group_obs_dim[group_name] = group_term_dims # Stores the latest observations. self._obs_buffer: dict[str, torch.Tensor | dict[str, torch.Tensor]] | None = None
def __str__(self) -> str: """Returns: A string representation for the observation manager.""" msg = f"<ObservationManager> contains {len(self._group_obs_term_names)} groups.\n" # add info for each group for group_name, group_dim in self._group_obs_dim.items(): # create table for term information table = PrettyTable() table.title = f"Active Observation Terms in Group: '{group_name}'" if self._group_obs_concatenate[group_name]: table.title += f" (shape: {group_dim})" table.field_names = ["Index", "Name", "Shape"] # set alignment of table columns table.align["Name"] = "l" # add info for each term obs_terms = zip( self._group_obs_term_names[group_name], self._group_obs_term_dim[group_name], ) for index, (name, dims) in enumerate(obs_terms): # resolve inputs to simplify prints tab_dims = tuple(dims) # add row table.add_row([index, name, tab_dims]) # convert table to string msg += table.get_string() msg += "\n" return msg
[docs] def get_active_iterable_terms(self, env_idx: int) -> Sequence[tuple[str, Sequence[float]]]: """Returns the active terms as iterable sequence of tuples. The first element of the tuple is the name of the term and the second element is the raw value(s) of the term. Args: env_idx: The specific environment to pull the active terms from. Returns: The active terms. """ terms = [] if self._obs_buffer is None: self.compute() obs_buffer: dict[str, torch.Tensor | dict[str, torch.Tensor]] = self._obs_buffer for group_name, _ in self._group_obs_dim.items(): if not self.group_obs_concatenate[group_name]: for name, term in obs_buffer[group_name].items(): terms.append((group_name + "-" + name, term[env_idx].cpu().tolist())) continue idx = 0 # add info for each term data = obs_buffer[group_name] for name, shape in zip( self._group_obs_term_names[group_name], self._group_obs_term_dim[group_name], ): data_length = np.prod(shape) term = data[env_idx, idx : idx + data_length] terms.append((group_name + "-" + name, term.cpu().tolist())) idx += data_length return terms
""" Properties. """ @property def active_terms(self) -> dict[str, list[str]]: """Name of active observation terms in each group. The keys are the group names and the values are the list of observation term names in the group. """ return self._group_obs_term_names @property def group_obs_dim(self) -> dict[str, tuple[int, ...] | list[tuple[int, ...]]]: """Shape of computed observations in each group. The key is the group name and the value is the shape of the observation tensor. If the terms in the group are concatenated, the value is a single tuple representing the shape of the concatenated observation tensor. Otherwise, the value is a list of tuples, where each tuple represents the shape of the observation tensor for a term in the group. """ return self._group_obs_dim @property def group_obs_term_dim(self) -> dict[str, list[tuple[int, ...]]]: """Shape of individual observation terms in each group. The key is the group name and the value is a list of tuples representing the shape of the observation terms in the group. The order of the tuples corresponds to the order of the terms in the group. This matches the order of the terms in the :attr:`active_terms`. """ return self._group_obs_term_dim @property def group_obs_concatenate(self) -> dict[str, bool]: """Whether the observation terms are concatenated in each group or not. The key is the group name and the value is a boolean specifying whether the observation terms in the group are concatenated into a single tensor. If True, the observations are concatenated along the last dimension. The values are set based on the :attr:`~ObservationGroupCfg.concatenate_terms` attribute in the group configuration. """ return self._group_obs_concatenate """ Operations. """
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, float]: # call all terms that are classes for group_name, group_cfg in self._group_obs_class_term_cfgs.items(): for term_cfg in group_cfg: term_cfg.func.reset(env_ids=env_ids) # reset terms with history for term_name in self._group_obs_term_names[group_name]: if term_name in self._group_obs_term_history_buffer[group_name]: self._group_obs_term_history_buffer[group_name][term_name].reset(batch_ids=env_ids) # call all modifiers that are classes for mod in self._group_obs_class_modifiers: mod.reset(env_ids=env_ids) # nothing to log here return {}
[docs] def compute(self) -> dict[str, torch.Tensor | dict[str, torch.Tensor]]: """Compute the observations per group for all groups. The method computes the observations for all the groups handled by the observation manager. Please check the :meth:`compute_group` on the processing of observations per group. Returns: A dictionary with keys as the group names and values as the computed observations. The observations are either concatenated into a single tensor or returned as a dictionary with keys corresponding to the term's name. """ # create a buffer for storing obs from all the groups obs_buffer = dict() # iterate over all the terms in each group for group_name in self._group_obs_term_names: obs_buffer[group_name] = self.compute_group(group_name) # otherwise return a dict with observations of all groups # Cache the observations. self._obs_buffer = obs_buffer return obs_buffer
[docs] def compute_group(self, group_name: str) -> torch.Tensor | dict[str, torch.Tensor]: """Computes the observations for a given group. The observations for a given group are computed by calling the registered functions for each term in the group. The functions are called in the order of the terms in the group. The functions are expected to return a tensor with shape (num_envs, ...). The following steps are performed for each observation term: 1. Compute observation term by calling the function 2. Apply custom modifiers in the order specified in :attr:`ObservationTermCfg.modifiers` 3. Apply corruption/noise model based on :attr:`ObservationTermCfg.noise` 4. Apply clipping based on :attr:`ObservationTermCfg.clip` 5. Apply scaling based on :attr:`ObservationTermCfg.scale` We apply noise to the computed term first to maintain the integrity of how noise affects the data as it truly exists in the real world. If the noise is applied after clipping or scaling, the noise could be artificially constrained or amplified, which might misrepresent how noise naturally occurs in the data. Args: group_name: The name of the group for which to compute the observations. Defaults to None, in which case observations for all the groups are computed and returned. Returns: Depending on the group's configuration, the tensors for individual observation terms are concatenated along the last dimension into a single tensor. Otherwise, they are returned as a dictionary with keys corresponding to the term's name. Raises: ValueError: If input ``group_name`` is not a valid group handled by the manager. """ # check ig group name is valid if group_name not in self._group_obs_term_names: raise ValueError( f"Unable to find the group '{group_name}' in the observation manager." f" Available groups are: {list(self._group_obs_term_names.keys())}" ) # iterate over all the terms in each group group_term_names = self._group_obs_term_names[group_name] # buffer to store obs per group group_obs = dict.fromkeys(group_term_names, None) # read attributes for each term obs_terms = zip(group_term_names, self._group_obs_term_cfgs[group_name]) # evaluate terms: compute, add noise, clip, scale, custom modifiers for term_name, term_cfg in obs_terms: # compute term's value obs: torch.Tensor = term_cfg.func(self._env, **term_cfg.params).clone() # apply post-processing if term_cfg.modifiers is not None: for modifier in term_cfg.modifiers: obs = modifier.func(obs, **modifier.params) if term_cfg.noise: obs = term_cfg.noise.func(obs, term_cfg.noise) if term_cfg.clip: obs = obs.clip_(min=term_cfg.clip[0], max=term_cfg.clip[1]) if term_cfg.scale is not None: obs = obs.mul_(term_cfg.scale) # Update the history buffer if observation term has history enabled if term_cfg.history_length > 0: self._group_obs_term_history_buffer[group_name][term_name].append(obs) if term_cfg.flatten_history_dim: group_obs[term_name] = self._group_obs_term_history_buffer[group_name][term_name].buffer.reshape( self._env.num_envs, -1 ) else: group_obs[term_name] = self._group_obs_term_history_buffer[group_name][term_name].buffer else: group_obs[term_name] = obs # concatenate all observations in the group together if self._group_obs_concatenate[group_name]: return torch.cat(list(group_obs.values()), dim=-1) else: return group_obs
""" Helper functions. """ def _prepare_terms(self): """Prepares a list of observation terms functions.""" # create buffers to store information for each observation group # TODO: Make this more convenient by using data structures. self._group_obs_term_names: dict[str, list[str]] = dict() self._group_obs_term_dim: dict[str, list[tuple[int, ...]]] = dict() self._group_obs_term_cfgs: dict[str, list[ObservationTermCfg]] = dict() self._group_obs_class_term_cfgs: dict[str, list[ObservationTermCfg]] = dict() self._group_obs_concatenate: dict[str, bool] = dict() self._group_obs_term_history_buffer: dict[str, dict] = dict() # create a list to store modifiers that are classes # we store it as a separate list to only call reset on them and prevent unnecessary calls self._group_obs_class_modifiers: list[modifiers.ModifierBase] = list() # check if config is dict already if isinstance(self.cfg, dict): group_cfg_items = self.cfg.items() else: group_cfg_items = self.cfg.__dict__.items() # iterate over all the groups for group_name, group_cfg in group_cfg_items: # check for non config if group_cfg is None: continue # check if the term is a curriculum term if not isinstance(group_cfg, ObservationGroupCfg): raise TypeError( f"Observation group '{group_name}' is not of type 'ObservationGroupCfg'." f" Received: '{type(group_cfg)}'." ) # initialize list for the group settings self._group_obs_term_names[group_name] = list() self._group_obs_term_dim[group_name] = list() self._group_obs_term_cfgs[group_name] = list() self._group_obs_class_term_cfgs[group_name] = list() group_entry_history_buffer: dict[str, CircularBuffer] = dict() # read common config for the group self._group_obs_concatenate[group_name] = group_cfg.concatenate_terms # check if config is dict already if isinstance(group_cfg, dict): group_cfg_items = group_cfg.items() else: group_cfg_items = group_cfg.__dict__.items() # iterate over all the terms in each group for term_name, term_cfg in group_cfg_items: # skip non-obs settings if term_name in ["enable_corruption", "concatenate_terms", "history_length", "flatten_history_dim"]: continue # check for non config if term_cfg is None: continue if not isinstance(term_cfg, ObservationTermCfg): raise TypeError( f"Configuration for the term '{term_name}' is not of type ObservationTermCfg." f" Received: '{type(term_cfg)}'." ) # resolve common terms in the config self._resolve_common_term_cfg(f"{group_name}/{term_name}", term_cfg, min_argc=1) # check noise settings if not group_cfg.enable_corruption: term_cfg.noise = None # check group history params and override terms if group_cfg.history_length is not None: term_cfg.history_length = group_cfg.history_length term_cfg.flatten_history_dim = group_cfg.flatten_history_dim # add term config to list to list self._group_obs_term_names[group_name].append(term_name) self._group_obs_term_cfgs[group_name].append(term_cfg) # call function the first time to fill up dimensions obs_dims = tuple(term_cfg.func(self._env, **term_cfg.params).shape) # create history buffers and calculate history term dimensions if term_cfg.history_length > 0: group_entry_history_buffer[term_name] = CircularBuffer( max_len=term_cfg.history_length, batch_size=self._env.num_envs, device=self._env.device ) old_dims = list(obs_dims) old_dims.insert(1, term_cfg.history_length) obs_dims = tuple(old_dims) if term_cfg.flatten_history_dim: obs_dims = (obs_dims[0], np.prod(obs_dims[1:])) self._group_obs_term_dim[group_name].append(obs_dims[1:]) # if scale is set, check if single float or tuple if term_cfg.scale is not None: if not isinstance(term_cfg.scale, (float, int, tuple)): raise TypeError( f"Scale for observation term '{term_name}' in group '{group_name}'" f" is not of type float, int or tuple. Received: '{type(term_cfg.scale)}'." ) if isinstance(term_cfg.scale, tuple) and len(term_cfg.scale) != obs_dims[1]: raise ValueError( f"Scale for observation term '{term_name}' in group '{group_name}'" f" does not match the dimensions of the observation. Expected: {obs_dims[1]}" f" but received: {len(term_cfg.scale)}." ) # cast the scale into torch tensor term_cfg.scale = torch.tensor(term_cfg.scale, dtype=torch.float, device=self._env.device) # prepare modifiers for each observation if term_cfg.modifiers is not None: # initialize list of modifiers for term for mod_cfg in term_cfg.modifiers: # check if class modifier and initialize with observation size when adding if isinstance(mod_cfg, modifiers.ModifierCfg): # to list of modifiers if inspect.isclass(mod_cfg.func): if not issubclass(mod_cfg.func, modifiers.ModifierBase): raise TypeError( f"Modifier function '{mod_cfg.func}' for observation term '{term_name}'" f" is not a subclass of 'ModifierBase'. Received: '{type(mod_cfg.func)}'." ) mod_cfg.func = mod_cfg.func(cfg=mod_cfg, data_dim=obs_dims, device=self._env.device) # add to list of class modifiers self._group_obs_class_modifiers.append(mod_cfg.func) else: raise TypeError( f"Modifier configuration '{mod_cfg}' of observation term '{term_name}' is not of" f" required type ModifierCfg, Received: '{type(mod_cfg)}'" ) # check if function is callable if not callable(mod_cfg.func): raise AttributeError( f"Modifier '{mod_cfg}' of observation term '{term_name}' is not callable." f" Received: {mod_cfg.func}" ) # check if term's arguments are matched by params term_params = list(mod_cfg.params.keys()) args = inspect.signature(mod_cfg.func).parameters args_with_defaults = [arg for arg in args if args[arg].default is not inspect.Parameter.empty] args_without_defaults = [arg for arg in args if args[arg].default is inspect.Parameter.empty] args = args_without_defaults + args_with_defaults # ignore first two arguments for env and env_ids # Think: Check for cases when kwargs are set inside the function? if len(args) > 1: if set(args[1:]) != set(term_params + args_with_defaults): raise ValueError( f"Modifier '{mod_cfg}' of observation term '{term_name}' expects" f" mandatory parameters: {args_without_defaults[1:]}" f" and optional parameters: {args_with_defaults}, but received: {term_params}." ) # add term in a separate list if term is a class if isinstance(term_cfg.func, ManagerTermBase): self._group_obs_class_term_cfgs[group_name].append(term_cfg) # call reset (in-case above call to get obs dims changed the state) term_cfg.func.reset() # add history buffers for each group self._group_obs_term_history_buffer[group_name] = group_entry_history_buffer