Source code for isaaclab.envs.manager_based_rl_env
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

# needed to import for allowing type-hinting: np.ndarray | None
from __future__ import annotations

import gymnasium as gym
import math
import numpy as np
import torch
from collections.abc import Sequence
from typing import Any, ClassVar

from isaacsim.core.version import get_version

from isaaclab.managers import CommandManager, CurriculumManager, RewardManager, TerminationManager
from isaaclab.ui.widgets import ManagerLiveVisualizer

from .common import VecEnvStepReturn
from .manager_based_env import ManagerBasedEnv
from .manager_based_rl_env_cfg import ManagerBasedRLEnvCfg
class ManagerBasedRLEnv(ManagerBasedEnv, gym.Env):
    """The superclass for the manager-based workflow reinforcement learning-based environments.

    This class inherits from :class:`ManagerBasedEnv` and implements the core functionality for
    reinforcement learning-based environments. It is designed to be used with any RL library. The class
    is designed to be used with vectorized environments, i.e., the environment is expected to be run
    in parallel with multiple sub-environments. The number of sub-environments is specified using
    ``num_envs``.

    Each observation from the environment is a batch of observations for each sub-environment. The
    method :meth:`step` is also expected to receive a batch of actions for each sub-environment.

    While the environment itself is implemented as a vectorized environment, we do not
    inherit from :class:`gym.vector.VectorEnv`. This is mainly because the class adds
    various methods (for wait and asynchronous updates) which are not required.
    Additionally, each RL library typically has its own definition for a vectorized
    environment. Thus, to reduce complexity, we directly use the :class:`gym.Env` over
    here and leave it up to library-defined wrappers to take care of wrapping this
    environment for their agents.

    Note:
        For vectorized environments, it is recommended to **only** call the :meth:`reset`
        method once before the first call to :meth:`step`, i.e. after the environment is created.
        After that, the :meth:`step` function handles the reset of terminated sub-environments.
        This is because the simulator does not support resetting individual sub-environments
        in a vectorized environment.

    """

    is_vector_env: ClassVar[bool] = True
    """Whether the environment is a vectorized environment."""
    metadata: ClassVar[dict[str, Any]] = {
        "render_modes": [None, "human", "rgb_array"],
        "isaac_sim_version": get_version(),
    }
    """Metadata for the environment."""

    cfg: ManagerBasedRLEnvCfg
    """Configuration for the environment."""
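A minimal usage sketch of the vectorized workflow described above (illustrative only, not part of this module): it assumes the Isaac Sim application has already been launched and that ``env_cfg`` is a task-specific :class:`ManagerBasedRLEnvCfg` defined elsewhere; the ``"policy"`` observation group name is also an assumption taken from common task configurations.

import torch

from isaaclab.envs import ManagerBasedRLEnv

# env_cfg: a ManagerBasedRLEnvCfg for a concrete task (assumed to be defined elsewhere)
env = ManagerBasedRLEnv(cfg=env_cfg, render_mode=None)

# reset once after creation; step() handles resets of terminated sub-environments afterwards
obs, extras = env.reset()
# observations are batched per sub-environment, e.g. obs["policy"].shape == (env.num_envs, obs_dim)
print(env.num_envs, {group: term.shape for group, term in obs.items()})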
    def __init__(self, cfg: ManagerBasedRLEnvCfg, render_mode: str | None = None, **kwargs):
        """Initialize the environment.

        Args:
            cfg: The configuration for the environment.
            render_mode: The render mode for the environment. Defaults to None, which
                is similar to ``"human"``.
        """
        # -- counter for curriculum
        self.common_step_counter = 0

        # initialize the base class to setup the scene.
        super().__init__(cfg=cfg)
        # store the render mode
        self.render_mode = render_mode

        # initialize data and constants
        # -- init buffers
        self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)

        # -- set the framerate of the gym video recorder wrapper so that the playback speed
        #    of the produced video matches the simulation
        self.metadata["render_fps"] = 1 / self.step_dt

        print("[INFO]: Completed setting up the environment...")
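For reference, ``render_fps`` follows directly from the environment step size: ``step_dt`` is the physics time-step multiplied by the decimation (as defined in :class:`ManagerBasedEnv`). A small worked example with illustrative numbers, not taken from any particular task configuration:

physics_dt = 0.005                 # cfg.sim.dt (illustrative)
decimation = 4                     # cfg.decimation (illustrative)
step_dt = physics_dt * decimation  # 0.02 s per environment step
render_fps = 1 / step_dt           # 50 frames per second for the video recorder wrapper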
""" Properties. """@propertydefmax_episode_length_s(self)->float:"""Maximum episode length in seconds."""returnself.cfg.episode_length_s@propertydefmax_episode_length(self)->int:"""Maximum episode length in environment steps."""returnmath.ceil(self.max_episode_length_s/self.step_dt)""" Operations - Setup. """
    def load_managers(self):
        # note: this order is important since observation manager needs to know the command and action managers
        # and the reward manager needs to know the termination manager
        # -- command manager
        self.command_manager: CommandManager = CommandManager(self.cfg.commands, self)
        print("[INFO] Command Manager: ", self.command_manager)

        # call the parent class to load the managers for observations and actions.
        super().load_managers()

        # prepare the managers
        # -- termination manager
        self.termination_manager = TerminationManager(self.cfg.terminations, self)
        print("[INFO] Termination Manager: ", self.termination_manager)
        # -- reward manager
        self.reward_manager = RewardManager(self.cfg.rewards, self)
        print("[INFO] Reward Manager: ", self.reward_manager)
        # -- curriculum manager
        self.curriculum_manager = CurriculumManager(self.cfg.curriculum, self)
        print("[INFO] Curriculum Manager: ", self.curriculum_manager)

        # setup the action and observation spaces for Gym
        self._configure_gym_env_spaces()

        # perform events at the start of the simulation
        if "startup" in self.event_manager.available_modes:
            self.event_manager.apply(mode="startup")
    def setup_manager_visualizers(self):
        """Creates live visualizers for manager terms."""
        self.manager_visualizers = {
            "action_manager": ManagerLiveVisualizer(manager=self.action_manager),
            "observation_manager": ManagerLiveVisualizer(manager=self.observation_manager),
            "command_manager": ManagerLiveVisualizer(manager=self.command_manager),
            "termination_manager": ManagerLiveVisualizer(manager=self.termination_manager),
            "reward_manager": ManagerLiveVisualizer(manager=self.reward_manager),
            "curriculum_manager": ManagerLiveVisualizer(manager=self.curriculum_manager),
        }
""" Operations - MDP """
    def step(self, action: torch.Tensor) -> VecEnvStepReturn:
        """Execute one time-step of the environment's dynamics and reset terminated environments.

        Unlike the :meth:`ManagerBasedEnv.step` method, this function performs the following operations:

        1. Process the actions.
        2. Perform physics stepping.
        3. Perform rendering if gui is enabled.
        4. Update the environment counters and compute the rewards and terminations.
        5. Reset the environments that terminated.
        6. Compute the observations.
        7. Return the observations, rewards, resets and extras.

        Args:
            action: The actions to apply on the environment. Shape is (num_envs, action_dim).

        Returns:
            A tuple containing the observations, rewards, resets (terminated and truncated) and extras.
        """
        # process actions
        self.action_manager.process_action(action.to(self.device))

        self.recorder_manager.record_pre_step()

        # check if we need to do rendering within the physics loop
        # note: checked here once to avoid multiple checks within the loop
        is_rendering = self.sim.has_gui() or self.sim.has_rtx_sensors()

        # perform physics stepping
        for _ in range(self.cfg.decimation):
            self._sim_step_counter += 1
            # set actions into buffers
            self.action_manager.apply_action()
            # set actions into simulator
            self.scene.write_data_to_sim()
            # simulate
            self.sim.step(render=False)
            # render between steps only if the GUI or an RTX sensor needs it
            # note: we assume the render interval to be the shortest accepted rendering interval.
            #    If a camera needs rendering at a faster frequency, this will lead to unexpected behavior.
            if self._sim_step_counter % self.cfg.sim.render_interval == 0 and is_rendering:
                self.sim.render()
            # update buffers at sim dt
            self.scene.update(dt=self.physics_dt)

        # post-step:
        # -- update env counters (used for curriculum generation)
        self.episode_length_buf += 1  # step in current episode (per env)
        self.common_step_counter += 1  # total step (common for all envs)
        # -- check terminations
        self.reset_buf = self.termination_manager.compute()
        self.reset_terminated = self.termination_manager.terminated
        self.reset_time_outs = self.termination_manager.time_outs
        # -- reward computation
        self.reward_buf = self.reward_manager.compute(dt=self.step_dt)

        if len(self.recorder_manager.active_terms) > 0:
            # update observations for recording if needed
            self.obs_buf = self.observation_manager.compute()
            self.recorder_manager.record_post_step()

        # -- reset envs that terminated/timed-out and log the episode information
        reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(reset_env_ids) > 0:
            # trigger recorder terms for pre-reset calls
            self.recorder_manager.record_pre_reset(reset_env_ids)

            self._reset_idx(reset_env_ids)
            # update articulation kinematics
            self.scene.write_data_to_sim()
            self.sim.forward()

            # if sensors are added to the scene, make sure we render to reflect changes in reset
            if self.sim.has_rtx_sensors() and self.cfg.rerender_on_reset:
                self.sim.render()

            # trigger recorder terms for post-reset calls
            self.recorder_manager.record_post_reset(reset_env_ids)

        # -- update command
        self.command_manager.compute(dt=self.step_dt)
        # -- step interval events
        if "interval" in self.event_manager.available_modes:
            self.event_manager.apply(mode="interval", dt=self.step_dt)
        # -- compute observations
        # note: done after reset to get the correct observations for reset envs
        self.obs_buf = self.observation_manager.compute()

        # return observations, rewards, resets and extras
        return self.obs_buf, self.reward_buf, self.reset_terminated, self.reset_time_outs, self.extras
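A rollout sketch against :meth:`step` (illustrative, assuming ``env`` is an already constructed ``ManagerBasedRLEnv`` as in the earlier example): actions are batched over ``num_envs``, and the returned ``terminated``/``truncated`` tensors are per-environment flags, so no manual reset is needed inside the loop.

import torch

obs, extras = env.reset()
for _ in range(100):
    # zero actions only as a stand-in for a policy output; shape is (num_envs, action_dim)
    actions = torch.zeros(env.num_envs, env.single_action_space.shape[0], device=env.device)
    obs, rewards, terminated, truncated, extras = env.step(actions)
    # boolean tensors of shape (num_envs,); step() has already reset the environments that finished
    done = terminated | truncated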
    def render(self, recompute: bool = False) -> np.ndarray | None:
        """Run rendering without stepping through the physics.

        By convention, if mode is:

        - **human**: Render to the current display and return nothing. Usually for human consumption.
        - **rgb_array**: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an
          x-by-y pixel image, suitable for turning into a video.

        Args:
            recompute: Whether to force a render even if the simulator has already rendered the scene.
                Defaults to False.

        Returns:
            The rendered image as a numpy array if mode is "rgb_array". Otherwise, returns None.

        Raises:
            RuntimeError: If mode is set to "rgb_array" and simulation render mode does not support it.
                In this case, the simulation render mode must be set to ``RenderMode.PARTIAL_RENDERING``
                or ``RenderMode.FULL_RENDERING``.
            NotImplementedError: If an unsupported rendering mode is specified.
        """
        # run a rendering step of the simulator
        # if we have rtx sensors, we do not need to render again since the scene is already rendered
        # during the physics stepping
        if not self.sim.has_rtx_sensors() and not recompute:
            self.sim.render()
        # decide the rendering mode
        if self.render_mode == "human" or self.render_mode is None:
            return None
        elif self.render_mode == "rgb_array":
            # check if any rendering could have happened
            if self.sim.render_mode.value < self.sim.RenderMode.PARTIAL_RENDERING.value:
                raise RuntimeError(
                    f"Cannot render '{self.render_mode}' when the simulation render mode is"
                    f" '{self.sim.render_mode.name}'. Please set the simulation render mode to:"
                    f" '{self.sim.RenderMode.PARTIAL_RENDERING.name}' or '{self.sim.RenderMode.FULL_RENDERING.name}'."
                    " If running headless, make sure --enable_cameras is set."
                )
            # create the annotator if it does not exist
            if not hasattr(self, "_rgb_annotator"):
                import omni.replicator.core as rep

                # create render product
                self._render_product = rep.create.render_product(
                    self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
                )
                # create rgb annotator -- used to read data from the render product
                self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
                self._rgb_annotator.attach([self._render_product])
            # obtain the rgb data
            rgb_data = self._rgb_annotator.get_data()
            # convert to numpy array
            rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
            # return the rgb data
            # note: initially the renderer is warming up and returns empty data
            if rgb_data.size == 0:
                return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
            else:
                return rgb_data[:, :, :3]
        else:
            raise NotImplementedError(
                f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
            )
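An illustrative sketch of collecting frames with ``render_mode="rgb_array"``. It assumes the app was launched with cameras enabled (e.g. ``--enable_cameras``) and that ``env_cfg`` is a task configuration defined elsewhere, as in the earlier examples.

import torch

from isaaclab.envs import ManagerBasedRLEnv

env = ManagerBasedRLEnv(cfg=env_cfg, render_mode="rgb_array")
obs, extras = env.reset()

frames = []
for _ in range(50):
    actions = torch.zeros(env.num_envs, env.single_action_space.shape[0], device=env.device)
    env.step(actions)
    # render() returns a (height, width, 3) uint8 array from the viewer camera
    frames.append(env.render())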
    def close(self):
        if not self._is_closed:
            # destructor is order-sensitive
            del self.command_manager
            del self.reward_manager
            del self.termination_manager
            del self.curriculum_manager
            # call the parent class to close the environment
            super().close()
""" Helper functions. """def_configure_gym_env_spaces(self):"""Configure the action and observation spaces for the Gym environment."""# observation space (unbounded since we don't impose any limits)self.single_observation_space=gym.spaces.Dict()forgroup_name,group_term_namesinself.observation_manager.active_terms.items():# extract quantities about the grouphas_concatenated_obs=self.observation_manager.group_obs_concatenate[group_name]group_dim=self.observation_manager.group_obs_dim[group_name]# check if group is concatenated or not# if not concatenated, then we need to add each term separately as a dictionaryifhas_concatenated_obs:self.single_observation_space[group_name]=gym.spaces.Box(low=-np.inf,high=np.inf,shape=group_dim)else:self.single_observation_space[group_name]=gym.spaces.Dict({term_name:gym.spaces.Box(low=-np.inf,high=np.inf,shape=term_dim)forterm_name,term_diminzip(group_term_names,group_dim)})# action space (unbounded since we don't impose any limits)action_dim=sum(self.action_manager.action_term_dim)self.single_action_space=gym.spaces.Box(low=-np.inf,high=np.inf,shape=(action_dim,))# batch the spaces for vectorized environmentsself.observation_space=gym.vector.utils.batch_space(self.single_observation_space,self.num_envs)self.action_space=gym.vector.utils.batch_space(self.single_action_space,self.num_envs)def_reset_idx(self,env_ids:Sequence[int]):"""Reset environments based on specified indices. Args: env_ids: List of environment ids which must be reset """# update the curriculum for environments that need a resetself.curriculum_manager.compute(env_ids=env_ids)# reset the internal buffers of the scene elementsself.scene.reset(env_ids)# apply events such as randomizations for environments that need a resetif"reset"inself.event_manager.available_modes:env_step_count=self._sim_step_counter//self.cfg.decimationself.event_manager.apply(mode="reset",env_ids=env_ids,global_env_step_count=env_step_count)# iterate over all managers and reset them# this returns a dictionary of information which is stored in the extras# note: This is order-sensitive! Certain things need be reset before others.self.extras["log"]=dict()# -- observation managerinfo=self.observation_manager.reset(env_ids)self.extras["log"].update(info)# -- action managerinfo=self.action_manager.reset(env_ids)self.extras["log"].update(info)# -- rewards managerinfo=self.reward_manager.reset(env_ids)self.extras["log"].update(info)# -- curriculum managerinfo=self.curriculum_manager.reset(env_ids)self.extras["log"].update(info)# -- command managerinfo=self.command_manager.reset(env_ids)self.extras["log"].update(info)# -- event managerinfo=self.event_manager.reset(env_ids)self.extras["log"].update(info)# -- termination managerinfo=self.termination_manager.reset(env_ids)self.extras["log"].update(info)# -- recorder managerinfo=self.recorder_manager.reset(env_ids)self.extras["log"].update(info)# reset the episode length bufferself.episode_length_buf[env_ids]=0