Creating a Direct Workflow RL Environment#

In addition to the envs.ManagerBasedRLEnv class, which encourages the use of configuration classes for more modular environments, the DirectRLEnv class allows for more direct control in the scripting of the environment.

Instead of using Manager classes for defining rewards and observations, the direct workflow tasks implement the full reward and observation functions directly in the task script. This allows for more control in the implementation of the methods, such as using PyTorch JIT features, and provides a less abstracted framework that makes it easier to find the various pieces of code.

In this tutorial, we will configure the cartpole environment using the direct workflow implementation to create a task for balancing the pole upright. We will learn how to specify the task by implementing functions for scene creation, actions, resets, rewards and observations.

The Code#

For this tutorial, we use the cartpole environment defined in the omni.isaac.lab_tasks.direct.cartpole module.

Code for cartpole_env.py
  1# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
  2# All rights reserved.
  3#
  4# SPDX-License-Identifier: BSD-3-Clause
  5
  6from __future__ import annotations
  7
  8import math
  9import torch
 10from collections.abc import Sequence
 11
 12from omni.isaac.lab_assets.cartpole import CARTPOLE_CFG
 13
 14import omni.isaac.lab.sim as sim_utils
 15from omni.isaac.lab.assets import Articulation, ArticulationCfg
 16from omni.isaac.lab.envs import DirectRLEnv, DirectRLEnvCfg
 17from omni.isaac.lab.scene import InteractiveSceneCfg
 18from omni.isaac.lab.sim import SimulationCfg
 19from omni.isaac.lab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
 20from omni.isaac.lab.utils import configclass
 21from omni.isaac.lab.utils.math import sample_uniform
 22
 23
 24@configclass
 25class CartpoleEnvCfg(DirectRLEnvCfg):
 26    # env
 27    decimation = 2
 28    episode_length_s = 5.0
 29    action_scale = 100.0  # [N]
 30    action_space = 1
 31    observation_space = 4
 32    state_space = 0
 33
 34    # simulation
 35    sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
 36
 37    # robot
 38    robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
 39    cart_dof_name = "slider_to_cart"
 40    pole_dof_name = "cart_to_pole"
 41
 42    # scene
 43    scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
 44
 45    # reset
 46    max_cart_pos = 3.0  # the cart is reset if it exceeds that position [m]
 47    initial_pole_angle_range = [-0.25, 0.25]  # the range in which the pole angle is sampled from on reset, in multiples of pi (i.e. +/- 0.25 * pi rad)
 48
 49    # reward scales
 50    rew_scale_alive = 1.0
 51    rew_scale_terminated = -2.0
 52    rew_scale_pole_pos = -1.0
 53    rew_scale_cart_vel = -0.01
 54    rew_scale_pole_vel = -0.005
 55
 56
 57class CartpoleEnv(DirectRLEnv):
 58    cfg: CartpoleEnvCfg
 59
 60    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
 61        super().__init__(cfg, render_mode, **kwargs)
 62
 63        self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
 64        self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
 65        self.action_scale = self.cfg.action_scale
 66
 67        self.joint_pos = self.cartpole.data.joint_pos
 68        self.joint_vel = self.cartpole.data.joint_vel
 69
 70    def _setup_scene(self):
 71        self.cartpole = Articulation(self.cfg.robot_cfg)
 72        # add ground plane
 73        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
 74        # clone, filter, and replicate
 75        self.scene.clone_environments(copy_from_source=False)
 76        self.scene.filter_collisions(global_prim_paths=[])
 77        # add articulation to scene
 78        self.scene.articulations["cartpole"] = self.cartpole
 79        # add lights
 80        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
 81        light_cfg.func("/World/Light", light_cfg)
 82
 83    def _pre_physics_step(self, actions: torch.Tensor) -> None:
 84        self.actions = self.action_scale * actions.clone()
 85
 86    def _apply_action(self) -> None:
 87        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
 88
 89    def _get_observations(self) -> dict:
 90        obs = torch.cat(
 91            (
 92                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
 93                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
 94                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
 95                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
 96            ),
 97            dim=-1,
 98        )
 99        observations = {"policy": obs}
