# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import gymnasium as gym
import torch

from rsl_rl.env import VecEnv

from isaaclab.envs import DirectRLEnv, ManagerBasedRLEnv


class RslRlVecEnvWrapper(VecEnv):
    """Wraps around an Isaac Lab environment for the RSL-RL library.

    To use asymmetric actor-critic, the environment instance must have the attribute :attr:`num_privileged_obs` (int).
    This is used by the learning agent to allocate buffers in the trajectory memory. Additionally, the returned
    observations should have the key "critic" which corresponds to the privileged observations. Since this is
    optional for some environments, the wrapper checks if these attributes exist. If they don't, the wrapper
    defaults to zero as the number of privileged observations.

    .. caution::

        This class must be the last wrapper in the wrapper chain. This is because the wrapper does not follow
        the :class:`gym.Wrapper` interface. Any subsequent wrappers will need to be modified to work with this
        wrapper.

    Reference:
        https://github.com/leggedrobotics/rsl_rl/blob/master/rsl_rl/env/vec_env.py
    """
    def __init__(self, env: ManagerBasedRLEnv | DirectRLEnv, clip_actions: float | None = None):
        """Initializes the wrapper.

        Note:
            The wrapper calls :meth:`reset` at the start since the RSL-RL runner does not call reset.

        Args:
            env: The environment to wrap around.
            clip_actions: The clipping value for actions. If ``None``, then no clipping is done.

        Raises:
            ValueError: When the environment is not an instance of :class:`ManagerBasedRLEnv` or :class:`DirectRLEnv`.
        """
        # check that input is valid
        if not isinstance(env.unwrapped, ManagerBasedRLEnv) and not isinstance(env.unwrapped, DirectRLEnv):
            raise ValueError(
                "The environment must be inherited from ManagerBasedRLEnv or DirectRLEnv. Environment type:"
                f" {type(env)}"
            )
        # initialize the wrapper
        self.env = env
        self.clip_actions = clip_actions

        # store information required by wrapper
        self.num_envs = self.unwrapped.num_envs
        self.device = self.unwrapped.device
        self.max_episode_length = self.unwrapped.max_episode_length

        # obtain dimensions of the environment
        if hasattr(self.unwrapped, "action_manager"):
            self.num_actions = self.unwrapped.action_manager.total_action_dim
        else:
            self.num_actions = gym.spaces.flatdim(self.unwrapped.single_action_space)
        if hasattr(self.unwrapped, "observation_manager"):
            self.num_obs = self.unwrapped.observation_manager.group_obs_dim["policy"][0]
        else:
            self.num_obs = gym.spaces.flatdim(self.unwrapped.single_observation_space["policy"])
        # -- privileged observations
        if (
            hasattr(self.unwrapped, "observation_manager")
            and "critic" in self.unwrapped.observation_manager.group_obs_dim
        ):
            self.num_privileged_obs = self.unwrapped.observation_manager.group_obs_dim["critic"][0]
        elif hasattr(self.unwrapped, "num_states") and "critic" in self.unwrapped.single_observation_space:
            self.num_privileged_obs = gym.spaces.flatdim(self.unwrapped.single_observation_space["critic"])
        else:
            self.num_privileged_obs = 0
        # modify the action space to the clip range
        self._modify_action_space()

        # reset at the start since the RSL-RL runner does not call reset
        self.env.reset()
    def __str__(self):
        """Returns the wrapper name and the :attr:`env` representation string."""
        return f"<{type(self).__name__}{self.env}>"

    def __repr__(self):
        """Returns the string representation of the wrapper."""
        return str(self)

    """
    Properties -- Gym.Wrapper
    """

    @property
    def cfg(self) -> object:
        """Returns the configuration class instance of the environment."""
        return self.unwrapped.cfg

    @property
    def render_mode(self) -> str | None:
        """Returns the :attr:`Env` :attr:`render_mode`."""
        return self.env.render_mode

    @property
    def observation_space(self) -> gym.Space:
        """Returns the :attr:`Env` :attr:`observation_space`."""
        return self.env.observation_space

    @property
    def action_space(self) -> gym.Space:
        """Returns the :attr:`Env` :attr:`action_space`."""
        return self.env.action_space
    @classmethod
    def class_name(cls) -> str:
        """Returns the class name of the wrapper."""
        return cls.__name__
    @property
    def unwrapped(self) -> ManagerBasedRLEnv | DirectRLEnv:
        """Returns the base environment of the wrapper.

        This will be the bare :class:`gymnasium.Env` environment, underneath all layers of wrappers.
        """
        return self.env.unwrapped

    """
    Properties
    """
    def get_observations(self) -> tuple[torch.Tensor, dict]:
        """Returns the current observations of the environment."""
        if hasattr(self.unwrapped, "observation_manager"):
            obs_dict = self.unwrapped.observation_manager.compute()
        else:
            obs_dict = self.unwrapped._get_observations()
        return obs_dict["policy"], {"observations": obs_dict}
    @property
    def episode_length_buf(self) -> torch.Tensor:
        """The episode length buffer."""
        return self.unwrapped.episode_length_buf

    @episode_length_buf.setter
    def episode_length_buf(self, value: torch.Tensor):
        """Set the episode length buffer.

        Note:
            This is needed to perform random initialization of episode lengths in RSL-RL.
        """
        self.unwrapped.episode_length_buf = value

    """
    Operations - MDP
    """

    def seed(self, seed: int = -1) -> int:  # noqa: D102
        return self.unwrapped.seed(seed)

    def reset(self) -> tuple[torch.Tensor, dict]:  # noqa: D102
        # reset the environment
        obs_dict, _ = self.env.reset()
        # return observations
        return obs_dict["policy"], {"observations": obs_dict}

    def step(self, actions: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, dict]:
        # clip actions
        if self.clip_actions is not None:
            actions = torch.clamp(actions, -self.clip_actions, self.clip_actions)
        # record step information
        obs_dict, rew, terminated, truncated, extras = self.env.step(actions)
        # compute dones for compatibility with RSL-RL
        dones = (terminated | truncated).to(dtype=torch.long)
        # move extra observations to the extras dict
        obs = obs_dict["policy"]
        extras["observations"] = obs_dict
        # move time out information to the extras dict
        # this is only needed for infinite horizon tasks
        if not self.unwrapped.cfg.is_finite_horizon:
            extras["time_outs"] = truncated
        # return the step information
        return obs, rew, dones, extras

    def close(self):  # noqa: D102
        return self.env.close()

    """
    Helper functions
    """

    def _modify_action_space(self):
        """Modifies the action space to the clip range."""
        if self.clip_actions is None:
            return
        # modify the action space to the clip range
        # note: this is only possible for the box action space.
        # we need to change it in the future for other action spaces.
        self.env.unwrapped.single_action_space = gym.spaces.Box(
            low=-self.clip_actions, high=self.clip_actions, shape=(self.num_actions,)
        )
        self.env.unwrapped.action_space = gym.vector.utils.batch_space(
            self.env.unwrapped.single_action_space, self.num_envs
        )
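

# -----------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the wrapper implementation).
# It shows how the wrapper sits at the end of the wrapper chain and how its
# RSL-RL-style API (``get_observations``, ``step``, ``close``) is consumed.
# ``create_isaaclab_env()`` is a hypothetical placeholder: in practice the
# environment comes from ``gymnasium.make()`` after the simulation app has been
# launched and the Isaac Lab tasks have been registered.
#
#   env = create_isaaclab_env()                       # hypothetical factory
#   env = RslRlVecEnvWrapper(env, clip_actions=1.0)   # must be the last wrapper
#   obs, extras = env.get_observations()
#   for _ in range(100):
#       # random actions within the clip range, just to exercise the API
#       actions = 2.0 * torch.rand(env.num_envs, env.num_actions, device=env.device) - 1.0
#       obs, rew, dones, extras = env.step(actions)
#   env.close()
# -----------------------------------------------------------------------------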