Source code for isaaclab.managers.curriculum_manager
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clause"""Curriculum manager for updating environment quantities subject to a training curriculum."""from__future__importannotationsimporttorchfromcollections.abcimportSequencefromprettytableimportPrettyTablefromtypingimportTYPE_CHECKINGfrom.manager_baseimportManagerBase,ManagerTermBasefrom.manager_term_cfgimportCurriculumTermCfgifTYPE_CHECKING:fromisaaclab.envsimportManagerBasedRLEnv
[docs]classCurriculumManager(ManagerBase):"""Manager to implement and execute specific curricula. The curriculum manager updates various quantities of the environment subject to a training curriculum by calling a list of terms. These help stabilize learning by progressively making the learning tasks harder as the agent improves. The curriculum terms are parsed from a config class containing the manager's settings and each term's parameters. Each curriculum term should instantiate the :class:`CurriculumTermCfg` class. """_env:ManagerBasedRLEnv"""The environment instance."""
[docs]def__init__(self,cfg:object,env:ManagerBasedRLEnv):"""Initialize the manager. Args: cfg: The configuration object or dictionary (``dict[str, CurriculumTermCfg]``) env: An environment object. Raises: TypeError: If curriculum term is not of type :class:`CurriculumTermCfg`. ValueError: If curriculum term configuration does not satisfy its function signature. """# create buffers to parse and store termsself._term_names:list[str]=list()self._term_cfgs:list[CurriculumTermCfg]=list()self._class_term_cfgs:list[CurriculumTermCfg]=list()# call the base class constructor (this will parse the terms config)super().__init__(cfg,env)# prepare loggingself._curriculum_state=dict()forterm_nameinself._term_names:self._curriculum_state[term_name]=None
def__str__(self)->str:"""Returns: A string representation for curriculum manager."""msg=f"<CurriculumManager> contains {len(self._term_names)} active terms.\n"# create table for term informationtable=PrettyTable()table.title="Active Curriculum Terms"table.field_names=["Index","Name"]# set alignment of table columnstable.align["Name"]="l"# add info on each termforindex,nameinenumerate(self._term_names):table.add_row([index,name])# convert table to stringmsg+=table.get_string()msg+="\n"returnmsg""" Properties. """@propertydefactive_terms(self)->list[str]:"""Name of active curriculum terms."""returnself._term_names""" Operations. """
[docs]defreset(self,env_ids:Sequence[int]|None=None)->dict[str,float]:"""Returns the current state of individual curriculum terms. Note: This function does not use the environment indices :attr:`env_ids` and logs the state of all the terms. The argument is only present to maintain consistency with other classes. Returns: Dictionary of curriculum terms and their states. """extras={}forterm_name,term_stateinself._curriculum_state.items():ifterm_stateisnotNone:# deal with dictifisinstance(term_state,dict):# each key is a separate state to logforkey,valueinterm_state.items():ifisinstance(value,torch.Tensor):value=value.item()extras[f"Curriculum/{term_name}/{key}"]=valueelse:# log directly if not a dictifisinstance(term_state,torch.Tensor):term_state=term_state.item()extras[f"Curriculum/{term_name}"]=term_state# reset all the curriculum termsforterm_cfginself._class_term_cfgs:term_cfg.func.reset(env_ids=env_ids)# return logged informationreturnextras
[docs]defcompute(self,env_ids:Sequence[int]|None=None):"""Update the curriculum terms. This function calls each curriculum term managed by the class. Args: env_ids: The list of environment IDs to update. If None, all the environments are updated. Defaults to None. """# resolve environment indicesifenv_idsisNone:env_ids=slice(None)# iterate over all the curriculum termsforname,term_cfginzip(self._term_names,self._term_cfgs):state=term_cfg.func(self._env,env_ids,**term_cfg.params)self._curriculum_state[name]=state
[docs]defget_active_iterable_terms(self,env_idx:int)->Sequence[tuple[str,Sequence[float]]]:"""Returns the active terms as iterable sequence of tuples. The first element of the tuple is the name of the term and the second element is the raw value(s) of the term. Args: env_idx: The specific environment to pull the active terms from. Returns: The active terms. """terms=[]forterm_name,term_stateinself._curriculum_state.items():ifterm_stateisnotNone:# deal with dictdata=[]ifisinstance(term_state,dict):# each key is a separate state to logforkey,valueinterm_state.items():ifisinstance(value,torch.Tensor):value=value.item()terms[term_name].append(value)else:# log directly if not a dictifisinstance(term_state,torch.Tensor):term_state=term_state.item()data.append(term_state)terms.append((term_name,data))returnterms
""" Helper functions. """def_prepare_terms(self):# check if config is dict alreadyifisinstance(self.cfg,dict):cfg_items=self.cfg.items()else:cfg_items=self.cfg.__dict__.items()# iterate over all the termsforterm_name,term_cfgincfg_items:# check for non configifterm_cfgisNone:continue# check if the term is a valid term configifnotisinstance(term_cfg,CurriculumTermCfg):raiseTypeError(f"Configuration for the term '{term_name}' is not of type CurriculumTermCfg."f" Received: '{type(term_cfg)}'.")# resolve common parametersself._resolve_common_term_cfg(term_name,term_cfg,min_argc=2)# add name and config to listself._term_names.append(term_name)self._term_cfgs.append(term_cfg)# check if the term is a classifisinstance(term_cfg.func,ManagerTermBase):self._class_term_cfgs.append(term_cfg)