100        return observations
101
102    def _get_rewards(self) -> torch.Tensor:
103        total_reward = compute_rewards(
104            self.cfg.rew_scale_alive,
105            self.cfg.rew_scale_terminated,
106            self.cfg.rew_scale_pole_pos,
107            self.cfg.rew_scale_cart_vel,
108            self.cfg.rew_scale_pole_vel,
109            self.joint_pos[:, self._pole_dof_idx[0]],
110            self.joint_vel[:, self._pole_dof_idx[0]],
111            self.joint_pos[:, self._cart_dof_idx[0]],
112            self.joint_vel[:, self._cart_dof_idx[0]],
113            self.reset_terminated,
114        )
115        return total_reward
116
117    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
118        self.joint_pos = self.cartpole.data.joint_pos
119        self.joint_vel = self.cartpole.data.joint_vel
120
121        time_out = self.episode_length_buf >= self.max_episode_length - 1
122        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
123        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
124        return out_of_bounds, time_out
125
126    def _reset_idx(self, env_ids: Sequence[int] | None):
127        if env_ids is None:
128            env_ids = self.cartpole._ALL_INDICES
129        super()._reset_idx(env_ids)
130
131        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
132        joint_pos[:, self._pole_dof_idx] += sample_uniform(
133            self.cfg.initial_pole_angle_range[0] * math.pi,
134            self.cfg.initial_pole_angle_range[1] * math.pi,
135            joint_pos[:, self._pole_dof_idx].shape,
136            joint_pos.device,
137        )
138        joint_vel = self.cartpole.data.default_joint_vel[env_ids]
139
140        default_root_state = self.cartpole.data.default_root_state[env_ids]
141        default_root_state[:, :3] += self.scene.env_origins[env_ids]
142
143        self.joint_pos[env_ids] = joint_pos
144        self.joint_vel[env_ids] = joint_vel
145
146        self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
147        self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
148        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
149
150
151@torch.jit.script
152def compute_rewards(
153    rew_scale_alive: float,
154    rew_scale_terminated: float,
155    rew_scale_pole_pos: float,
156    rew_scale_cart_vel: float,
157    rew_scale_pole_vel: float,
158    pole_pos: torch.Tensor,
159    pole_vel: torch.Tensor,
160    cart_pos: torch.Tensor,
161    cart_vel: torch.Tensor,
162    reset_terminated: torch.Tensor,
163):
164    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
165    rew_termination = rew_scale_terminated * reset_terminated.float()
166    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
167    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
168    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
169    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
170    return total_reward

The Code Explained#

Similar to the manager-based environments, a configuration class is defined for the task to hold settings for the simulation parameters, the scene, the actors, and the task. With the direct workflow implementation, the envs.DirectRLEnvCfg class is used as the base class for configurations. Since the direct workflow implementation does not use Action and Observation managers, the task config should define the number of actions and observations for the environment.

@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
   # excerpt: only the space-size settings are shown, the remaining fields are elided
   ...
   # size of the action vector per environment (the action is scaled and applied as the cart joint effort)
   action_space = 1
   # size of the observation vector per environment: pole position, pole velocity, cart position, cart velocity
   observation_space = 4
   # NOTE(review): 0 presumably means no separate state buffer is provided -- confirm against DirectRLEnvCfg
   state_space = 0

The config class can also be used to define task-specific attributes, such as scaling for reward terms and thresholds for reset conditions.

@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
   # excerpt: only the task-specific reset thresholds and reward scales are shown
   ...
   # reset
   # an environment is flagged out of bounds when the absolute cart joint position exceeds this value
   max_cart_pos = 3.0
   # bounds of the uniform pole-angle offset sampled on reset; _reset_idx multiplies both bounds by math.pi
   initial_pole_angle_range = [-0.25, 0.25]

   # reward scales (each one multiplies one term inside compute_rewards)
   # multiplies (1 - reset_terminated), i.e. applies to environments that have not terminated
   rew_scale_alive = 1.0
   # multiplies reset_terminated, i.e. applies to environments that have terminated
   rew_scale_terminated = -2.0
   # multiplies the squared pole joint position
   rew_scale_pole_pos = -1.0
   # multiplies the absolute cart joint velocity
   rew_scale_cart_vel = -0.01
   # multiplies the absolute pole joint velocity
   rew_scale_pole_vel = -0.005

