# Copyright (c) 2022-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clauseimportbuiltinsimporttorchfromcollections.abcimportSequencefromtypingimportAnyimportisaacsim.core.utils.torchastorch_utilsimportomni.logfromisaacsim.core.simulation_managerimportSimulationManagerfromisaaclab.managersimportActionManager,EventManager,ObservationManager,RecorderManagerfromisaaclab.sceneimportInteractiveScenefromisaaclab.simimportSimulationContextfromisaaclab.ui.widgetsimportManagerLiveVisualizerfromisaaclab.utils.timerimportTimerfrom.commonimportVecEnvObsfrom.manager_based_env_cfgimportManagerBasedEnvCfgfrom.uiimportViewportCameraController
[docs]classManagerBasedEnv:"""The base environment encapsulates the simulation scene and the environment managers for the manager-based workflow. While a simulation scene or world comprises of different components such as the robots, objects, and sensors (cameras, lidars, etc.), the environment is a higher level abstraction that provides an interface for interacting with the simulation. The environment is comprised of the following components: * **Scene**: The scene manager that creates and manages the virtual world in which the robot operates. This includes defining the robot, static and dynamic objects, sensors, etc. * **Observation Manager**: The observation manager that generates observations from the current simulation state and the data gathered from the sensors. These observations may include privileged information that is not available to the robot in the real world. Additionally, user-defined terms can be added to process the observations and generate custom observations. For example, using a network to embed high-dimensional observations into a lower-dimensional space. * **Action Manager**: The action manager that processes the raw actions sent to the environment and converts them to low-level commands that are sent to the simulation. It can be configured to accept raw actions at different levels of abstraction. For example, in case of a robotic arm, the raw actions can be joint torques, joint positions, or end-effector poses. Similarly for a mobile base, it can be the joint torques, or the desired velocity of the floating base. * **Event Manager**: The event manager orchestrates operations triggered based on simulation events. This includes resetting the scene to a default state, applying random pushes to the robot at different intervals of time, or randomizing properties such as mass and friction coefficients. This is useful for training and evaluating the robot in a variety of scenarios. * **Recorder Manager**: The recorder manager that handles recording data produced during different steps in the simulation. This includes recording in the beginning and end of a reset and a step. The recorded data is distinguished per episode, per environment and can be exported through a dataset file handler to a file. The environment provides a unified interface for interacting with the simulation. However, it does not include task-specific quantities such as the reward function, or the termination conditions. These quantities are often specific to defining Markov Decision Processes (MDPs) while the base environment is agnostic to the MDP definition. The environment steps forward in time at a fixed time-step. The physics simulation is decimated at a lower time-step. This is to ensure that the simulation is stable. These two time-steps can be configured independently using the :attr:`ManagerBasedEnvCfg.decimation` (number of simulation steps per environment step) and the :attr:`ManagerBasedEnvCfg.sim.dt` (physics time-step) parameters. Based on these parameters, the environment time-step is computed as the product of the two. The two time-steps can be obtained by querying the :attr:`physics_dt` and the :attr:`step_dt` properties respectively. """
[docs]def__init__(self,cfg:ManagerBasedEnvCfg):"""Initialize the environment. Args: cfg: The configuration object for the environment. Raises: RuntimeError: If a simulation context already exists. The environment must always create one since it configures the simulation context and controls the simulation. """# check that the config is validcfg.validate()# store inputs to classself.cfg=cfg# initialize internal variablesself._is_closed=False# set the seed for the environmentifself.cfg.seedisnotNone:self.cfg.seed=self.seed(self.cfg.seed)else:omni.log.warn("Seed not set for the environment. The environment creation may not be deterministic.")# create a simulation context to control the simulatorifSimulationContext.instance()isNone:# the type-annotation is required to avoid a type-checking error# since it gets confused with Isaac Sim's SimulationContext classself.sim:SimulationContext=SimulationContext(self.cfg.sim)else:# simulation context should only be created before the environment# when in extension modeifnotbuiltins.ISAAC_LAUNCHED_FROM_TERMINAL:raiseRuntimeError("Simulation context already exists. Cannot create a new one.")self.sim:SimulationContext=SimulationContext.instance()# print useful informationprint("[INFO]: Base environment:")print(f"\tEnvironment device : {self.device}")print(f"\tEnvironment seed : {self.cfg.seed}")print(f"\tPhysics step-size : {self.physics_dt}")print(f"\tRendering step-size : {self.physics_dt*self.cfg.sim.render_interval}")print(f"\tEnvironment step-size : {self.step_dt}")ifself.cfg.sim.render_interval<self.cfg.decimation:msg=(f"The render interval ({self.cfg.sim.render_interval}) is smaller than the decimation "f"({self.cfg.decimation}). Multiple render calls will happen for each environment step. ""If this is not intended, set the render interval to be equal to the decimation.")omni.log.warn(msg)# counter for simulation stepsself._sim_step_counter=0# generate scenewithTimer("[INFO]: Time taken for scene creation","scene_creation"):self.scene=InteractiveScene(self.cfg.scene)print("[INFO]: Scene manager: ",self.scene)# set up camera viewport controller# viewport is not available in other rendering modes so the function will throw a warning# FIXME: This needs to be fixed in the future when we unify the UI functionalities even for# non-rendering modes.ifself.sim.render_mode>=self.sim.RenderMode.PARTIAL_RENDERING:self.viewport_camera_controller=ViewportCameraController(self,self.cfg.viewer)else:self.viewport_camera_controller=None# play the simulator to activate physics handles# note: this activates the physics simulation view that exposes TensorAPIs# note: when started in extension mode, first call sim.reset_async() and then initialize the managersifbuiltins.ISAAC_LAUNCHED_FROM_TERMINALisFalse:print("[INFO]: Starting the simulation. This may take a few seconds. Please wait...")withTimer("[INFO]: Time taken for simulation start","simulation_start"):self.sim.reset()# update scene to pre populate data buffers for assets and sensors.# this is needed for the observation manager to get valid tensors for initialization.# this shouldn't cause an issue since later on, users do a reset over all the environments so the lazy buffers would be reset.self.scene.update(dt=self.physics_dt)# add timeline event to load managersself.load_managers()# make sure torch is running on the correct deviceif"cuda"inself.device:torch.cuda.set_device(self.device)# extend UI elements# we need to do this here after all the managers are initialized# this is because they dictate the sensors and commands right nowifself.sim.has_gui()andself.cfg.ui_window_class_typeisnotNone:# setup live visualizersself.setup_manager_visualizers()self._window=self.cfg.ui_window_class_type(self,window_name="IsaacLab")else:# if no window, then we don't need to store the windowself._window=None# allocate dictionary to store metricsself.extras={}# initialize observation buffersself.obs_buf={}
def__del__(self):"""Cleanup for the environment."""self.close()""" Properties. """@propertydefnum_envs(self)->int:"""The number of instances of the environment that are running."""returnself.scene.num_envs@propertydefphysics_dt(self)->float:"""The physics time-step (in s). This is the lowest time-decimation at which the simulation is happening. """returnself.cfg.sim.dt@propertydefstep_dt(self)->float:"""The environment stepping time-step (in s). This is the time-step at which the environment steps forward. """returnself.cfg.sim.dt*self.cfg.decimation@propertydefdevice(self):"""The device on which the environment is running."""returnself.sim.device""" Operations - Setup. """
[docs]defload_managers(self):"""Load the managers for the environment. This function is responsible for creating the various managers (action, observation, events, etc.) for the environment. Since the managers require access to physics handles, they can only be created after the simulator is reset (i.e. played for the first time). .. note:: In case of standalone application (when running simulator from Python), the function is called automatically when the class is initialized. However, in case of extension mode, the user must call this function manually after the simulator is reset. This is because the simulator is only reset when the user calls :meth:`SimulationContext.reset_async` and it isn't possible to call async functions in the constructor. """# prepare the managers# -- recorder managerself.recorder_manager=RecorderManager(self.cfg.recorders,self)print("[INFO] Recorder Manager: ",self.recorder_manager)# -- action managerself.action_manager=ActionManager(self.cfg.actions,self)print("[INFO] Action Manager: ",self.action_manager)# -- observation managerself.observation_manager=ObservationManager(self.cfg.observations,self)print("[INFO] Observation Manager:",self.observation_manager)# -- event managerself.event_manager=EventManager(self.cfg.events,self)print("[INFO] Event Manager: ",self.event_manager)# perform events at the start of the simulation# in-case a child implementation creates other managers, the randomization should happen# when all the other managers are createdifself.__class__==ManagerBasedEnvand"startup"inself.event_manager.available_modes:self.event_manager.apply(mode="startup")
[docs]defsetup_manager_visualizers(self):"""Creates live visualizers for manager terms."""self.manager_visualizers={"action_manager":ManagerLiveVisualizer(manager=self.action_manager),"observation_manager":ManagerLiveVisualizer(manager=self.observation_manager),}
""" Operations - MDP. """
[docs]defreset(self,seed:int|None=None,env_ids:Sequence[int]|None=None,options:dict[str,Any]|None=None)->tuple[VecEnvObs,dict]:"""Resets the specified environments and returns observations. This function calls the :meth:`_reset_idx` function to reset the specified environments. However, certain operations, such as procedural terrain generation, that happened during initialization are not repeated. Args: seed: The seed to use for randomization. Defaults to None, in which case the seed is not set. env_ids: The environment ids to reset. Defaults to None, in which case all environments are reset. options: Additional information to specify how the environment is reset. Defaults to None. Note: This argument is used for compatibility with Gymnasium environment definition. Returns: A tuple containing the observations and extras. """ifenv_idsisNone:env_ids=torch.arange(self.num_envs,dtype=torch.int64,device=self.device)# trigger recorder terms for pre-reset callsself.recorder_manager.record_pre_reset(env_ids)# set the seedifseedisnotNone:self.seed(seed)# reset state of sceneself._reset_idx(env_ids)# update articulation kinematicsself.scene.write_data_to_sim()self.sim.forward()# if sensors are added to the scene, make sure we render to reflect changes in resetifself.sim.has_rtx_sensors()andself.cfg.rerender_on_reset:self.sim.render()# trigger recorder terms for post-reset callsself.recorder_manager.record_post_reset(env_ids)# compute observationsself.obs_buf=self.observation_manager.compute()ifself.cfg.wait_for_texturesandself.sim.has_rtx_sensors():whileSimulationManager.assets_loading():self.sim.render()# return observationsreturnself.obs_buf,self.extras
[docs]defreset_to(self,state:dict[str,dict[str,dict[str,torch.Tensor]]],env_ids:Sequence[int]|None,seed:int|None=None,is_relative:bool=False,)->None:"""Resets specified environments to known states. Note that this is different from reset() function as it resets the environments to specific states Args: state: The state to reset the specified environments to. env_ids: The environment ids to reset. Defaults to None, in which case all environments are reset. seed: The seed to use for randomization. Defaults to None, in which case the seed is not set. is_relative: If set to True, the state is considered relative to the environment origins. Defaults to False. """# reset all envs in the scene if env_ids is Noneifenv_idsisNone:env_ids=torch.arange(self.num_envs,dtype=torch.int64,device=self.device)# trigger recorder terms for pre-reset callsself.recorder_manager.record_pre_reset(env_ids)# set the seedifseedisnotNone:self.seed(seed)self._reset_idx(env_ids)# set the stateself.scene.reset_to(state,env_ids,is_relative=is_relative)# update articulation kinematicsself.sim.forward()# if sensors are added to the scene, make sure we render to reflect changes in resetifself.sim.has_rtx_sensors()andself.cfg.rerender_on_reset:self.sim.render()# trigger recorder terms for post-reset callsself.recorder_manager.record_post_reset(env_ids)# compute observationsself.obs_buf=self.observation_manager.compute()# return observationsreturnself.obs_buf,self.extras
[docs]defstep(self,action:torch.Tensor)->tuple[VecEnvObs,dict]:"""Execute one time-step of the environment's dynamics. The environment steps forward at a fixed time-step, while the physics simulation is decimated at a lower time-step. This is to ensure that the simulation is stable. These two time-steps can be configured independently using the :attr:`ManagerBasedEnvCfg.decimation` (number of simulation steps per environment step) and the :attr:`ManagerBasedEnvCfg.sim.dt` (physics time-step). Based on these parameters, the environment time-step is computed as the product of the two. Args: action: The actions to apply on the environment. Shape is (num_envs, action_dim). Returns: A tuple containing the observations and extras. """# process actionsself.action_manager.process_action(action.to(self.device))self.recorder_manager.record_pre_step()# check if we need to do rendering within the physics loop# note: checked here once to avoid multiple checks within the loopis_rendering=self.sim.has_gui()orself.sim.has_rtx_sensors()# perform physics steppingfor_inrange(self.cfg.decimation):self._sim_step_counter+=1# set actions into buffersself.action_manager.apply_action()# set actions into simulatorself.scene.write_data_to_sim()# simulateself.sim.step(render=False)# render between steps only if the GUI or an RTX sensor needs it# note: we assume the render interval to be the shortest accepted rendering interval.# If a camera needs rendering at a faster frequency, this will lead to unexpected behavior.ifself._sim_step_counter%self.cfg.sim.render_interval==0andis_rendering:self.sim.render()# update buffers at sim dtself.scene.update(dt=self.physics_dt)# post-step: step interval eventif"interval"inself.event_manager.available_modes:self.event_manager.apply(mode="interval",dt=self.step_dt)# -- compute observationsself.obs_buf=self.observation_manager.compute()self.recorder_manager.record_post_step()# return observations and extrasreturnself.obs_buf,self.extras
[docs]@staticmethoddefseed(seed:int=-1)->int:"""Set the seed for the environment. Args: seed: The seed for random generator. Defaults to -1. Returns: The seed used for random generator. """# set seed for replicatortry:importomni.replicator.coreasreprep.set_global_seed(seed)exceptModuleNotFoundError:pass# set seed for torch and other librariesreturntorch_utils.set_seed(seed)
[docs]defclose(self):"""Cleanup for the environment."""ifnotself._is_closed:# destructor is order-sensitivedelself.viewport_camera_controllerdelself.action_managerdelself.observation_managerdelself.event_managerdelself.recorder_managerdelself.scene# clear callbacks and instanceself.sim.clear_all_callbacks()self.sim.clear_instance()# destroy the windowifself._windowisnotNone:self._window=None# update closing statusself._is_closed=True
""" Helper functions. """def_reset_idx(self,env_ids:Sequence[int]):"""Reset environments based on specified indices. Args: env_ids: List of environment ids which must be reset """# reset the internal buffers of the scene elementsself.scene.reset(env_ids)# apply events such as randomization for environments that need a resetif"reset"inself.event_manager.available_modes:env_step_count=self._sim_step_counter//self.cfg.decimationself.event_manager.apply(mode="reset",env_ids=env_ids,global_env_step_count=env_step_count)# iterate over all managers and reset them# this returns a dictionary of information which is stored in the extras# note: This is order-sensitive! Certain things need be reset before others.self.extras["log"]=dict()# -- observation managerinfo=self.observation_manager.reset(env_ids)self.extras["log"].update(info)# -- action managerinfo=self.action_manager.reset(env_ids)self.extras["log"].update(info)# -- event managerinfo=self.event_manager.reset(env_ids)self.extras["log"].update(info)# -- recorder managerinfo=self.recorder_manager.reset(env_ids)self.extras["log"].update(info)