Creating a Manager-Based RL Environment#
Having learnt how to create a base environment in Creating a Manager-Based Base Environment, we will now look at how to create a manager-based task environment for reinforcement learning.
The base environment is designed as a sense-act environment, where the agent can send commands to the environment
and receive observations from it. This minimal interface is sufficient for many applications, such as
traditional motion planning and controls. However, many applications require a task specification, which often
serves as the learning objective for the agent. For instance, in a navigation task, the agent may be required to
reach a goal location. To this end, we use the envs.ManagerBasedRLEnv class, which extends the base environment
to include a task specification.
Similar to other components in Isaac Lab, instead of directly modifying the base class envs.ManagerBasedRLEnv, we
encourage users to simply implement a configuration envs.ManagerBasedRLEnvCfg for their task environment.
This practice allows us to separate the task specification from the environment implementation, making it easier
to reuse components of the same environment for different tasks.
In this tutorial, we will configure the cartpole environment using the envs.ManagerBasedRLEnvCfg to create a
manager-based task for balancing the pole upright. We will learn how to specify the task using reward terms,
termination criteria, curriculum and commands.
The Code#
For this tutorial, we use the cartpole environment defined in the omni.isaac.lab_tasks.manager_based.classic.cartpole module.
Code for cartpole_env_cfg.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import math

import omni.isaac.lab.sim as sim_utils
from omni.isaac.lab.assets import ArticulationCfg, AssetBaseCfg
from omni.isaac.lab.envs import ManagerBasedRLEnvCfg
from omni.isaac.lab.managers import EventTermCfg as EventTerm
from omni.isaac.lab.managers import ObservationGroupCfg as ObsGroup
from omni.isaac.lab.managers import ObservationTermCfg as ObsTerm
from omni.isaac.lab.managers import RewardTermCfg as RewTerm
from omni.isaac.lab.managers import SceneEntityCfg
from omni.isaac.lab.managers import TerminationTermCfg as DoneTerm
from omni.isaac.lab.scene import InteractiveSceneCfg
from omni.isaac.lab.utils import configclass

import omni.isaac.lab_tasks.manager_based.classic.cartpole.mdp as mdp

##
# Pre-defined configs
##
from omni.isaac.lab_assets.cartpole import CARTPOLE_CFG  # isort:skip


##
# Scene definition
##


@configclass
class CartpoleSceneCfg(InteractiveSceneCfg):
    """Configuration for a cart-pole scene."""

    # ground plane
    ground = AssetBaseCfg(
        prim_path="/World/ground",
        spawn=sim_utils.GroundPlaneCfg(size=(100.0, 100.0)),
    )

    # cartpole
    robot: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="{ENV_REGEX_NS}/Robot")

    # lights
    dome_light = AssetBaseCfg(
        prim_path="/World/DomeLight",
        spawn=sim_utils.DomeLightCfg(color=(0.9, 0.9, 0.9), intensity=500.0),
    )


##
# MDP settings
##


@configclass
class ActionsCfg:
    """Action specifications for the MDP."""

    joint_effort = mdp.JointEffortActionCfg(asset_name="robot", joint_names=["slider_to_cart"], scale=100.0)


@configclass
class ObservationsCfg:
    """Observation specifications for the MDP."""

    @configclass
    class PolicyCfg(ObsGroup):
        """Observations for policy group."""

        # observation terms (order preserved)
        joint_pos_rel = ObsTerm(func=mdp.joint_pos_rel)
        joint_vel_rel = ObsTerm(func=mdp.joint_vel_rel)

        def __post_init__(self) -> None:
            self.enable_corruption = False
            self.concatenate_terms = True

    # observation groups
    policy: PolicyCfg = PolicyCfg()


@configclass
class EventCfg:
    """Configuration for events."""

    # reset
    reset_cart_position = EventTerm(
        func=mdp.reset_joints_by_offset,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]),
            "position_range": (-1.0, 1.0),
            "velocity_range": (-0.5, 0.5),
        },
    )

    reset_pole_position = EventTerm(
        func=mdp.reset_joints_by_offset,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]),
            "position_range": (-0.25 * math.pi, 0.25 * math.pi),
            "velocity_range": (-0.25 * math.pi, 0.25 * math.pi),
        },
    )


@configclass
class RewardsCfg:
    """Reward terms for the MDP."""

    # (1) Constant running reward
    alive = RewTerm(func=mdp.is_alive, weight=1.0)
    # (2) Failure penalty
    terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
    # (3) Primary task: keep pole upright
    pole_pos = RewTerm(
        func=mdp.joint_pos_target_l2,
        weight=-1.0,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
    )
    # (4) Shaping tasks: lower cart velocity
    cart_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.01,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
    )
    # (5) Shaping tasks: lower pole angular velocity
    pole_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.005,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
    )


@configclass
class TerminationsCfg:
    """Termination terms for the MDP."""

    # (1) Time out
    time_out = DoneTerm(func=mdp.time_out, time_out=True)
    # (2) Cart out of bounds
    cart_out_of_bounds = DoneTerm(
        func=mdp.joint_pos_out_of_manual_limit,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
    )


##
# Environment configuration
##


@configclass
class CartpoleEnvCfg(ManagerBasedRLEnvCfg):
    """Configuration for the cartpole environment."""

    # Scene settings
    scene: CartpoleSceneCfg = CartpoleSceneCfg(num_envs=4096, env_spacing=4.0)
    # Basic settings
    observations: ObservationsCfg = ObservationsCfg()
    actions: ActionsCfg = ActionsCfg()
    events: EventCfg = EventCfg()
    # MDP settings
    rewards: RewardsCfg = RewardsCfg()
    terminations: TerminationsCfg = TerminationsCfg()

    # Post initialization
    def __post_init__(self) -> None:
        """Post initialization."""
        # general settings
        self.decimation = 2
        self.episode_length_s = 5
        # viewer settings
        self.viewer.eye = (8.0, 0.0, 5.0)
        # simulation settings
        self.sim.dt = 1 / 120
        self.sim.render_interval = self.decimation
The script for running the environment, run_cartpole_rl_env.py, is present in the
isaaclab/source/standalone/tutorials/03_envs directory. The script is similar to the
cartpole_base_env.py script in the previous tutorial, except that it uses
envs.ManagerBasedRLEnv instead of envs.ManagerBasedEnv.
Code for run_cartpole_rl_env.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""This script demonstrates how to run the RL environment for the cartpole balancing task."""

"""Launch Isaac Sim Simulator first."""

import argparse

from omni.isaac.lab.app import AppLauncher

# add argparse arguments
parser = argparse.ArgumentParser(description="Tutorial on running the cartpole RL environment.")
parser.add_argument("--num_envs", type=int, default=16, help="Number of environments to spawn.")

# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()

# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app

"""Rest everything follows."""

import torch

from omni.isaac.lab.envs import ManagerBasedRLEnv

from omni.isaac.lab_tasks.manager_based.classic.cartpole.cartpole_env_cfg import CartpoleEnvCfg


def main():
    """Main function."""
    # create environment configuration
    env_cfg = CartpoleEnvCfg()
    env_cfg.scene.num_envs = args_cli.num_envs
    # setup RL environment
    env = ManagerBasedRLEnv(cfg=env_cfg)

    # simulate physics
    count = 0
    while simulation_app.is_running():
        with torch.inference_mode():
            # reset
            if count % 300 == 0:
                count = 0
                env.reset()
                print("-" * 80)
                print("[INFO]: Resetting environment...")
            # sample random actions
            joint_efforts = torch.randn_like(env.action_manager.action)
            # step the environment
            obs, rew, terminated, truncated, info = env.step(joint_efforts)
            # print current orientation of pole
            print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
            # update counter
            count += 1

    # close the environment
    env.close()


if __name__ == "__main__":
    # run the main function
    main()
    # close sim app
    simulation_app.close()
The Code Explained#
We already went through how to specify the scene, observations, actions and events in the Creating a Manager-Based Base Environment tutorial. Thus, in this tutorial, we will focus only on the RL components of the environment.
In Isaac Lab, we provide various implementations of different terms in the envs.mdp module. We will use
some of these terms in this tutorial, but users are free to define their own terms as well. These
are usually placed in their task-specific sub-package
(for instance, in omni.isaac.lab_tasks.manager_based.classic.cartpole.mdp).
Defining rewards#
The managers.RewardManager is used to compute the reward terms for the agent. Similar to the other
managers, its terms are configured using the managers.RewardTermCfg class. The
managers.RewardTermCfg class specifies the function or callable class that computes the reward,
as well as the weighting associated with it. It also takes in a dictionary of arguments, "params",
that are passed to the reward function when it is called.
For the cartpole task, we will use the following reward terms:
Alive Reward: Encourage the agent to stay alive for as long as possible.
Terminating Reward: Conversely, penalize the agent for episodes that terminate in failure.
Pole Angle Reward: Encourage the agent to keep the pole at the desired upright position.
Cart Velocity Reward: Encourage the agent to keep the cart velocity as small as possible.
Pole Velocity Reward: Encourage the agent to keep the pole velocity as small as possible.
@configclass
class RewardsCfg:
    """Reward terms for the MDP."""

    # (1) Constant running reward
    alive = RewTerm(func=mdp.is_alive, weight=1.0)
    # (2) Failure penalty
    terminating = RewTerm(func=mdp.is_terminated, weight=-2.0)
    # (3) Primary task: keep pole upright
    pole_pos = RewTerm(
        func=mdp.joint_pos_target_l2,
        weight=-1.0,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"]), "target": 0.0},
    )
    # (4) Shaping tasks: lower cart velocity
    cart_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.01,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"])},
    )
    # (5) Shaping tasks: lower pole angular velocity
    pole_vel = RewTerm(
        func=mdp.joint_vel_l1,
        weight=-0.005,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["cart_to_pole"])},
    )
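The built-in terms used above live in the mdp module, but a reward term is ultimately just a function that takes the environment (plus any entries from "params") and returns one value per environment. As a rough illustration only, and not the library's exact implementation, a pole-angle penalty similar to joint_pos_target_l2 could be written along the following lines and placed in the task's mdp sub-package (the name pole_angle_l2 is hypothetical):

import torch

from omni.isaac.lab.assets import Articulation
from omni.isaac.lab.envs import ManagerBasedRLEnv
from omni.isaac.lab.managers import SceneEntityCfg


def pole_angle_l2(env: ManagerBasedRLEnv, target: float, asset_cfg: SceneEntityCfg) -> torch.Tensor:
    """Penalize squared deviation of the selected joint positions from a target value (illustrative sketch)."""
    # resolve the articulation that this term operates on
    asset: Articulation = env.scene[asset_cfg.name]
    # joint positions of the selected joints, shape: (num_envs, num_selected_joints)
    joint_pos = asset.data.joint_pos[:, asset_cfg.joint_ids]
    # squared error summed over the selected joints, shape: (num_envs,)
    return torch.sum(torch.square(joint_pos - target), dim=1)

Such a term would then be referenced in RewardsCfg as func=mdp.pole_angle_l2 with a negative weight, exactly like the built-in terms above.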
Defining termination criteria#
Most learning tasks happen over a finite number of steps that we call an episode. For instance, in the cartpole task, we want the agent to balance the pole for as long as possible. However, if the agent reaches an unstable or unsafe state, we want to terminate the episode. On the other hand, if the agent is able to balance the pole for a long time, we want to terminate the episode and start a new one so that the agent can learn to balance the pole from a different starting configuration.
The managers.TerminationsCfg configures the conditions under which an episode terminates. In this example,
we want the task to terminate when either of the following conditions is met:
Episode Length: the episode length is greater than the defined max_episode_length.
Cart out of bounds: the cart goes outside of the bounds [-3, 3].
The flag managers.TerminationTermCfg.time_out specifies whether the term is a time-out (truncation) term
or a terminated term. These are used to indicate the two types of terminations described in Gymnasium’s documentation.
@configclass
class TerminationsCfg:
    """Termination terms for the MDP."""

    # (1) Time out
    time_out = DoneTerm(func=mdp.time_out, time_out=True)
    # (2) Cart out of bounds
    cart_out_of_bounds = DoneTerm(
        func=mdp.joint_pos_out_of_manual_limit,
        params={"asset_cfg": SceneEntityCfg("robot", joint_names=["slider_to_cart"]), "bounds": (-3.0, 3.0)},
    )
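Termination terms follow the same pattern as reward terms, except that they return a boolean tensor with one entry per environment. As a hedged sketch (not the library's exact code for joint_pos_out_of_manual_limit, and using the hypothetical name joint_pos_outside_bounds), a manual joint-limit check could look like this:

import torch

from omni.isaac.lab.assets import Articulation
from omni.isaac.lab.envs import ManagerBasedRLEnv
from omni.isaac.lab.managers import SceneEntityCfg


def joint_pos_outside_bounds(env: ManagerBasedRLEnv, bounds: tuple[float, float], asset_cfg: SceneEntityCfg) -> torch.Tensor:
    """Return True for environments where any selected joint leaves the given position interval (illustrative sketch)."""
    asset: Articulation = env.scene[asset_cfg.name]
    # joint positions of the selected joints, shape: (num_envs, num_selected_joints)
    joint_pos = asset.data.joint_pos[:, asset_cfg.joint_ids]
    # episode ends if the joint position crosses either bound, result shape: (num_envs,)
    return torch.any((joint_pos < bounds[0]) | (joint_pos > bounds[1]), dim=1)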
Defining commands#
For various goal-conditioned tasks, it is useful to specify the goals or commands for the agent. These are
handled through the managers.CommandManager. The command manager handles resampling and updating the
commands at each step. It can also be used to provide the commands as an observation to the agent.
For this simple task, we do not use any commands. Hence, we leave the commands attribute at its default value, which is None. You can see an example of how to define a command manager in the other locomotion or manipulation tasks.
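For reference, a commands configuration in those tasks has roughly the following shape. This sketch is adapted from the velocity-tracking locomotion tasks; the term class and parameter values shown are illustrative and may differ between Isaac Lab versions:

import math

from omni.isaac.lab.utils import configclass

import omni.isaac.lab_tasks.manager_based.locomotion.velocity.mdp as mdp


@configclass
class CommandsCfg:
    """Command specifications for the MDP (illustrative sketch)."""

    # resample a random base-velocity command for each environment every 10 seconds
    base_velocity = mdp.UniformVelocityCommandCfg(
        asset_name="robot",
        resampling_time_range=(10.0, 10.0),
        rel_standing_envs=0.02,
        rel_heading_envs=1.0,
        heading_command=True,
        heading_control_stiffness=0.5,
        debug_vis=True,
        ranges=mdp.UniformVelocityCommandCfg.Ranges(
            lin_vel_x=(-1.0, 1.0), lin_vel_y=(-1.0, 1.0), ang_vel_z=(-1.0, 1.0), heading=(-math.pi, math.pi)
        ),
    )

The configuration would then be attached to the environment as commands: CommandsCfg = CommandsCfg(), and the active command can be exposed to the policy through an observation term.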
Defining curriculum#
Oftentimes when training a learning agent, it helps to start with a simple task and gradually increase the
task's difficulty as training progresses. This is the idea behind curriculum learning. In Isaac Lab,
we provide a managers.CurriculumManager class that can be used to define a curriculum for your environment.
In this tutorial we don’t implement a curriculum for simplicity, but you can see an example of a curriculum definition in the other locomotion or manipulation tasks.
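As a hedged illustration of what such a definition can look like, the built-in mdp.modify_reward_weight term changes the weight of a named reward term after a given number of steps; the parameter values below are only illustrative:

from omni.isaac.lab.managers import CurriculumTermCfg as CurrTerm
from omni.isaac.lab.utils import configclass

import omni.isaac.lab_tasks.manager_based.classic.cartpole.mdp as mdp


@configclass
class CurriculumCfg:
    """Curriculum terms for the MDP (illustrative sketch)."""

    # after 10k environment steps, penalize cart velocity more strongly
    cart_vel_weight = CurrTerm(
        func=mdp.modify_reward_weight,
        params={"term_name": "cart_vel", "weight": -0.1, "num_steps": 10_000},
    )

Such a configuration would be attached to the environment as curriculum: CurriculumCfg = CurriculumCfg().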
Tying it all together#
With all the above components defined, we can now create the ManagerBasedRLEnvCfg configuration for the
cartpole environment. This is similar to the ManagerBasedEnvCfg defined in Creating a Manager-Based Base Environment,
only with the added RL components explained in the above sections.
@configclass
class CartpoleEnvCfg(ManagerBasedRLEnvCfg):
    """Configuration for the cartpole environment."""

    # Scene settings
    scene: CartpoleSceneCfg = CartpoleSceneCfg(num_envs=4096, env_spacing=4.0)
    # Basic settings
    observations: ObservationsCfg = ObservationsCfg()
    actions: ActionsCfg = ActionsCfg()
    events: EventCfg = EventCfg()
    # MDP settings
    rewards: RewardsCfg = RewardsCfg()
    terminations: TerminationsCfg = TerminationsCfg()

    # Post initialization
    def __post_init__(self) -> None:
        """Post initialization."""
        # general settings
        self.decimation = 2
        self.episode_length_s = 5
        # viewer settings
        self.viewer.eye = (8.0, 0.0, 5.0)
        # simulation settings
        self.sim.dt = 1 / 120
        self.sim.render_interval = self.decimation
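The timing settings above determine the control rate and the episode length: the policy acts every decimation physics steps, and the time-out termination fires after episode_length_s seconds of simulated time. A quick back-of-the-envelope check (this mirrors how the environment derives its maximum step count, up to rounding):

import math

sim_dt = 1 / 120                 # physics runs at 120 Hz
decimation = 2                   # physics steps per environment step
step_dt = sim_dt * decimation    # the policy acts every 1/60 s, i.e. at 60 Hz

episode_length_s = 5
max_episode_length = math.ceil(episode_length_s / step_dt)
print(max_episode_length)        # 300 environment steps per episode

This is also why the standalone script resets the environments every 300 counts, which matches one full episode.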
Running the simulation loop#
Coming back to the run_cartpole_rl_env.py script, the simulation loop is similar to the previous tutorial.
The only difference is that we create an instance of envs.ManagerBasedRLEnv instead of envs.ManagerBasedEnv.
Consequently, the envs.ManagerBasedRLEnv.step() method now returns additional signals
such as the reward and termination status. The information dictionary also maintains logging of quantities
such as the reward contribution from individual terms, the termination status of each term, the episode length, etc.
def main():
    """Main function."""
    # create environment configuration
    env_cfg = CartpoleEnvCfg()
    env_cfg.scene.num_envs = args_cli.num_envs
    # setup RL environment
    env = ManagerBasedRLEnv(cfg=env_cfg)

    # simulate physics
    count = 0
    while simulation_app.is_running():
        with torch.inference_mode():
            # reset
            if count % 300 == 0:
                count = 0
                env.reset()
                print("-" * 80)
                print("[INFO]: Resetting environment...")
            # sample random actions
            joint_efforts = torch.randn_like(env.action_manager.action)
            # step the environment
            obs, rew, terminated, truncated, info = env.step(joint_efforts)
            # print current orientation of pole
            print("[Env 0]: Pole joint: ", obs["policy"][0][1].item())
            # update counter
            count += 1

    # close the environment
    env.close()
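If you want to actually consume the extra signals rather than just print an observation, a minimal addition inside the loop above could look like the following sketch. The exact contents of the info dictionary depend on the configured managers, so treat the logging access as an assumption:

# inside the simulation loop, after env.step(...):
dones = terminated | truncated            # boolean mask of environments whose episode ended this step
if dones.any():
    # the info dict carries auxiliary data (e.g. per-term logs); its keys depend on the managers
    print(f"[INFO]: {int(dones.sum())} episodes finished. Info keys: {list(info.keys())}")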
The Code Execution#
Similar to the previous tutorial, we can run the environment by executing the run_cartpole_rl_env.py script.
./isaaclab.sh -p source/standalone/tutorials/03_envs/run_cartpole_rl_env.py --num_envs 32
This should open a similar simulation as in the previous tutorial. However, this time, the environment returns more signals that specify the reward and termination status. Additionally, the individual environments reset themselves when they terminate based on the termination criteria specified in the configuration.
To stop the simulation, you can either close the window or press Ctrl+C in the terminal
where you started the simulation.
In this tutorial, we learnt how to create a task environment for reinforcement learning. We do this
by extending the base environment to include the rewards, terminations, commands and curriculum terms.
We also learnt how to use the envs.ManagerBasedRLEnv class to run the environment and receive various
signals from it.
While it is possible to manually create an instance of the envs.ManagerBasedRLEnv class for a desired task,
this is not scalable as it requires specialized scripts for each task. Thus, we exploit the
gymnasium.make() function to create the environment with the gym interface. We will learn how to do this
in the next tutorial.
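As a small preview of that workflow (with the simulator app already launched as in the script above, and assuming the cartpole task is registered under the ID "Isaac-Cartpole-v0"), environment creation then reduces to:

import gymnasium as gym

import omni.isaac.lab_tasks  # noqa: F401  (importing the package registers the Isaac Lab tasks with gymnasium)
from omni.isaac.lab_tasks.manager_based.classic.cartpole.cartpole_env_cfg import CartpoleEnvCfg

# create the cartpole task through the gym interface instead of instantiating ManagerBasedRLEnv directly
env = gym.make("Isaac-Cartpole-v0", cfg=CartpoleEnvCfg())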