When creating a new environment, the code should define a new class that inherits from DirectRLEnv.

class CartpoleEnv(DirectRLEnv):
   """Cartpole task environment built on the direct workflow base class."""

   # typed reference to the task configuration
   cfg: CartpoleEnvCfg

   def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
     """Initialize the environment by forwarding all arguments to ``DirectRLEnv``.

     Args:
         cfg: The task configuration.
         render_mode: Optional render mode, passed through to the base class.
         **kwargs: Additional keyword arguments, passed through to the base class.
     """
     super().__init__(cfg, render_mode, **kwargs)

The class can also hold class variables that are accessible by all functions in the class, including functions for applying actions, computing resets, rewards, and observations.

Scene Creation#

In contrast to manager-based environments where the scene creation is taken care of by the framework, the direct workflow implementation provides flexibility for users to implement their own scene creation function. This includes adding actors into the stage, cloning the environments, filtering collisions between the environments, adding the actors into the scene, and adding any additional props to the scene, such as ground plane and lights. These operations should be implemented in the _setup_scene(self) method.

    def _setup_scene(self):
        """Build the scene: cartpole articulation, ground plane, cloned environments and a dome light."""
        # create the articulation from the robot config and keep a handle on the environment
        self.cartpole = Articulation(self.cfg.robot_cfg)
        # add ground plane
        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
        # clone, filter, and replicate
        self.scene.clone_environments(copy_from_source=False)
        self.scene.filter_collisions(global_prim_paths=[])
        # add articulation to scene
        self.scene.articulations["cartpole"] = self.cartpole
        # add lights
        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
        # the config's ``func`` is called with the prim path and the config itself to create the light
        light_cfg.func("/World/Light", light_cfg)

Defining Rewards#

The reward function should be defined in the _get_rewards(self) API, which returns the reward buffer as a return value. Within this function, the task is free to implement the logic of the reward function. In this example, we implement a PyTorch JIT function that computes the various components of the reward function.

def _get_rewards(self) -> torch.Tensor:
     """Compute the reward buffer for all environments.

     Passes the reward scales from the config, the pole and cart joint
     positions and velocities, and the ``reset_terminated`` buffer to
     ``compute_rewards``.

     Returns:
         The total reward tensor returned by ``compute_rewards``.
     """
     total_reward = compute_rewards(
         self.cfg.rew_scale_alive,
         self.cfg.rew_scale_terminated,
         self.cfg.rew_scale_pole_pos,
         self.cfg.rew_scale_cart_vel,
         self.cfg.rew_scale_pole_vel,
         # indexing with the first (integer) joint index yields one value per environment
         self.joint_pos[:, self._pole_dof_idx[0]],
         self.joint_vel[:, self._pole_dof_idx[0]],
         self.joint_pos[:, self._cart_dof_idx[0]],
         self.joint_vel[:, self._cart_dof_idx[0]],
         self.reset_terminated,
     )
     return total_reward

@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    """Compute the per-environment cartpole reward.

    The state tensors are expected to be 1-D with one entry per environment,
    which is how ``_get_rewards`` passes them.

    Args:
        rew_scale_alive: Scale applied to environments that have not terminated.
        rew_scale_terminated: Scale applied to environments that have terminated.
        rew_scale_pole_pos: Scale applied to the squared pole position.
        rew_scale_cart_vel: Scale applied to the absolute cart velocity.
        rew_scale_pole_vel: Scale applied to the absolute pole velocity.
        pole_pos: Pole joint position per environment.
        pole_vel: Pole joint velocity per environment.
        cart_pos: Cart joint position per environment (not used by any reward term).
        cart_vel: Cart joint velocity per environment.
        reset_terminated: Termination flag per environment.

    Returns:
        The sum of all reward terms, one value per environment.
    """
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    # Add a trailing dimension before reducing over ``dim=-1``. Without it the sum
    # runs over the environment dimension and collapses the whole batch into one scalar.
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward

Defining Observations#

