# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Common functions that can be used to enable different events.
Events include anything related to altering the simulation state. This includes changing the physics
materials, applying external forces, and resetting the state of the asset.
The functions can be passed to the :class:`omni.isaac.lab.managers.EventTermCfg` object to enable
the event introduced by the function.
"""
from __future__ import annotations
import torch
from typing import TYPE_CHECKING, Literal
import carb
import omni.physics.tensors.impl.api as physx
import omni.isaac.lab.sim as sim_utils
import omni.isaac.lab.utils.math as math_utils
from omni.isaac.lab.actuators import ImplicitActuator
from omni.isaac.lab.assets import Articulation, DeformableObject, RigidObject
from omni.isaac.lab.managers import EventTermCfg, ManagerTermBase, SceneEntityCfg
from omni.isaac.lab.terrains import TerrainImporter
if TYPE_CHECKING:
from omni.isaac.lab.envs import ManagerBasedEnv
[docs]class randomize_rigid_body_material(ManagerTermBase):
"""Randomize the physics materials on all geometries of the asset.
This function creates a set of physics materials with random static friction, dynamic friction, and restitution
values. The number of materials is specified by ``num_buckets``. The materials are generated by sampling
uniform random values from the given ranges.
The material properties are then assigned to the geometries of the asset. The assignment is done by
creating a random integer tensor of shape (num_instances, max_num_shapes) where ``num_instances``
is the number of assets spawned and ``max_num_shapes`` is the maximum number of shapes in the asset (over
all bodies). The integer values are used as indices to select the material properties from the
material buckets.
If the flag ``make_consistent`` is set to ``True``, the dynamic friction is set to be less than or equal to
the static friction. This obeys the physics constraint on friction values. However, it may not always be
essential for the application. Thus, the flag is set to ``False`` by default.
.. attention::
This function uses CPU tensors to assign the material properties. It is recommended to use this function
only during the initialization of the environment. Otherwise, it may lead to a significant performance
overhead.
.. note::
PhysX only allows 64000 unique physics materials in the scene. If the number of materials exceeds this
limit, the simulation will crash. Due to this reason, we sample the materials only once during initialization.
Afterwards, these materials are randomly assigned to the geometries of the asset.
"""
[docs] def __init__(self, cfg: EventTermCfg, env: ManagerBasedEnv):
"""Initialize the term.
Args:
cfg: The configuration of the event term.
env: The environment instance.
Raises:
ValueError: If the asset is not a RigidObject or an Articulation.
"""
super().__init__(cfg, env)
# extract the used quantities (to enable type-hinting)
self.asset_cfg: SceneEntityCfg = cfg.params["asset_cfg"]
self.asset: RigidObject | Articulation = env.scene[self.asset_cfg.name]
if not isinstance(self.asset, (RigidObject, Articulation)):
raise ValueError(
f"Randomization term 'randomize_rigid_body_material' not supported for asset: '{self.asset_cfg.name}'"
f" with type: '{type(self.asset)}'."
)
# obtain number of shapes per body (needed for indexing the material properties correctly)
# note: this is a workaround since the Articulation does not provide a direct way to obtain the number of shapes
# per body. We use the physics simulation view to obtain the number of shapes per body.
if isinstance(self.asset, Articulation) and self.asset_cfg.body_ids != slice(None):
self.num_shapes_per_body = []
for link_path in self.asset.root_physx_view.link_paths[0]:
link_physx_view = self.asset._physics_sim_view.create_rigid_body_view(link_path) # type: ignore
self.num_shapes_per_body.append(link_physx_view.max_shapes)
# ensure the parsing is correct
num_shapes = sum(self.num_shapes_per_body)
expected_shapes = self.asset.root_physx_view.max_shapes
if num_shapes != expected_shapes:
raise ValueError(
"Randomization term 'randomize_rigid_body_material' failed to parse the number of shapes per body."
f" Expected total shapes: {expected_shapes}, but got: {num_shapes}."
)
else:
# in this case, we don't need to do special indexing
self.num_shapes_per_body = None
# obtain parameters for sampling friction and restitution values
static_friction_range = cfg.params.get("static_friction_range", (1.0, 1.0))
dynamic_friction_range = cfg.params.get("dynamic_friction_range", (1.0, 1.0))
restitution_range = cfg.params.get("restitution_range", (0.0, 0.0))
num_buckets = int(cfg.params.get("num_buckets", 1))
# sample material properties from the given ranges
# note: we only sample the materials once during initialization
# afterwards these are randomly assigned to the geometries of the asset
range_list = [static_friction_range, dynamic_friction_range, restitution_range]
ranges = torch.tensor(range_list, device="cpu")
self.material_buckets = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (num_buckets, 3), device="cpu")
# ensure dynamic friction is always less than static friction
make_consistent = cfg.params.get("make_consistent", False)
if make_consistent:
self.material_buckets[:, 1] = torch.min(self.material_buckets[:, 0], self.material_buckets[:, 1])
def __call__(
self,
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
static_friction_range: tuple[float, float],
dynamic_friction_range: tuple[float, float],
restitution_range: tuple[float, float],
num_buckets: int,
asset_cfg: SceneEntityCfg,
make_consistent: bool = False,
):
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device="cpu")
else:
env_ids = env_ids.cpu()
# randomly assign material IDs to the geometries
total_num_shapes = self.asset.root_physx_view.max_shapes
bucket_ids = torch.randint(0, num_buckets, (len(env_ids), total_num_shapes), device="cpu")
material_samples = self.material_buckets[bucket_ids]
# retrieve material buffer from the physics simulation
materials = self.asset.root_physx_view.get_material_properties()
# update material buffer with new samples
if self.num_shapes_per_body is not None:
# sample material properties from the given ranges
for body_id in self.asset_cfg.body_ids:
# obtain indices of shapes for the body
start_idx = sum(self.num_shapes_per_body[:body_id])
end_idx = start_idx + self.num_shapes_per_body[body_id]
# assign the new materials
# material samples are of shape: num_env_ids x total_num_shapes x 3
materials[env_ids, start_idx:end_idx] = material_samples[:, start_idx:end_idx]
else:
# assign all the materials
materials[env_ids] = material_samples[:]
# apply to simulation
self.asset.root_physx_view.set_material_properties(materials, env_ids)
[docs]def randomize_rigid_body_mass(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
mass_distribution_params: tuple[float, float],
operation: Literal["add", "scale", "abs"],
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
recompute_inertia: bool = True,
):
"""Randomize the mass of the bodies by adding, scaling, or setting random values.
This function allows randomizing the mass of the bodies of the asset. The function samples random values from the
given distribution parameters and adds, scales, or sets the values into the physics simulation based on the operation.
If the ``recompute_inertia`` flag is set to ``True``, the function recomputes the inertia tensor of the bodies
after setting the mass. This is useful when the mass is changed significantly, as the inertia tensor depends
on the mass. It assumes the body is a uniform density object. If the body is not a uniform density object,
the inertia tensor may not be accurate.
.. tip::
This function uses CPU tensors to assign the body masses. It is recommended to use this function
only during the initialization of the environment.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device="cpu")
else:
env_ids = env_ids.cpu()
# resolve body indices
if asset_cfg.body_ids == slice(None):
body_ids = torch.arange(asset.num_bodies, dtype=torch.int, device="cpu")
else:
body_ids = torch.tensor(asset_cfg.body_ids, dtype=torch.int, device="cpu")
# get the current masses of the bodies (num_assets, num_bodies)
masses = asset.root_physx_view.get_masses()
# apply randomization on default values
# this is to make sure when calling the function multiple times, the randomization is applied on the
# default values and not the previously randomized values
masses[env_ids[:, None], body_ids] = asset.data.default_mass[env_ids[:, None], body_ids].clone()
# sample from the given range
# note: we modify the masses in-place for all environments
# however, the setter takes care that only the masses of the specified environments are modified
masses = _randomize_prop_by_op(
masses, mass_distribution_params, env_ids, body_ids, operation=operation, distribution=distribution
)
# set the mass into the physics simulation
asset.root_physx_view.set_masses(masses, env_ids)
# recompute inertia tensors if needed
if recompute_inertia:
# compute the ratios of the new masses to the initial masses
ratios = masses[env_ids[:, None], body_ids] / asset.data.default_mass[env_ids[:, None], body_ids]
# scale the inertia tensors by the the ratios
# since mass randomization is done on default values, we can use the default inertia tensors
inertias = asset.root_physx_view.get_inertias()
if isinstance(asset, Articulation):
# inertia has shape: (num_envs, num_bodies, 9) for articulation
inertias[env_ids[:, None], body_ids] = (
asset.data.default_inertia[env_ids[:, None], body_ids] * ratios[..., None]
)
else:
# inertia has shape: (num_envs, 9) for rigid object
inertias[env_ids] = asset.data.default_inertia[env_ids] * ratios
# set the inertia tensors into the physics simulation
asset.root_physx_view.set_inertias(inertias, env_ids)
[docs]def randomize_physics_scene_gravity(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
gravity_distribution_params: tuple[list[float], list[float]],
operation: Literal["add", "scale", "abs"],
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
"""Randomize gravity by adding, scaling, or setting random values.
This function allows randomizing gravity of the physics scene. The function samples random values from the
given distribution parameters and adds, scales, or sets the values into the physics simulation based on the
operation.
The distribution parameters are lists of two elements each, representing the lower and upper bounds of the
distribution for the x, y, and z components of the gravity vector. The function samples random values for each
component independently.
.. attention::
This function applied the same gravity for all the environments.
.. tip::
This function uses CPU tensors to assign gravity.
"""
# get the current gravity
gravity = torch.tensor(env.sim.cfg.gravity, device="cpu").unsqueeze(0)
dist_param_0 = torch.tensor(gravity_distribution_params[0], device="cpu")
dist_param_1 = torch.tensor(gravity_distribution_params[1], device="cpu")
gravity = _randomize_prop_by_op(
gravity,
(dist_param_0, dist_param_1),
None,
slice(None),
operation=operation,
distribution=distribution,
)
# unbatch the gravity tensor into a list
gravity = gravity[0].tolist()
# set the gravity into the physics simulation
physics_sim_view: physx.SimulationView = sim_utils.SimulationContext.instance().physics_sim_view
physics_sim_view.set_gravity(carb.Float3(*gravity))
[docs]def randomize_actuator_gains(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
"""Randomize the actuator gains in an articulation by adding, scaling, or setting random values.
This function allows randomizing the actuator stiffness and damping gains.
The function samples random values from the given distribution parameters and applies the operation to the joint properties.
It then sets the values into the actuator models. If the distribution parameters are not provided for a particular property,
the function does not modify the property.
.. tip::
For implicit actuators, this function uses CPU tensors to assign the actuator gains into the simulation.
In such cases, it is recommended to use this function only during the initialization of the environment.
"""
# Extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# Resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
def randomize(data: torch.Tensor, params: tuple[float, float]) -> torch.Tensor:
return _randomize_prop_by_op(
data, params, dim_0_ids=None, dim_1_ids=actuator_indices, operation=operation, distribution=distribution
)
# Loop through actuators and randomize gains
for actuator in asset.actuators.values():
if isinstance(asset_cfg.joint_ids, slice):
# we take all the joints of the actuator
actuator_indices = slice(None)
if isinstance(actuator.joint_indices, slice):
global_indices = slice(None)
else:
global_indices = torch.tensor(actuator.joint_indices, device=asset.device)
elif isinstance(actuator.joint_indices, slice):
# we take the joints defined in the asset config
global_indices = actuator_indices = torch.tensor(asset_cfg.joint_ids, device=asset.device)
else:
# we take the intersection of the actuator joints and the asset config joints
actuator_joint_indices = torch.tensor(actuator.joint_indices, device=asset.device)
asset_joint_ids = torch.tensor(asset_cfg.joint_ids, device=asset.device)
# the indices of the joints in the actuator that have to be randomized
actuator_indices = torch.nonzero(torch.isin(actuator_joint_indices, asset_joint_ids)).view(-1)
if len(actuator_indices) == 0:
continue
# maps actuator indices that have to be randomized to global joint indices
global_indices = actuator_joint_indices[actuator_indices]
# Randomize stiffness
if stiffness_distribution_params is not None:
stiffness = actuator.stiffness[env_ids].clone()
stiffness[:, actuator_indices] = asset.data.default_joint_stiffness[env_ids][:, global_indices].clone()
randomize(stiffness, stiffness_distribution_params)
actuator.stiffness[env_ids] = stiffness
if isinstance(actuator, ImplicitActuator):
asset.write_joint_stiffness_to_sim(stiffness, joint_ids=actuator.joint_indices, env_ids=env_ids)
# Randomize damping
if damping_distribution_params is not None:
damping = actuator.damping[env_ids].clone()
damping[:, actuator_indices] = asset.data.default_joint_damping[env_ids][:, global_indices].clone()
randomize(damping, damping_distribution_params)
actuator.damping[env_ids] = damping
if isinstance(actuator, ImplicitActuator):
asset.write_joint_damping_to_sim(damping, joint_ids=actuator.joint_indices, env_ids=env_ids)
[docs]def randomize_joint_parameters(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
friction_distribution_params: tuple[float, float] | None = None,
armature_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
"""Randomize the joint parameters of an articulation by adding, scaling, or setting random values.
This function allows randomizing the joint parameters of the asset.
These correspond to the physics engine joint properties that affect the joint behavior.
The function samples random values from the given distribution parameters and applies the operation to the joint properties.
It then sets the values into the physics simulation. If the distribution parameters are not provided for a
particular property, the function does not modify the property.
.. tip::
This function uses CPU tensors to assign the joint properties. It is recommended to use this function
only during the initialization of the environment.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
# resolve joint indices
if asset_cfg.joint_ids == slice(None):
joint_ids = slice(None) # for optimization purposes
else:
joint_ids = torch.tensor(asset_cfg.joint_ids, dtype=torch.int, device=asset.device)
# sample joint properties from the given ranges and set into the physics simulation
# -- friction
if friction_distribution_params is not None:
friction = asset.data.default_joint_friction.to(asset.device).clone()
friction = _randomize_prop_by_op(
friction, friction_distribution_params, env_ids, joint_ids, operation=operation, distribution=distribution
)[env_ids][:, joint_ids]
asset.write_joint_friction_to_sim(friction, joint_ids=joint_ids, env_ids=env_ids)
# -- armature
if armature_distribution_params is not None:
armature = asset.data.default_joint_armature.to(asset.device).clone()
armature = _randomize_prop_by_op(
armature, armature_distribution_params, env_ids, joint_ids, operation=operation, distribution=distribution
)[env_ids][:, joint_ids]
asset.write_joint_armature_to_sim(armature, joint_ids=joint_ids, env_ids=env_ids)
# -- dof limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
dof_limits = asset.data.default_joint_limits.to(asset.device).clone()
if lower_limit_distribution_params is not None:
lower_limits = dof_limits[..., 0]
lower_limits = _randomize_prop_by_op(
lower_limits,
lower_limit_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, joint_ids]
dof_limits[env_ids[:, None], joint_ids, 0] = lower_limits
if upper_limit_distribution_params is not None:
upper_limits = dof_limits[..., 1]
upper_limits = _randomize_prop_by_op(
upper_limits,
upper_limit_distribution_params,
env_ids,
joint_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, joint_ids]
dof_limits[env_ids[:, None], joint_ids, 1] = upper_limits
if (dof_limits[env_ids[:, None], joint_ids, 0] > dof_limits[env_ids[:, None], joint_ids, 1]).any():
raise ValueError(
"Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater than"
" upper joint limits."
)
asset.write_joint_limits_to_sim(dof_limits[env_ids][:, joint_ids], joint_ids=joint_ids, env_ids=env_ids)
[docs]def randomize_fixed_tendon_parameters(
env: ManagerBasedEnv,
env_ids: torch.Tensor | None,
asset_cfg: SceneEntityCfg,
stiffness_distribution_params: tuple[float, float] | None = None,
damping_distribution_params: tuple[float, float] | None = None,
limit_stiffness_distribution_params: tuple[float, float] | None = None,
lower_limit_distribution_params: tuple[float, float] | None = None,
upper_limit_distribution_params: tuple[float, float] | None = None,
rest_length_distribution_params: tuple[float, float] | None = None,
offset_distribution_params: tuple[float, float] | None = None,
operation: Literal["add", "scale", "abs"] = "abs",
distribution: Literal["uniform", "log_uniform", "gaussian"] = "uniform",
):
"""Randomize the fixed tendon parameters of an articulation by adding, scaling, or setting random values.
This function allows randomizing the fixed tendon parameters of the asset.
These correspond to the physics engine tendon properties that affect the joint behavior.
The function samples random values from the given distribution parameters and applies the operation to the tendon properties.
It then sets the values into the physics simulation. If the distribution parameters are not provided for a
particular property, the function does not modify the property.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
# resolve joint indices
if asset_cfg.fixed_tendon_ids == slice(None):
fixed_tendon_ids = slice(None) # for optimization purposes
else:
fixed_tendon_ids = torch.tensor(asset_cfg.fixed_tendon_ids, dtype=torch.int, device=asset.device)
# sample tendon properties from the given ranges and set into the physics simulation
# -- stiffness
if stiffness_distribution_params is not None:
stiffness = asset.data.default_fixed_tendon_stiffness.clone()
stiffness = _randomize_prop_by_op(
stiffness,
stiffness_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
asset.set_fixed_tendon_stiffness(stiffness, fixed_tendon_ids, env_ids)
# -- damping
if damping_distribution_params is not None:
damping = asset.data.default_fixed_tendon_damping.clone()
damping = _randomize_prop_by_op(
damping,
damping_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
asset.set_fixed_tendon_damping(damping, fixed_tendon_ids, env_ids)
# -- limit stiffness
if limit_stiffness_distribution_params is not None:
limit_stiffness = asset.data.default_fixed_tendon_limit_stiffness.clone()
limit_stiffness = _randomize_prop_by_op(
limit_stiffness,
limit_stiffness_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
asset.set_fixed_tendon_limit_stiffness(limit_stiffness, fixed_tendon_ids, env_ids)
# -- limits
if lower_limit_distribution_params is not None or upper_limit_distribution_params is not None:
limit = asset.data.default_fixed_tendon_limit.clone()
# -- lower limit
if lower_limit_distribution_params is not None:
lower_limit = limit[..., 0]
lower_limit = _randomize_prop_by_op(
lower_limit,
lower_limit_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
limit[env_ids[:, None], fixed_tendon_ids, 0] = lower_limit
# -- upper limit
if upper_limit_distribution_params is not None:
upper_limit = limit[..., 1]
upper_limit = _randomize_prop_by_op(
upper_limit,
upper_limit_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
limit[env_ids[:, None], fixed_tendon_ids, 1] = upper_limit
if (limit[env_ids[:, None], fixed_tendon_ids, 0] > limit[env_ids[:, None], fixed_tendon_ids, 1]).any():
raise ValueError(
"Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are greater"
" than upper tendon limits."
)
asset.set_fixed_tendon_limit(limit, fixed_tendon_ids, env_ids)
# -- rest length
if rest_length_distribution_params is not None:
rest_length = asset.data.default_fixed_tendon_rest_length.clone()
rest_length = _randomize_prop_by_op(
rest_length,
rest_length_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
asset.set_fixed_tendon_rest_length(rest_length, fixed_tendon_ids, env_ids)
# -- offset
if offset_distribution_params is not None:
offset = asset.data.default_fixed_tendon_offset.clone()
offset = _randomize_prop_by_op(
offset,
offset_distribution_params,
env_ids,
fixed_tendon_ids,
operation=operation,
distribution=distribution,
)[env_ids][:, fixed_tendon_ids]
asset.set_fixed_tendon_offset(offset, fixed_tendon_ids, env_ids)
asset.write_fixed_tendon_properties_to_sim(fixed_tendon_ids, env_ids)
[docs]def apply_external_force_torque(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
force_range: tuple[float, float],
torque_range: tuple[float, float],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Randomize the external forces and torques applied to the bodies.
This function creates a set of random forces and torques sampled from the given ranges. The number of forces
and torques is equal to the number of bodies times the number of environments. The forces and torques are
applied to the bodies by calling ``asset.set_external_force_and_torque``. The forces and torques are only
applied when ``asset.write_data_to_sim()`` is called in the environment.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
# resolve environment ids
if env_ids is None:
env_ids = torch.arange(env.scene.num_envs, device=asset.device)
# resolve number of bodies
num_bodies = len(asset_cfg.body_ids) if isinstance(asset_cfg.body_ids, list) else asset.num_bodies
# sample random forces and torques
size = (len(env_ids), num_bodies, 3)
forces = math_utils.sample_uniform(*force_range, size, asset.device)
torques = math_utils.sample_uniform(*torque_range, size, asset.device)
# set the forces and torques into the buffers
# note: these are only applied when you call: `asset.write_data_to_sim()`
asset.set_external_force_and_torque(forces, torques, env_ids=env_ids, body_ids=asset_cfg.body_ids)
[docs]def push_by_setting_velocity(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
velocity_range: dict[str, tuple[float, float]],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Push the asset by setting the root velocity to a random value within the given ranges.
This creates an effect similar to pushing the asset with a random impulse that changes the asset's velocity.
It samples the root velocity from the given ranges and sets the velocity into the physics simulation.
The function takes a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary
are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``. The values are tuples of the form ``(min, max)``.
If the dictionary does not contain a key, the velocity is set to zero for that axis.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
# velocities
vel_w = asset.data.root_com_vel_w[env_ids]
# sample random velocities
range_list = [velocity_range.get(key, (0.0, 0.0)) for key in ["x", "y", "z", "roll", "pitch", "yaw"]]
ranges = torch.tensor(range_list, device=asset.device)
vel_w[:] = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], vel_w.shape, device=asset.device)
# set the velocities into the physics simulation
asset.write_root_com_velocity_to_sim(vel_w, env_ids=env_ids)
[docs]def reset_root_state_with_random_orientation(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
pose_range: dict[str, tuple[float, float]],
velocity_range: dict[str, tuple[float, float]],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Reset the asset root position and velocities sampled randomly within the given ranges
and the asset root orientation sampled randomly from the SO(3).
This function randomizes the root position and velocity of the asset.
* It samples the root position from the given ranges and adds them to the default root position, before setting
them into the physics simulation.
* It samples the root orientation uniformly from the SO(3) and sets them into the physics simulation.
* It samples the root velocity from the given ranges and sets them into the physics simulation.
The function takes a dictionary of position and velocity ranges for each axis and rotation:
* :attr:`pose_range` - a dictionary of position ranges for each axis. The keys of the dictionary are ``x``,
``y``, and ``z``. The orientation is sampled uniformly from the SO(3).
* :attr:`velocity_range` - a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary
are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``.
The values are tuples of the form ``(min, max)``. If the dictionary does not contain a particular key,
the position is set to zero for that axis.
"""
# extract the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
# get default root state
root_states = asset.data.default_root_state[env_ids].clone()
# poses
range_list = [pose_range.get(key, (0.0, 0.0)) for key in ["x", "y", "z"]]
ranges = torch.tensor(range_list, device=asset.device)
rand_samples = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(env_ids), 3), device=asset.device)
positions = root_states[:, 0:3] + env.scene.env_origins[env_ids] + rand_samples
orientations = math_utils.random_orientation(len(env_ids), device=asset.device)
# velocities
range_list = [velocity_range.get(key, (0.0, 0.0)) for key in ["x", "y", "z", "roll", "pitch", "yaw"]]
ranges = torch.tensor(range_list, device=asset.device)
rand_samples = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(env_ids), 6), device=asset.device)
velocities = root_states[:, 7:13] + rand_samples
# set into the physics simulation
asset.write_root_link_pose_to_sim(torch.cat([positions, orientations], dim=-1), env_ids=env_ids)
asset.write_root_com_velocity_to_sim(velocities, env_ids=env_ids)
[docs]def reset_root_state_from_terrain(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
pose_range: dict[str, tuple[float, float]],
velocity_range: dict[str, tuple[float, float]],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Reset the asset root state by sampling a random valid pose from the terrain.
This function samples a random valid pose(based on flat patches) from the terrain and sets the root state
of the asset to this position. The function also samples random velocities from the given ranges and sets them
into the physics simulation.
The function takes a dictionary of position and velocity ranges for each axis and rotation:
* :attr:`pose_range` - a dictionary of pose ranges for each axis. The keys of the dictionary are ``roll``,
``pitch``, and ``yaw``. The position is sampled from the flat patches of the terrain.
* :attr:`velocity_range` - a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary
are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``.
The values are tuples of the form ``(min, max)``. If the dictionary does not contain a particular key,
the position is set to zero for that axis.
Note:
The function expects the terrain to have valid flat patches under the key "init_pos". The flat patches
are used to sample the random pose for the robot.
Raises:
ValueError: If the terrain does not have valid flat patches under the key "init_pos".
"""
# access the used quantities (to enable type-hinting)
asset: RigidObject | Articulation = env.scene[asset_cfg.name]
terrain: TerrainImporter = env.scene.terrain
# obtain all flat patches corresponding to the valid poses
valid_positions: torch.Tensor = terrain.flat_patches.get("init_pos")
if valid_positions is None:
raise ValueError(
"The event term 'reset_root_state_from_terrain' requires valid flat patches under 'init_pos'."
f" Found: {list(terrain.flat_patches.keys())}"
)
# sample random valid poses
ids = torch.randint(0, valid_positions.shape[2], size=(len(env_ids),), device=env.device)
positions = valid_positions[terrain.terrain_levels[env_ids], terrain.terrain_types[env_ids], ids]
positions += asset.data.default_root_state[env_ids, :3]
# sample random orientations
range_list = [pose_range.get(key, (0.0, 0.0)) for key in ["roll", "pitch", "yaw"]]
ranges = torch.tensor(range_list, device=asset.device)
rand_samples = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(env_ids), 3), device=asset.device)
# convert to quaternions
orientations = math_utils.quat_from_euler_xyz(rand_samples[:, 0], rand_samples[:, 1], rand_samples[:, 2])
# sample random velocities
range_list = [velocity_range.get(key, (0.0, 0.0)) for key in ["x", "y", "z", "roll", "pitch", "yaw"]]
ranges = torch.tensor(range_list, device=asset.device)
rand_samples = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(env_ids), 6), device=asset.device)
velocities = asset.data.default_root_state[:, 7:13] + rand_samples
# set into the physics simulation
asset.write_root_link_pose_to_sim(torch.cat([positions, orientations], dim=-1), env_ids=env_ids)
asset.write_root_com_velocity_to_sim(velocities, env_ids=env_ids)
[docs]def reset_joints_by_scale(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
position_range: tuple[float, float],
velocity_range: tuple[float, float],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Reset the robot joints by scaling the default position and velocity by the given ranges.
This function samples random values from the given ranges and scales the default joint positions and velocities
by these values. The scaled values are then set into the physics simulation.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# get default joint state
joint_pos = asset.data.default_joint_pos[env_ids].clone()
joint_vel = asset.data.default_joint_vel[env_ids].clone()
# scale these values randomly
joint_pos *= math_utils.sample_uniform(*position_range, joint_pos.shape, joint_pos.device)
joint_vel *= math_utils.sample_uniform(*velocity_range, joint_vel.shape, joint_vel.device)
# clamp joint pos to limits
joint_pos_limits = asset.data.soft_joint_pos_limits[env_ids]
joint_pos = joint_pos.clamp_(joint_pos_limits[..., 0], joint_pos_limits[..., 1])
# clamp joint vel to limits
joint_vel_limits = asset.data.soft_joint_vel_limits[env_ids]
joint_vel = joint_vel.clamp_(-joint_vel_limits, joint_vel_limits)
# set into the physics simulation
asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids)
[docs]def reset_joints_by_offset(
env: ManagerBasedEnv,
env_ids: torch.Tensor,
position_range: tuple[float, float],
velocity_range: tuple[float, float],
asset_cfg: SceneEntityCfg = SceneEntityCfg("robot"),
):
"""Reset the robot joints with offsets around the default position and velocity by the given ranges.
This function samples random values from the given ranges and biases the default joint positions and velocities
by these values. The biased values are then set into the physics simulation.
"""
# extract the used quantities (to enable type-hinting)
asset: Articulation = env.scene[asset_cfg.name]
# get default joint state
joint_pos = asset.data.default_joint_pos[env_ids].clone()
joint_vel = asset.data.default_joint_vel[env_ids].clone()
# bias these values randomly
joint_pos += math_utils.sample_uniform(*position_range, joint_pos.shape, joint_pos.device)
joint_vel += math_utils.sample_uniform(*velocity_range, joint_vel.shape, joint_vel.device)
# clamp joint pos to limits
joint_pos_limits = asset.data.soft_joint_pos_limits[env_ids]
joint_pos = joint_pos.clamp_(joint_pos_limits[..., 0], joint_pos_limits[..., 1])
# clamp joint vel to limits
joint_vel_limits = asset.data.soft_joint_vel_limits[env_ids]
joint_vel = joint_vel.clamp_(-joint_vel_limits, joint_vel_limits)
# set into the physics simulation
asset.write_joint_state_to_sim(joint_pos, joint_vel, env_ids=env_ids)
[docs]def reset_scene_to_default(env: ManagerBasedEnv, env_ids: torch.Tensor):
"""Reset the scene to the default state specified in the scene configuration."""
# rigid bodies
for rigid_object in env.scene.rigid_objects.values():
# obtain default and deal with the offset for env origins
default_root_state = rigid_object.data.default_root_state[env_ids].clone()
default_root_state[:, 0:3] += env.scene.env_origins[env_ids]
# set into the physics simulation
rigid_object.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids=env_ids)
rigid_object.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids=env_ids)
# articulations
for articulation_asset in env.scene.articulations.values():
# obtain default and deal with the offset for env origins
default_root_state = articulation_asset.data.default_root_state[env_ids].clone()
default_root_state[:, 0:3] += env.scene.env_origins[env_ids]
# set into the physics simulation
articulation_asset.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids=env_ids)
articulation_asset.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids=env_ids)
# obtain default joint positions
default_joint_pos = articulation_asset.data.default_joint_pos[env_ids].clone()
default_joint_vel = articulation_asset.data.default_joint_vel[env_ids].clone()
# set into the physics simulation
articulation_asset.write_joint_state_to_sim(default_joint_pos, default_joint_vel, env_ids=env_ids)
# deformable objects
for deformable_object in env.scene.deformable_objects.values():
# obtain default and set into the physics simulation
nodal_state = deformable_object.data.default_nodal_state_w[env_ids].clone()
deformable_object.write_nodal_state_to_sim(nodal_state, env_ids=env_ids)
"""
Internal helper functions.
"""
def _randomize_prop_by_op(
data: torch.Tensor,
distribution_parameters: tuple[float | torch.Tensor, float | torch.Tensor],
dim_0_ids: torch.Tensor | None,
dim_1_ids: torch.Tensor | slice,
operation: Literal["add", "scale", "abs"],
distribution: Literal["uniform", "log_uniform", "gaussian"],
) -> torch.Tensor:
"""Perform data randomization based on the given operation and distribution.
Args:
data: The data tensor to be randomized. Shape is (dim_0, dim_1).
distribution_parameters: The parameters for the distribution to sample values from.
dim_0_ids: The indices of the first dimension to randomize.
dim_1_ids: The indices of the second dimension to randomize.
operation: The operation to perform on the data. Options: 'add', 'scale', 'abs'.
distribution: The distribution to sample the random values from. Options: 'uniform', 'log_uniform'.
Returns:
The data tensor after randomization. Shape is (dim_0, dim_1).
Raises:
NotImplementedError: If the operation or distribution is not supported.
"""
# resolve shape
# -- dim 0
if dim_0_ids is None:
n_dim_0 = data.shape[0]
dim_0_ids = slice(None)
else:
n_dim_0 = len(dim_0_ids)
if not isinstance(dim_1_ids, slice):
dim_0_ids = dim_0_ids[:, None]
# -- dim 1
if isinstance(dim_1_ids, slice):
n_dim_1 = data.shape[1]
else:
n_dim_1 = len(dim_1_ids)
# resolve the distribution
if distribution == "uniform":
dist_fn = math_utils.sample_uniform
elif distribution == "log_uniform":
dist_fn = math_utils.sample_log_uniform
elif distribution == "gaussian":
dist_fn = math_utils.sample_gaussian
else:
raise NotImplementedError(
f"Unknown distribution: '{distribution}' for joint properties randomization."
" Please use 'uniform', 'log_uniform', 'gaussian'."
)
# perform the operation
if operation == "add":
data[dim_0_ids, dim_1_ids] += dist_fn(*distribution_parameters, (n_dim_0, n_dim_1), device=data.device)
elif operation == "scale":
data[dim_0_ids, dim_1_ids] *= dist_fn(*distribution_parameters, (n_dim_0, n_dim_1), device=data.device)
elif operation == "abs":
data[dim_0_ids, dim_1_ids] = dist_fn(*distribution_parameters, (n_dim_0, n_dim_1), device=data.device)
else:
raise NotImplementedError(
f"Unknown operation: '{operation}' for property randomization. Please use 'add', 'scale', or 'abs'."
)
return data