# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Wrapper to configure an environment instance to RL-Games vectorized environment.

The following example shows how to wrap an environment for RL-Games and register the environment construction
for the RL-Games :class:`Runner` class:

.. code-block:: python

    from rl_games.common import env_configurations, vecenv

    from isaaclab_rl.rl_games import RlGamesGpuEnv, RlGamesVecEnvWrapper

    # configuration parameters
    rl_device = "cuda:0"
    clip_obs = 10.0
    clip_actions = 1.0

    # wrap around environment for rl-games
    env = RlGamesVecEnvWrapper(env, rl_device, clip_obs, clip_actions)

    # register the environment to rl-games registry
    # note: in agents configuration: environment name must be "rlgpu"
    vecenv.register(
        "IsaacRlgWrapper", lambda config_name, num_actors, **kwargs: RlGamesGpuEnv(config_name, num_actors, **kwargs)
    )
    env_configurations.register("rlgpu", {"vecenv_type": "IsaacRlgWrapper", "env_creator": lambda **kwargs: env})

"""

# needed to import for allowing type-hinting: gym.spaces.Box | None
from __future__ import annotations

import gym.spaces  # needed for rl-games incompatibility: https://github.com/Denys88/rl_games/issues/261
import gymnasium
import torch

from rl_games.common import env_configurations
from rl_games.common.vecenv import IVecEnv

from isaaclab.envs import DirectRLEnv, ManagerBasedRLEnv, VecEnvObs

"""
Vectorized environment wrapper.
"""
class RlGamesVecEnvWrapper(IVecEnv):
    """Wraps around an Isaac Lab environment for RL-Games.

    This class wraps around the Isaac Lab environment. Since RL-Games works directly on
    GPU buffers, the wrapper handles moving of buffers from the simulation environment
    to the same device as the learning agent. Additionally, it performs clipping of
    observations and actions.

    For algorithms like asymmetric actor-critic, RL-Games expects a dictionary for
    observations. This dictionary contains "obs" and "states" which typically correspond
    to the actor and critic observations, respectively.

    To use asymmetric actor-critic, the environment observations from :class:`ManagerBasedRLEnv`
    or :class:`DirectRLEnv` must have the key or group name "critic". The observation group is used
    to set the :attr:`num_states` (int) and :attr:`state_space` (:obj:`gym.spaces.Box`). These are
    used by the learning agent in RL-Games to allocate buffers in the trajectory memory. Since this
    is optional for some environments, the wrapper checks if these attributes exist. If they don't,
    the wrapper defaults to zero privileged observations.

    .. caution::

        This class must be the last wrapper in the wrapper chain. This is because the wrapper does not follow
        the :class:`gym.Wrapper` interface. Any subsequent wrappers will need to be modified to work with this
        wrapper.

    Reference:
        https://github.com/Denys88/rl_games/blob/master/rl_games/common/ivecenv.py
        https://github.com/NVIDIA-Omniverse/IsaacGymEnvs
    """
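    # Illustrative sketch (not normative): for asymmetric actor-critic configurations,
    # the observations handed to the RL-Games agent by this wrapper take the form
    #
    #   {"obs": policy_obs, "states": critic_obs}
    #
    # where both tensors are clipped and moved to the RL device. Symmetric configurations
    # receive only the policy observation tensor. See :meth:`_process_obs` below.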
    def __init__(self, env: ManagerBasedRLEnv | DirectRLEnv, rl_device: str, clip_obs: float, clip_actions: float):
        """Initializes the wrapper instance.

        Args:
            env: The environment to wrap around.
            rl_device: The device on which agent computations are performed.
            clip_obs: The clipping value for observations.
            clip_actions: The clipping value for actions.

        Raises:
            ValueError: The environment is not inherited from :class:`ManagerBasedRLEnv` or :class:`DirectRLEnv`.
            NotImplementedError: If specified, the privileged observations (critic) are not of type
                :obj:`gym.spaces.Box`.
        """
        # check that input is valid
        if not isinstance(env.unwrapped, ManagerBasedRLEnv) and not isinstance(env.unwrapped, DirectRLEnv):
            raise ValueError(
                "The environment must be inherited from ManagerBasedRLEnv or DirectRLEnv. Environment type:"
                f" {type(env)}"
            )
        # initialize the wrapper
        self.env = env
        # store provided arguments
        self._rl_device = rl_device
        self._clip_obs = clip_obs
        self._clip_actions = clip_actions
        self._sim_device = env.unwrapped.device
        # information for privileged observations
        if self.state_space is None:
            self.rlg_num_states = 0
        else:
            self.rlg_num_states = self.state_space.shape[0]
    def __str__(self):
        """Returns the wrapper name and the :attr:`env` representation string."""
        return (
            f"<{type(self).__name__}{self.env}>"
            f"\n\tObservations clipping: {self._clip_obs}"
            f"\n\tActions clipping     : {self._clip_actions}"
            f"\n\tAgent device         : {self._rl_device}"
            f"\n\tAsymmetric-learning  : {self.rlg_num_states != 0}"
        )

    def __repr__(self):
        """Returns the string representation of the wrapper."""
        return str(self)

    """
    Properties -- Gym.Wrapper
    """

    @property
    def render_mode(self) -> str | None:
        """Returns the :attr:`Env` :attr:`render_mode`."""
        return self.env.render_mode

    @property
    def observation_space(self) -> gym.spaces.Box:
        """Returns the :attr:`Env` :attr:`observation_space`."""
        # note: rl-games only wants single observation space
        policy_obs_space = self.unwrapped.single_observation_space["policy"]
        if not isinstance(policy_obs_space, gymnasium.spaces.Box):
            raise NotImplementedError(
                f"The RL-Games wrapper does not currently support observation space: '{type(policy_obs_space)}'."
                f" If you need to support this, please modify the wrapper: {self.__class__.__name__},"
                " and if you are nice, please send a merge-request."
            )
        # note: maybe should check if we are a sub-set of the actual space. don't do it right now since
        # in ManagerBasedRLEnv we are setting action space as (-inf, inf).
        return gym.spaces.Box(-self._clip_obs, self._clip_obs, policy_obs_space.shape)

    @property
    def action_space(self) -> gym.Space:
        """Returns the :attr:`Env` :attr:`action_space`."""
        # note: rl-games only wants single action space
        action_space = self.unwrapped.single_action_space
        if not isinstance(action_space, gymnasium.spaces.Box):
            raise NotImplementedError(
                f"The RL-Games wrapper does not currently support action space: '{type(action_space)}'."
                f" If you need to support this, please modify the wrapper: {self.__class__.__name__},"
                " and if you are nice, please send a merge-request."
            )
        # return casted space in gym.spaces.Box (OpenAI Gym)
        # note: maybe should check if we are a sub-set of the actual space. don't do it right now since
        # in ManagerBasedRLEnv we are setting action space as (-inf, inf).
        return gym.spaces.Box(-self._clip_actions, self._clip_actions, action_space.shape)
    @classmethod
    def class_name(cls) -> str:
        """Returns the class name of the wrapper."""
        return cls.__name__
    @property
    def unwrapped(self) -> ManagerBasedRLEnv | DirectRLEnv:
        """Returns the base environment of the wrapper.

        This will be the bare :class:`gymnasium.Env` environment, underneath all layers of wrappers.
        """
        return self.env.unwrapped

    """
    Properties
    """

    @property
    def num_envs(self) -> int:
        """Returns the number of sub-environment instances."""
        return self.unwrapped.num_envs

    @property
    def device(self) -> str:
        """Returns the base environment simulation device."""
        return self.unwrapped.device

    @property
    def state_space(self) -> gym.spaces.Box | None:
        """Returns the privileged (critic) observation space of the :attr:`Env`, if it is defined."""
        # note: rl-games only wants single observation space
        critic_obs_space = self.unwrapped.single_observation_space.get("critic")
        # check if we even have a critic obs
        if critic_obs_space is None:
            return None
        elif not isinstance(critic_obs_space, gymnasium.spaces.Box):
            raise NotImplementedError(
                f"The RL-Games wrapper does not currently support state space: '{type(critic_obs_space)}'."
                f" If you need to support this, please modify the wrapper: {self.__class__.__name__},"
                " and if you are nice, please send a merge-request."
            )
        # return casted space in gym.spaces.Box (OpenAI Gym)
        # note: maybe should check if we are a sub-set of the actual space. don't do it right now since
        # in ManagerBasedRLEnv we are setting action space as (-inf, inf).
        return gym.spaces.Box(-self._clip_obs, self._clip_obs, critic_obs_space.shape)
    def get_number_of_agents(self) -> int:
        """Returns number of actors in the environment."""
        return getattr(self, "num_agents", 1)
    def get_env_info(self) -> dict:
        """Returns the Gym spaces for the environment."""
        return {
            "observation_space": self.observation_space,
            "action_space": self.action_space,
            "state_space": self.state_space,
        }
""" Operations - MDP """defseed(self,seed:int=-1)->int:# noqa: D102returnself.unwrapped.seed(seed)defreset(self):# noqa: D102obs_dict,_=self.env.reset()# process observations and statesreturnself._process_obs(obs_dict)defstep(self,actions):# noqa: D102# move actions to sim-deviceactions=actions.detach().clone().to(device=self._sim_device)# clip the actionsactions=torch.clamp(actions,-self._clip_actions,self._clip_actions)# perform environment stepobs_dict,rew,terminated,truncated,extras=self.env.step(actions)# move time out information to the extras dict# this is only needed for infinite horizon tasks# note: only useful when `value_bootstrap` is True in the agent configurationifnotself.unwrapped.cfg.is_finite_horizon:extras["time_outs"]=truncated.to(device=self._rl_device)# process observations and statesobs_and_states=self._process_obs(obs_dict)# move buffers to rl-device# note: we perform clone to prevent issues when rl-device and sim-device are the same.rew=rew.to(device=self._rl_device)dones=(terminated|truncated).to(device=self._rl_device)extras={k:v.to(device=self._rl_device,non_blocking=True)ifhasattr(v,"to")elsevfork,vinextras.items()}# remap extras from "log" to "episode"if"log"inextras:extras["episode"]=extras.pop("log")returnobs_and_states,rew,dones,extrasdefclose(self):# noqa: D102returnself.env.close()""" Helper functions """def_process_obs(self,obs_dict:VecEnvObs)->torch.Tensor|dict[str,torch.Tensor]:"""Processing of the observations and states from the environment. Note: States typically refers to privileged observations for the critic function. It is typically used in asymmetric actor-critic algorithms. Args: obs_dict: The current observations from environment. Returns: If environment provides states, then a dictionary containing the observations and states is returned. Otherwise just the observations tensor is returned. """# process policy obsobs=obs_dict["policy"]# clip the observationsobs=torch.clamp(obs,-self._clip_obs,self._clip_obs)# move the buffer to rl-deviceobs=obs.to(device=self._rl_device).clone()# check if asymmetric actor-critic or notifself.rlg_num_states>0:# acquire states from the environment if it existstry:states=obs_dict["critic"]exceptAttributeError:raiseNotImplementedError("Environment does not define key 'critic' for privileged observations.")# clip the statesstates=torch.clamp(states,-self._clip_obs,self._clip_obs)# move buffers to rl-devicestates=states.to(self._rl_device).clone()# convert to dictionaryreturn{"obs":obs,"states":states}else:returnobs
"""Environment Handler."""
class RlGamesGpuEnv(IVecEnv):
    """Thin wrapper to create an instance of the environment to fit the RL-Games runner."""

    # TODO: Adding this for now but do we really need this?
    def __init__(self, config_name: str, num_actors: int, **kwargs):
        """Initialize the environment.

        Args:
            config_name: The name of the environment configuration.
            num_actors: The number of actors in the environment. This is not used in this wrapper.
        """
        self.env: RlGamesVecEnvWrapper = env_configurations.configurations[config_name]["env_creator"](**kwargs)
    def get_number_of_agents(self) -> int:
        """Get number of agents in the environment.

        Returns:
            The number of agents in the environment.
        """
        return self.env.get_number_of_agents()
    def get_env_info(self) -> dict:
        """Get the Gym spaces for the environment.

        Returns:
            The Gym spaces for the environment.
        """
        return self.env.get_env_info()
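# Hedged end-to-end sketch (not part of this module's API): once the environment has been
# registered under the name "rlgpu" as shown in the module docstring, an RL-Games
# :class:`Runner` can consume it. ``agent_cfg`` below is a placeholder for the agent
# configuration dictionary loaded from your task's YAML; its exact keys depend on the task
# and the rl_games version.
#
#   from rl_games.torch_runner import Runner
#
#   runner = Runner()
#   runner.load(agent_cfg)   # the environment name in agent_cfg must be "rlgpu"
#   runner.reset()
#   runner.run({"train": True, "play": False})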