The observation buffer should be computed in the _get_observations(self) function, which constructs the observation buffer for the environment. At the end of this API, a dictionary should be returned that contains policy as the key, and the full observation buffer as the value. For asymmetric policies, the dictionary should also include the key critic and the states buffer as the value.

    def _get_observations(self) -> dict:
        """Assemble the observation buffer from the cached joint state.

        Returns:
            A dictionary with the key ``"policy"`` mapping to the concatenation
            (along the last dimension) of pole position, pole velocity, cart
            position and cart velocity.
        """
        obs = torch.cat(
            (
                # each term is indexed down to one value per environment, then given a trailing dim of size 1
                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            ),
            dim=-1,
        )
        observations = {"policy": obs}
        return observations

Computing Dones and Performing Resets#

Populating the dones buffer should be done in the _get_dones(self) method. This method is free to implement logic that computes which environments would need to be reset and which environments have reached the episode length limit. Both results should be returned by the _get_dones(self) function, in the form of a tuple of boolean tensors.

    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
        """Compute the termination and time-out flags for all environments.

        Returns:
            A tuple ``(out_of_bounds, time_out)`` of boolean tensors.
        """
        # re-read the joint state from the articulation data and cache it on the environment
        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

        # time-out once the episode step counter reaches the last step of the episode
        time_out = self.episode_length_buf >= self.max_episode_length - 1
        # out of bounds when the absolute cart position exceeds the configured maximum ...
        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
        # ... or when the absolute pole angle exceeds pi / 2
        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
        return out_of_bounds, time_out

Once the indices for environments requiring reset have been computed, the _reset_idx(self, env_ids) function performs the reset operations on those environments. Within this function, new states for the environments requiring reset should be set directly into simulation.

    def _reset_idx(self, env_ids: Sequence[int] | None):
        """Reset the given environments and write their new state to the simulation.

        Args:
            env_ids: Indices of the environments to reset. When ``None``, the
                articulation's ``_ALL_INDICES`` is used instead.
        """
        if env_ids is None:
            # NOTE(review): presumably the indices of every environment -- confirm against Articulation
            env_ids = self.cartpole._ALL_INDICES
        super()._reset_idx(env_ids)

        # start from the default joint positions and add a uniform offset to the pole joint;
        # the configured range bounds are multiplied by pi before sampling
        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
        joint_pos[:, self._pole_dof_idx] += sample_uniform(
            self.cfg.initial_pole_angle_range[0] * math.pi,
            self.cfg.initial_pole_angle_range[1] * math.pi,
            joint_pos[:, self._pole_dof_idx].shape,
            joint_pos.device,
        )
        joint_vel = self.cartpole.data.default_joint_vel[env_ids]

        # shift the first three root-state components by each environment's origin
        default_root_state = self.cartpole.data.default_root_state[env_ids]
        default_root_state[:, :3] += self.scene.env_origins[env_ids]

        # keep the cached joint buffers in sync with the new state
        self.joint_pos[env_ids] = joint_pos
        self.joint_vel[env_ids] = joint_vel

        # write the new state to the simulation: columns 0-6 of the root state as pose, the rest as velocity
        self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
        self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)

Applying Actions#

There are two APIs that are designed for working with actions. The _pre_physics_step(self, actions) takes in actions from the policy as an argument and is called once per RL step, prior to taking any physics steps. This function can be used to process the actions buffer from the policy and cache the data in a class variable for the environment.

    def _pre_physics_step(self, actions: torch.Tensor) -> None:
        """Scale a copy of the policy actions by ``action_scale`` and cache it in ``self.actions``.

        Args:
            actions: The actions received from the policy.
        """
        self.actions = self.action_scale * actions.clone()

The _apply_action(self) API is called decimation number of times for each RL step, prior to taking each physics step. This provides more flexibility for environments where actions should be applied for each physics step.

    def _apply_action(self) -> None:
        """Set the cached, scaled actions as the joint effort target of the cart joint."""
        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)

The Code Execution#

To run training for the direct workflow Cartpole environment, we can use the following command:

./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task=Isaac-Cartpole-Direct-v0
result of train.py

All direct workflow tasks have the suffix -Direct added to the task name to differentiate the implementation style.

Domain Randomization#

In the direct workflow, domain randomization configuration uses the configclass module to specify a configuration class consisting of EventTermCfg variables.

