# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import builtins
import gymnasium as gym
import inspect
import math
import numpy as np
import torch
import weakref
from abc import abstractmethod
from collections.abc import Sequence
from dataclasses import MISSING
from typing import Any, ClassVar

import isaacsim.core.utils.torch as torch_utils
import omni.kit.app
import omni.log
from isaacsim.core.simulation_manager import SimulationManager
from isaacsim.core.version import get_version

from isaaclab.managers import EventManager
from isaaclab.scene import InteractiveScene
from isaaclab.sim import SimulationContext
from isaaclab.utils.noise import NoiseModel
from isaaclab.utils.timer import Timer

from .common import VecEnvObs, VecEnvStepReturn
from .direct_rl_env_cfg import DirectRLEnvCfg
from .ui import ViewportCameraController
from .utils.spaces import sample_space, spec_to_gym_space
class DirectRLEnv(gym.Env):
    """The superclass for the direct workflow to design environments.

    This class implements the core functionality for reinforcement learning (RL)
    environments. It is designed to be used with any RL library. The class is designed
    to be used with vectorized environments, i.e., the environment is expected to be run
    in parallel with multiple sub-environments.

    While the environment itself is implemented as a vectorized environment, we do not
    inherit from :class:`gym.vector.VectorEnv`. This is mainly because the class adds
    various methods (for wait and asynchronous updates) which are not required.
    Additionally, each RL library typically has its own definition for a vectorized
    environment. Thus, to reduce complexity, we directly use the :class:`gym.Env` over
    here and leave it up to library-defined wrappers to take care of wrapping this
    environment for their agents.

    Note:
        For vectorized environments, it is recommended to **only** call the :meth:`reset`
        method once before the first call to :meth:`step`, i.e. after the environment is created.
        After that, the :meth:`step` function handles the reset of terminated sub-environments.
        This is because the simulator does not support resetting individual sub-environments
        in a vectorized environment.

    """

    is_vector_env: ClassVar[bool] = True
    """Whether the environment is a vectorized environment."""

    metadata: ClassVar[dict[str, Any]] = {
        "render_modes": [None, "human", "rgb_array"],
        "isaac_sim_version": get_version(),
    }
    """Metadata for the environment."""
    def __init__(self, cfg: DirectRLEnvCfg, render_mode: str | None = None, **kwargs):
        """Initialize the environment.

        Args:
            cfg: The configuration object for the environment.
            render_mode: The render mode for the environment. Defaults to None, which
                is similar to ``"human"``.

        Raises:
            RuntimeError: If a simulation context already exists. The environment must always create one
                since it configures the simulation context and controls the simulation.
        """
        # check that the config is valid
        cfg.validate()
        # store inputs to class
        self.cfg = cfg
        # store the render mode
        self.render_mode = render_mode
        # initialize internal variables
        self._is_closed = False

        # set the seed for the environment
        if self.cfg.seed is not None:
            self.cfg.seed = self.seed(self.cfg.seed)
        else:
            omni.log.warn("Seed not set for the environment. The environment creation may not be deterministic.")

        # create a simulation context to control the simulator
        if SimulationContext.instance() is None:
            self.sim: SimulationContext = SimulationContext(self.cfg.sim)
        else:
            raise RuntimeError("Simulation context already exists. Cannot create a new one.")

        # make sure torch is running on the correct device
        if "cuda" in self.device:
            torch.cuda.set_device(self.device)

        # print useful information
        print("[INFO]: Base environment:")
        print(f"\tEnvironment device    : {self.device}")
        print(f"\tEnvironment seed      : {self.cfg.seed}")
        print(f"\tPhysics step-size     : {self.physics_dt}")
        print(f"\tRendering step-size   : {self.physics_dt * self.cfg.sim.render_interval}")
        print(f"\tEnvironment step-size : {self.step_dt}")

        if self.cfg.sim.render_interval < self.cfg.decimation:
            msg = (
                f"The render interval ({self.cfg.sim.render_interval}) is smaller than the decimation "
                f"({self.cfg.decimation}). Multiple render calls will happen for each environment step. "
                "If this is not intended, set the render interval to be equal to the decimation."
            )
            omni.log.warn(msg)

        # generate scene
        with Timer("[INFO]: Time taken for scene creation", "scene_creation"):
            self.scene = InteractiveScene(self.cfg.scene)
            self._setup_scene()
        print("[INFO]: Scene manager: ", self.scene)

        # set up camera viewport controller
        # viewport is not available in other rendering modes so the function will throw a warning
        # FIXME: This needs to be fixed in the future when we unify the UI functionalities even for
        #   non-rendering modes.
        if self.sim.render_mode >= self.sim.RenderMode.PARTIAL_RENDERING:
            self.viewport_camera_controller = ViewportCameraController(self, self.cfg.viewer)
        else:
            self.viewport_camera_controller = None

        # create event manager
        # note: this is needed here (rather than after simulation play) to allow USD-related randomization events
        #   that must happen before the simulation starts. Example: randomizing mesh scale
        if self.cfg.events:
            self.event_manager = EventManager(self.cfg.events, self)

            # apply USD-related randomization events
            if "prestartup" in self.event_manager.available_modes:
                self.event_manager.apply(mode="prestartup")

        # play the simulator to activate physics handles
        # note: this activates the physics simulation view that exposes TensorAPIs
        # note: when started in extension mode, first call sim.reset_async() and then initialize the managers
        if builtins.ISAAC_LAUNCHED_FROM_TERMINAL is False:
            print("[INFO]: Starting the simulation. This may take a few seconds. Please wait...")
            with Timer("[INFO]: Time taken for simulation start", "simulation_start"):
                self.sim.reset()
            # update scene to pre-populate data buffers for assets and sensors.
            # this is needed for the observation manager to get valid tensors for initialization.
            # this shouldn't cause an issue since later on, users do a reset over all the environments
            # so the lazy buffers would be reset.
            self.scene.update(dt=self.physics_dt)

        # check if debug visualization has been implemented by the environment
        source_code = inspect.getsource(self._set_debug_vis_impl)
        self.has_debug_vis_implementation = "NotImplementedError" not in source_code
        self._debug_vis_handle = None

        # extend UI elements
        # we need to do this here after all the managers are initialized
        # this is because they dictate the sensors and commands right now
        if self.sim.has_gui() and self.cfg.ui_window_class_type is not None:
            self._window = self.cfg.ui_window_class_type(self, window_name="IsaacLab")
        else:
            # if no window, then we don't need to store the window
            self._window = None

        # allocate dictionary to store metrics
        self.extras = {}

        # initialize data and constants
        # -- counter for simulation steps
        self._sim_step_counter = 0
        # -- counter for curriculum
        self.common_step_counter = 0
        # -- init buffers
        self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
        self.reset_terminated = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
        self.reset_time_outs = torch.zeros_like(self.reset_terminated)
        self.reset_buf = torch.zeros(self.num_envs, dtype=torch.bool, device=self.sim.device)

        # setup the action and observation spaces for Gym
        self._configure_gym_env_spaces()

        # setup noise cfg for adding action and observation noise
        if self.cfg.action_noise_model:
            self._action_noise_model: NoiseModel = self.cfg.action_noise_model.class_type(
                self.cfg.action_noise_model, num_envs=self.num_envs, device=self.device
            )
        if self.cfg.observation_noise_model:
            self._observation_noise_model: NoiseModel = self.cfg.observation_noise_model.class_type(
                self.cfg.observation_noise_model, num_envs=self.num_envs, device=self.device
            )

        # perform events at the start of the simulation
        if self.cfg.events:
            # we print it here to make the logging consistent
            print("[INFO] Event Manager: ", self.event_manager)

            if "startup" in self.event_manager.available_modes:
                self.event_manager.apply(mode="startup")

        # set the framerate of the gym video recorder wrapper so that the playback speed of the produced
        # video matches the simulation
        self.metadata["render_fps"] = 1 / self.step_dt

        # print the environment information
        print("[INFO]: Completed setting up the environment...")
    def __del__(self):
        """Cleanup for the environment."""
        self.close()

    """
    Properties.
    """

    @property
    def num_envs(self) -> int:
        """The number of instances of the environment that are running."""
        return self.scene.num_envs

    @property
    def physics_dt(self) -> float:
        """The physics time-step (in s).

        This is the lowest time-decimation at which the simulation is happening.
        """
        return self.cfg.sim.dt

    @property
    def step_dt(self) -> float:
        """The environment stepping time-step (in s).

        This is the time-step at which the environment steps forward.
        """
        return self.cfg.sim.dt * self.cfg.decimation

    @property
    def device(self):
        """The device on which the environment is running."""
        return self.sim.device

    @property
    def max_episode_length_s(self) -> float:
        """Maximum episode length in seconds."""
        return self.cfg.episode_length_s

    @property
    def max_episode_length(self):
        """The maximum episode length in steps adjusted from s."""
        return math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))

    """
    Operations.
    """
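    # A worked sketch of the timing relations defined by the properties above (illustrative
    # numbers, not defaults): with ``sim.dt = 1/120`` s and ``decimation = 4``, each call to
    # :meth:`step` advances the simulation by 4 physics steps, so
    #   step_dt = (1/120) * 4 = 1/30 s.
    # With ``episode_length_s = 5.0``, episodes are truncated after
    #   max_episode_length = ceil(5.0 / (1/30)) = 150 environment steps.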
    def reset(self, seed: int | None = None, options: dict[str, Any] | None = None) -> tuple[VecEnvObs, dict]:
        """Resets all the environments and returns observations.

        This function calls the :meth:`_reset_idx` function to reset all the environments.
        However, certain operations, such as procedural terrain generation, that happened
        during initialization are not repeated.

        Args:
            seed: The seed to use for randomization. Defaults to None, in which case the seed is not set.
            options: Additional information to specify how the environment is reset. Defaults to None.

                Note:
                    This argument is used for compatibility with Gymnasium environment definition.

        Returns:
            A tuple containing the observations and extras.
        """
        # set the seed
        if seed is not None:
            self.seed(seed)

        # reset state of scene
        indices = torch.arange(self.num_envs, dtype=torch.int64, device=self.device)
        self._reset_idx(indices)

        # update articulation kinematics
        self.scene.write_data_to_sim()
        self.sim.forward()

        # if sensors are added to the scene, make sure we render to reflect changes in reset
        if self.sim.has_rtx_sensors() and self.cfg.rerender_on_reset:
            self.sim.render()

        if self.cfg.wait_for_textures and self.sim.has_rtx_sensors():
            while SimulationManager.assets_loading():
                self.sim.render()

        # return observations
        return self._get_observations(), self.extras
    def step(self, action: torch.Tensor) -> VecEnvStepReturn:
        """Execute one time-step of the environment's dynamics.

        The environment steps forward at a fixed time-step, while the physics simulation is decimated at a lower
        time-step. This is to ensure that the simulation is stable. These two time-steps can be configured
        independently using the :attr:`DirectRLEnvCfg.decimation` (number of simulation steps per environment step)
        and the :attr:`DirectRLEnvCfg.sim.physics_dt` (physics time-step). Based on these parameters, the environment
        time-step is computed as the product of the two.

        This function performs the following steps:

        1. Pre-process the actions before stepping through the physics.
        2. Apply the actions to the simulator and step through the physics in a decimated manner.
        3. Compute the reward and done signals.
        4. Reset environments that have terminated or reached the maximum episode length.
        5. Apply interval events if they are enabled.
        6. Compute observations.

        Args:
            action: The actions to apply on the environment. Shape is (num_envs, action_dim).

        Returns:
            A tuple containing the observations, rewards, resets (terminated and truncated) and extras.
        """
        action = action.to(self.device)
        # add action noise
        if self.cfg.action_noise_model:
            action = self._action_noise_model.apply(action)

        # process actions
        self._pre_physics_step(action)

        # check if we need to do rendering within the physics loop
        # note: checked here once to avoid multiple checks within the loop
        is_rendering = self.sim.has_gui() or self.sim.has_rtx_sensors()

        # perform physics stepping
        for _ in range(self.cfg.decimation):
            self._sim_step_counter += 1
            # set actions into buffers
            self._apply_action()
            # set actions into simulator
            self.scene.write_data_to_sim()
            # simulate
            self.sim.step(render=False)
            # render between steps only if the GUI or an RTX sensor needs it
            # note: we assume the render interval to be the shortest accepted rendering interval.
            #   If a camera needs rendering at a faster frequency, this will lead to unexpected behavior.
            if self._sim_step_counter % self.cfg.sim.render_interval == 0 and is_rendering:
                self.sim.render()
            # update buffers at sim dt
            self.scene.update(dt=self.physics_dt)

        # post-step:
        # -- update env counters (used for curriculum generation)
        self.episode_length_buf += 1  # step in current episode (per env)
        self.common_step_counter += 1  # total step (common for all envs)

        self.reset_terminated[:], self.reset_time_outs[:] = self._get_dones()
        self.reset_buf = self.reset_terminated | self.reset_time_outs
        self.reward_buf = self._get_rewards()

        # -- reset envs that terminated/timed-out and log the episode information
        reset_env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(reset_env_ids) > 0:
            self._reset_idx(reset_env_ids)
            # update articulation kinematics
            self.scene.write_data_to_sim()
            self.sim.forward()
            # if sensors are added to the scene, make sure we render to reflect changes in reset
            if self.sim.has_rtx_sensors() and self.cfg.rerender_on_reset:
                self.sim.render()

        # post-step: step interval event
        if self.cfg.events:
            if "interval" in self.event_manager.available_modes:
                self.event_manager.apply(mode="interval", dt=self.step_dt)

        # update observations
        self.obs_buf = self._get_observations()

        # add observation noise
        # note: we apply no noise to the state space (since it is used for critic networks)
        if self.cfg.observation_noise_model:
            self.obs_buf["policy"] = self._observation_noise_model.apply(self.obs_buf["policy"])

        # return observations, rewards, resets and extras
        return self.obs_buf, self.reward_buf, self.reset_terminated, self.reset_time_outs, self.extras
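    # Hedged usage sketch (assumed names ``env``, ``policy`` and ``horizon``; not part of this
    # module): a typical vectorized rollout resets once after creation and then relies on
    # :meth:`step` to reset terminated sub-environments:
    #
    #   obs, extras = env.reset(seed=42)
    #   for _ in range(horizon):
    #       actions = policy(obs["policy"])  # shape: (num_envs, action_dim)
    #       obs, rew, terminated, truncated, extras = env.step(actions)
    #       # no manual reset here: terminated/timed-out sub-environments were reset inside step()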
    @staticmethod
    def seed(seed: int = -1) -> int:
        """Set the seed for the environment.

        Args:
            seed: The seed for random generator. Defaults to -1.

        Returns:
            The seed used for random generator.
        """
        # set seed for replicator
        try:
            import omni.replicator.core as rep

            rep.set_global_seed(seed)
        except ModuleNotFoundError:
            pass
        # set seed for torch and other libraries
        return torch_utils.set_seed(seed)
    def render(self, recompute: bool = False) -> np.ndarray | None:
        """Run rendering without stepping through the physics.

        By convention, if mode is:

        - **human**: Render to the current display and return nothing. Usually for human consumption.
        - **rgb_array**: Return a numpy.ndarray with shape (x, y, 3), representing RGB values for an
          x-by-y pixel image, suitable for turning into a video.

        Args:
            recompute: Whether to force a render even if the simulator has already rendered the scene.
                Defaults to False.

        Returns:
            The rendered image as a numpy array if mode is "rgb_array". Otherwise, returns None.

        Raises:
            RuntimeError: If mode is set to "rgb_array" and simulation render mode does not support it.
                In this case, the simulation render mode must be set to ``RenderMode.PARTIAL_RENDERING``
                or ``RenderMode.FULL_RENDERING``.
            NotImplementedError: If an unsupported rendering mode is specified.
        """
        # run a rendering step of the simulator
        # if we have rtx sensors, we do not need to render again since rendering already happens at every step
        if not self.sim.has_rtx_sensors() and not recompute:
            self.sim.render()
        # decide the rendering mode
        if self.render_mode == "human" or self.render_mode is None:
            return None
        elif self.render_mode == "rgb_array":
            # check if any render could have happened
            if self.sim.render_mode.value < self.sim.RenderMode.PARTIAL_RENDERING.value:
                raise RuntimeError(
                    f"Cannot render '{self.render_mode}' when the simulation render mode is"
                    f" '{self.sim.render_mode.name}'. Please set the simulation render mode to:"
                    f" '{self.sim.RenderMode.PARTIAL_RENDERING.name}' or '{self.sim.RenderMode.FULL_RENDERING.name}'."
                    " If running headless, make sure --enable_cameras is set."
                )
            # create the annotator if it does not exist
            if not hasattr(self, "_rgb_annotator"):
                import omni.replicator.core as rep

                # create render product
                self._render_product = rep.create.render_product(
                    self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
                )
                # create rgb annotator -- used to read data from the render product
                self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
                self._rgb_annotator.attach([self._render_product])
            # obtain the rgb data
            rgb_data = self._rgb_annotator.get_data()
            # convert to numpy array
            rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
            # return the rgb data
            # note: initially the renderer is warming up and returns empty data
            if rgb_data.size == 0:
                return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
            else:
                return rgb_data[:, :, :3]
        else:
            raise NotImplementedError(
                f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
            )
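    # Hedged sketch: with ``render_mode="rgb_array"`` and rendering enabled (e.g. launched with
    # ``--enable_cameras``), frames can be grabbed for video logging. Following the shape
    # convention returned above, the array is (height, width, 3) with dtype uint8:
    #
    #   frame = env.render()
    #   assert frame.shape == (env.cfg.viewer.resolution[1], env.cfg.viewer.resolution[0], 3)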
    def close(self):
        """Cleanup for the environment."""
        if not self._is_closed:
            # close entities related to the environment
            # note: this is order-sensitive to avoid any dangling references
            if self.cfg.events:
                del self.event_manager
            del self.scene
            if self.viewport_camera_controller is not None:
                del self.viewport_camera_controller
            # clear callbacks and instance
            self.sim.clear_all_callbacks()
            self.sim.clear_instance()
            # destroy the window
            if self._window is not None:
                self._window = None
            # update closing status
            self._is_closed = True
""" Operations - Debug Visualization. """
    def set_debug_vis(self, debug_vis: bool) -> bool:
        """Toggles the environment debug visualization.

        Args:
            debug_vis: Whether to visualize the environment debug visualization.

        Returns:
            Whether the debug visualization was successfully set. False if the environment
            does not support debug visualization.
        """
        # check if debug visualization is supported
        if not self.has_debug_vis_implementation:
            return False
        # toggle debug visualization objects
        self._set_debug_vis_impl(debug_vis)
        # toggle debug visualization handles
        if debug_vis:
            # create a subscriber for the post update event if it doesn't exist
            if self._debug_vis_handle is None:
                app_interface = omni.kit.app.get_app_interface()
                self._debug_vis_handle = app_interface.get_post_update_event_stream().create_subscription_to_pop(
                    lambda event, obj=weakref.proxy(self): obj._debug_vis_callback(event)
                )
        else:
            # remove the subscriber if it exists
            if self._debug_vis_handle is not None:
                self._debug_vis_handle.unsubscribe()
                self._debug_vis_handle = None
        # return success
        return True
""" Helper functions. """def_configure_gym_env_spaces(self):"""Configure the action and observation spaces for the Gym environment."""# show deprecation message and overwrite configurationifself.cfg.num_actionsisnotNone:omni.log.warn("DirectRLEnvCfg.num_actions is deprecated. Use DirectRLEnvCfg.action_space instead.")ifisinstance(self.cfg.action_space,type(MISSING)):self.cfg.action_space=self.cfg.num_actionsifself.cfg.num_observationsisnotNone:omni.log.warn("DirectRLEnvCfg.num_observations is deprecated. Use DirectRLEnvCfg.observation_space instead.")ifisinstance(self.cfg.observation_space,type(MISSING)):self.cfg.observation_space=self.cfg.num_observationsifself.cfg.num_statesisnotNone:omni.log.warn("DirectRLEnvCfg.num_states is deprecated. Use DirectRLEnvCfg.state_space instead.")ifisinstance(self.cfg.state_space,type(MISSING)):self.cfg.state_space=self.cfg.num_states# set up spacesself.single_observation_space=gym.spaces.Dict()self.single_observation_space["policy"]=spec_to_gym_space(self.cfg.observation_space)self.single_action_space=spec_to_gym_space(self.cfg.action_space)# batch the spaces for vectorized environmentsself.observation_space=gym.vector.utils.batch_space(self.single_observation_space["policy"],self.num_envs)self.action_space=gym.vector.utils.batch_space(self.single_action_space,self.num_envs)# optional state space for asymmetric actor-critic architecturesself.state_space=Noneifself.cfg.state_space:self.single_observation_space["critic"]=spec_to_gym_space(self.cfg.state_space)self.state_space=gym.vector.utils.batch_space(self.single_observation_space["critic"],self.num_envs)# instantiate actions (needed for tasks for which the observations computation is dependent on the actions)self.actions=sample_space(self.single_action_space,self.sim.device,batch_size=self.num_envs,fill_value=0)def_reset_idx(self,env_ids:Sequence[int]):"""Reset environments based on specified indices. Args: env_ids: List of environment ids which must be reset """self.scene.reset(env_ids)# apply events such as randomization for environments that need a resetifself.cfg.events:if"reset"inself.event_manager.available_modes:env_step_count=self._sim_step_counter//self.cfg.decimationself.event_manager.apply(mode="reset",env_ids=env_ids,global_env_step_count=env_step_count)# reset noise modelsifself.cfg.action_noise_model:self._action_noise_model.reset(env_ids)ifself.cfg.observation_noise_model:self._observation_noise_model.reset(env_ids)# reset the episode length bufferself.episode_length_buf[env_ids]=0""" Implementation-specific functions. """def_setup_scene(self):"""Setup the scene for the environment. This function is responsible for creating the scene objects and setting up the scene for the environment. The scene creation can happen through :class:`isaaclab.scene.InteractiveSceneCfg` or through directly creating the scene objects and registering them with the scene manager. We leave the implementation of this function to the derived classes. If the environment does not require any explicit scene setup, the function can be left empty. """pass@abstractmethoddef_pre_physics_step(self,actions:torch.Tensor):"""Pre-process actions before stepping through the physics. This function is responsible for pre-processing the actions before stepping through the physics. It is called before the physics stepping (which is decimated). Args: actions: The actions to apply on the environment. Shape is (num_envs, action_dim). 
"""raiseNotImplementedError(f"Please implement the '_pre_physics_step' method for {self.__class__.__name__}.")@abstractmethoddef_apply_action(self):"""Apply actions to the simulator. This function is responsible for applying the actions to the simulator. It is called at each physics time-step. """raiseNotImplementedError(f"Please implement the '_apply_action' method for {self.__class__.__name__}.")@abstractmethoddef_get_observations(self)->VecEnvObs:"""Compute and return the observations for the environment. Returns: The observations for the environment. """raiseNotImplementedError(f"Please implement the '_get_observations' method for {self.__class__.__name__}.")def_get_states(self)->VecEnvObs|None:"""Compute and return the states for the environment. The state-space is used for asymmetric actor-critic architectures. It is configured using the :attr:`DirectRLEnvCfg.state_space` parameter. Returns: The states for the environment. If the environment does not have a state-space, the function returns a None. """returnNone# noqa: R501@abstractmethoddef_get_rewards(self)->torch.Tensor:"""Compute and return the rewards for the environment. Returns: The rewards for the environment. Shape is (num_envs,). """raiseNotImplementedError(f"Please implement the '_get_rewards' method for {self.__class__.__name__}.")@abstractmethoddef_get_dones(self)->tuple[torch.Tensor,torch.Tensor]:"""Compute and return the done flags for the environment. Returns: A tuple containing the done flags for termination and time-out. Shape of individual tensors is (num_envs,). """raiseNotImplementedError(f"Please implement the '_get_dones' method for {self.__class__.__name__}.")def_set_debug_vis_impl(self,debug_vis:bool):"""Set debug visualization into visualization objects. This function is responsible for creating the visualization objects if they don't exist and input ``debug_vis`` is True. If the visualization objects exist, the function should set their visibility into the stage. """raiseNotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")