Below is an example of a configuration class for domain randomization:

@configclass
class EventCfg:
  """Example configuration of domain randomization event terms."""

  # on reset: call mdp.randomize_rigid_body_material for every body (".*") of the "robot" asset
  robot_physics_material = EventTerm(
      func=mdp.randomize_rigid_body_material,
      mode="reset",
      params={
          "asset_cfg": SceneEntityCfg("robot", body_names=".*"),
          "static_friction_range": (0.7, 1.3),
          "dynamic_friction_range": (1.0, 1.0),
          "restitution_range": (1.0, 1.0),
          "num_buckets": 250,
      },
  )
  # on reset: call mdp.randomize_actuator_gains for every joint (".*") of the "robot" asset,
  # using the "scale" operation with a "log_uniform" distribution
  robot_joint_stiffness_and_damping = EventTerm(
      func=mdp.randomize_actuator_gains,
      mode="reset",
      params={
          "asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
          "stiffness_distribution_params": (0.75, 1.5),
          "damping_distribution_params": (0.3, 3.0),
          "operation": "scale",
          "distribution": "log_uniform",
      },
  )
  # at an interval: call mdp.randomize_physics_scene_gravity, using the "add" operation with a
  # "gaussian" distribution; both bounds of the interval range are 36.0, so the interval is fixed
  reset_gravity = EventTerm(
      func=mdp.randomize_physics_scene_gravity,
      mode="interval",
      is_global_time=True,
      interval_range_s=(36.0, 36.0),  # time_s = num_steps * (decimation * dt)
      params={
          "gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
          "operation": "add",
          "distribution": "gaussian",
      },
  )

Each EventTerm object is of the EventTermCfg class and takes in a func parameter for specifying the function to call during randomization, and a mode parameter, which can be startup, reset or interval. The params dictionary should provide the necessary arguments to the function that is specified in the func parameter. Functions specified as func for the EventTerm can be found in the events module.

Note that as part of the "asset_cfg": SceneEntityCfg("robot", body_names=".*") parameter, the name of the actor "robot" is provided, along with the body or joint names specified as a regular expression; these are the actors and bodies/joints that will have randomization applied.

Once the configclass for the randomization terms has been set up, the class must be added to the base config class for the task and be assigned to the variable events.

@configclass
class MyTaskConfig:
  # domain randomization terms are attached to the task config through the ``events`` variable
  events: EventCfg = EventCfg()

Action and Observation Noise#

Action and observation noise can also be added using the configclass module. Action and observation noise configs must be added to the main task config using the action_noise_model and observation_noise_model variables:

@configclass
class MyTaskConfig:

    # Action noise: ``noise_cfg`` is the Gaussian noise added at every time-step,
    # ``bias_noise_cfg`` is the Gaussian bias that is sampled at reset.
    action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
      noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
      bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
    )

    # Observation noise: ``noise_cfg`` is the Gaussian noise added at every time-step,
    # ``bias_noise_cfg`` is the Gaussian bias that is sampled at reset.
    observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
      noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
      bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
    )

NoiseModelWithAdditiveBiasCfg can be used to sample both uncorrelated noise per step as well as correlated noise that is re-sampled at reset time.

The noise_cfg term specifies the Gaussian distribution that will be sampled at each step for all environments. This noise will be added to the corresponding actions and observations buffers at every step.

The bias_noise_cfg term specifies the Gaussian distribution for the correlated noise that will be sampled at reset time for the environments being reset. The same noise will be applied each step for the remainder of the episode for the environments and resampled at the next reset.

If only per-step noise is desired, GaussianNoiseCfg can be used to specify an additive Gaussian distribution that adds the sampled noise to the input buffer.

@configclass
class MyTaskConfig:
  # per-step additive Gaussian noise only; no bias term is configured
  action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")

In this tutorial, we learnt how to create a direct workflow task environment for reinforcement learning. We did this by extending the base environment to include the scene setup, actions, dones, reset, reward and observation functions.

While it is possible to manually create an instance of DirectRLEnv class for a desired task, this is not scalable as it requires specialized scripts for each task. Thus, we exploit the gymnasium.make() function to create the environment with the gym interface. We will learn how to do this in the next tutorial.