# Copyright (c) 2022-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clause"""Common functions that can be used to enable different events.Events include anything related to altering the simulation state. This includes changing the physicsmaterials, applying external forces, and resetting the state of the asset.The functions can be passed to the :class:`isaaclab.managers.EventTermCfg` object to enablethe event introduced by the function."""from__future__importannotationsimportmathimporttorchfromtypingimportTYPE_CHECKING,Literalimportcarbimportomni.physics.tensors.impl.apiasphysximportomni.usdfromisaacsim.core.utils.extensionsimportenable_extensionfrompxrimportGf,Sdf,UsdGeom,Vtimportisaaclab.simassim_utilsimportisaaclab.utils.mathasmath_utilsfromisaaclab.actuatorsimportImplicitActuatorfromisaaclab.assetsimportArticulation,DeformableObject,RigidObjectfromisaaclab.managersimportEventTermCfg,ManagerTermBase,SceneEntityCfgfromisaaclab.terrainsimportTerrainImporterifTYPE_CHECKING:fromisaaclab.envsimportManagerBasedEnv
[docs]defrandomize_rigid_body_scale(env:ManagerBasedEnv,env_ids:torch.Tensor|None,scale_range:tuple[float,float]|dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg,relative_child_path:str|None=None,):"""Randomize the scale of a rigid body asset in the USD stage. This function modifies the "xformOp:scale" property of all the prims corresponding to the asset. It takes a tuple or dictionary for the scale ranges. If it is a tuple, then the scaling along individual axis is performed equally. If it is a dictionary, the scaling is independent across each dimension. The keys of the dictionary are ``x``, ``y``, and ``z``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a key, the range is set to one for that axis. Relative child path can be used to randomize the scale of a specific child prim of the asset. For example, if the asset at prim path expression "/World/envs/env_.*/Object" has a child with the path "/World/envs/env_.*/Object/mesh", then the relative child path should be "mesh" or "/mesh". .. attention:: Since this function modifies USD properties that are parsed by the physics engine once the simulation starts, the term should only be used before the simulation starts playing. This corresponds to the event mode named "usd". Using it at simulation time, may lead to unpredictable behaviors. .. note:: When randomizing the scale of individual assets, please make sure to set :attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics parser will parse the individual asset properties separately. """# check if sim is runningifenv.sim.is_playing():raiseRuntimeError("Randomizing scale while simulation is running leads to unpredictable behaviors."" Please ensure that the event term is called before the simulation starts by using the 'usd' mode.")# extract the used quantities (to enable type-hinting)asset:RigidObject=env.scene[asset_cfg.name]ifisinstance(asset,Articulation):raiseValueError("Scaling an articulation randomly is not supported, as it affects joint attributes and can cause"" unexpected behavior. To achieve different scales, we recommend generating separate USD files for"" each version of the articulation and using multi-asset spawning. For more details, refer to:"" https://isaac-sim.github.io/IsaacLab/main/source/how-to/multi_asset_spawning.html")# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device="cpu")else:env_ids=env_ids.cpu()# acquire stagestage=omni.usd.get_context().get_stage()# resolve prim paths for spawning and cloningprim_paths=sim_utils.find_matching_prim_paths(asset.cfg.prim_path)# sample scale valuesifisinstance(scale_range,dict):range_list=[scale_range.get(key,(1.0,1.0))forkeyin["x","y","z"]]ranges=torch.tensor(range_list,device="cpu")rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),3),device="cpu")else:rand_samples=math_utils.sample_uniform(*scale_range,(len(env_ids),1),device="cpu")rand_samples=rand_samples.repeat(1,3)# convert to list for the for looprand_samples=rand_samples.tolist()# apply the randomization to the parent if no relative child path is provided# this might be useful if user wants to randomize a particular mesh in the prim hierarchyifrelative_child_pathisNone:relative_child_path=""elifnotrelative_child_path.startswith("/"):relative_child_path="/"+relative_child_path# use sdf changeblock for faster processing of USD propertieswithSdf.ChangeBlock():fori,env_idinenumerate(env_ids):# path to prim to randomizeprim_path=prim_paths[env_id]+relative_child_path# spawn single instanceprim_spec=Sdf.CreatePrimInLayer(stage.GetRootLayer(),prim_path)# get the attribute to randomizescale_spec=prim_spec.GetAttributeAtPath(prim_path+".xformOp:scale")# if the scale attribute does not exist, create ithas_scale_attr=scale_specisnotNoneifnothas_scale_attr:scale_spec=Sdf.AttributeSpec(prim_spec,prim_path+".xformOp:scale",Sdf.ValueTypeNames.Double3)# set the new scalescale_spec.default=Gf.Vec3f(*rand_samples[i])# ensure the operation is done in the right ordering if we created the scale attribute.# otherwise, we assume the scale attribute is already in the right order.# note: by default isaac sim follows this ordering for the transform stack so any asset# created through it will have the correct orderingifnothas_scale_attr:op_order_spec=prim_spec.GetAttributeAtPath(prim_path+".xformOpOrder")ifop_order_specisNone:op_order_spec=Sdf.AttributeSpec(prim_spec,UsdGeom.Tokens.xformOpOrder,Sdf.ValueTypeNames.TokenArray)op_order_spec.default=Vt.TokenArray(["xformOp:translate","xformOp:orient","xformOp:scale"])
[docs]classrandomize_rigid_body_material(ManagerTermBase):"""Randomize the physics materials on all geometries of the asset. This function creates a set of physics materials with random static friction, dynamic friction, and restitution values. The number of materials is specified by ``num_buckets``. The materials are generated by sampling uniform random values from the given ranges. The material properties are then assigned to the geometries of the asset. The assignment is done by creating a random integer tensor of shape (num_instances, max_num_shapes) where ``num_instances`` is the number of assets spawned and ``max_num_shapes`` is the maximum number of shapes in the asset (over all bodies). The integer values are used as indices to select the material properties from the material buckets. If the flag ``make_consistent`` is set to ``True``, the dynamic friction is set to be less than or equal to the static friction. This obeys the physics constraint on friction values. However, it may not always be essential for the application. Thus, the flag is set to ``False`` by default. .. attention:: This function uses CPU tensors to assign the material properties. It is recommended to use this function only during the initialization of the environment. Otherwise, it may lead to a significant performance overhead. .. note:: PhysX only allows 64000 unique physics materials in the scene. If the number of materials exceeds this limit, the simulation will crash. Due to this reason, we sample the materials only once during initialization. Afterwards, these materials are randomly assigned to the geometries of the asset. """
[docs]def__init__(self,cfg:EventTermCfg,env:ManagerBasedEnv):"""Initialize the term. Args: cfg: The configuration of the event term. env: The environment instance. Raises: ValueError: If the asset is not a RigidObject or an Articulation. """super().__init__(cfg,env)# extract the used quantities (to enable type-hinting)self.asset_cfg:SceneEntityCfg=cfg.params["asset_cfg"]self.asset:RigidObject|Articulation=env.scene[self.asset_cfg.name]ifnotisinstance(self.asset,(RigidObject,Articulation)):raiseValueError(f"Randomization term 'randomize_rigid_body_material' not supported for asset: '{self.asset_cfg.name}'"f" with type: '{type(self.asset)}'.")# obtain number of shapes per body (needed for indexing the material properties correctly)# note: this is a workaround since the Articulation does not provide a direct way to obtain the number of shapes# per body. We use the physics simulation view to obtain the number of shapes per body.ifisinstance(self.asset,Articulation)andself.asset_cfg.body_ids!=slice(None):self.num_shapes_per_body=[]forlink_pathinself.asset.root_physx_view.link_paths[0]:link_physx_view=self.asset._physics_sim_view.create_rigid_body_view(link_path)# type: ignoreself.num_shapes_per_body.append(link_physx_view.max_shapes)# ensure the parsing is correctnum_shapes=sum(self.num_shapes_per_body)expected_shapes=self.asset.root_physx_view.max_shapesifnum_shapes!=expected_shapes:raiseValueError("Randomization term 'randomize_rigid_body_material' failed to parse the number of shapes per body."f" Expected total shapes: {expected_shapes}, but got: {num_shapes}.")else:# in this case, we don't need to do special indexingself.num_shapes_per_body=None# obtain parameters for sampling friction and restitution valuesstatic_friction_range=cfg.params.get("static_friction_range",(1.0,1.0))dynamic_friction_range=cfg.params.get("dynamic_friction_range",(1.0,1.0))restitution_range=cfg.params.get("restitution_range",(0.0,0.0))num_buckets=int(cfg.params.get("num_buckets",1))# sample material properties from the given ranges# note: we only sample the materials once during initialization# afterwards these are randomly assigned to the geometries of the assetrange_list=[static_friction_range,dynamic_friction_range,restitution_range]ranges=torch.tensor(range_list,device="cpu")self.material_buckets=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(num_buckets,3),device="cpu")# ensure dynamic friction is always less than static frictionmake_consistent=cfg.params.get("make_consistent",False)ifmake_consistent:self.material_buckets[:,1]=torch.min(self.material_buckets[:,0],self.material_buckets[:,1])
def__call__(self,env:ManagerBasedEnv,env_ids:torch.Tensor|None,static_friction_range:tuple[float,float],dynamic_friction_range:tuple[float,float],restitution_range:tuple[float,float],num_buckets:int,asset_cfg:SceneEntityCfg,make_consistent:bool=False,):# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device="cpu")else:env_ids=env_ids.cpu()# randomly assign material IDs to the geometriestotal_num_shapes=self.asset.root_physx_view.max_shapesbucket_ids=torch.randint(0,num_buckets,(len(env_ids),total_num_shapes),device="cpu")material_samples=self.material_buckets[bucket_ids]# retrieve material buffer from the physics simulationmaterials=self.asset.root_physx_view.get_material_properties()# update material buffer with new samplesifself.num_shapes_per_bodyisnotNone:# sample material properties from the given rangesforbody_idinself.asset_cfg.body_ids:# obtain indices of shapes for the bodystart_idx=sum(self.num_shapes_per_body[:body_id])end_idx=start_idx+self.num_shapes_per_body[body_id]# assign the new materials# material samples are of shape: num_env_ids x total_num_shapes x 3materials[env_ids,start_idx:end_idx]=material_samples[:,start_idx:end_idx]else:# assign all the materialsmaterials[env_ids]=material_samples[:]# apply to simulationself.asset.root_physx_view.set_material_properties(materials,env_ids)
[docs]defrandomize_rigid_body_mass(env:ManagerBasedEnv,env_ids:torch.Tensor|None,asset_cfg:SceneEntityCfg,mass_distribution_params:tuple[float,float],operation:Literal["add","scale","abs"],distribution:Literal["uniform","log_uniform","gaussian"]="uniform",recompute_inertia:bool=True,):"""Randomize the mass of the bodies by adding, scaling, or setting random values. This function allows randomizing the mass of the bodies of the asset. The function samples random values from the given distribution parameters and adds, scales, or sets the values into the physics simulation based on the operation. If the ``recompute_inertia`` flag is set to ``True``, the function recomputes the inertia tensor of the bodies after setting the mass. This is useful when the mass is changed significantly, as the inertia tensor depends on the mass. It assumes the body is a uniform density object. If the body is not a uniform density object, the inertia tensor may not be accurate. .. tip:: This function uses CPU tensors to assign the body masses. It is recommended to use this function only during the initialization of the environment. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device="cpu")else:env_ids=env_ids.cpu()# resolve body indicesifasset_cfg.body_ids==slice(None):body_ids=torch.arange(asset.num_bodies,dtype=torch.int,device="cpu")else:body_ids=torch.tensor(asset_cfg.body_ids,dtype=torch.int,device="cpu")# get the current masses of the bodies (num_assets, num_bodies)masses=asset.root_physx_view.get_masses()# apply randomization on default values# this is to make sure when calling the function multiple times, the randomization is applied on the# default values and not the previously randomized valuesmasses[env_ids[:,None],body_ids]=asset.data.default_mass[env_ids[:,None],body_ids].clone()# sample from the given range# note: we modify the masses in-place for all environments# however, the setter takes care that only the masses of the specified environments are modifiedmasses=_randomize_prop_by_op(masses,mass_distribution_params,env_ids,body_ids,operation=operation,distribution=distribution)# set the mass into the physics simulationasset.root_physx_view.set_masses(masses,env_ids)# recompute inertia tensors if neededifrecompute_inertia:# compute the ratios of the new masses to the initial massesratios=masses[env_ids[:,None],body_ids]/asset.data.default_mass[env_ids[:,None],body_ids]# scale the inertia tensors by the the ratios# since mass randomization is done on default values, we can use the default inertia tensorsinertias=asset.root_physx_view.get_inertias()ifisinstance(asset,Articulation):# inertia has shape: (num_envs, num_bodies, 9) for articulationinertias[env_ids[:,None],body_ids]=(asset.data.default_inertia[env_ids[:,None],body_ids]*ratios[...,None])else:# inertia has shape: (num_envs, 9) for rigid objectinertias[env_ids]=asset.data.default_inertia[env_ids]*ratios# set the inertia tensors into the physics simulationasset.root_physx_view.set_inertias(inertias,env_ids)
[docs]defrandomize_rigid_body_collider_offsets(env:ManagerBasedEnv,env_ids:torch.Tensor|None,asset_cfg:SceneEntityCfg,rest_offset_distribution_params:tuple[float,float]|None=None,contact_offset_distribution_params:tuple[float,float]|None=None,distribution:Literal["uniform","log_uniform","gaussian"]="uniform",):"""Randomize the collider parameters of rigid bodies in an asset by adding, scaling, or setting random values. This function allows randomizing the collider parameters of the asset, such as rest and contact offsets. These correspond to the physics engine collider properties that affect the collision checking. The function samples random values from the given distribution parameters and applies the operation to the collider properties. It then sets the values into the physics simulation. If the distribution parameters are not provided for a particular property, the function does not modify the property. Currently, the distribution parameters are applied as absolute values. .. tip:: This function uses CPU tensors to assign the collision properties. It is recommended to use this function only during the initialization of the environment. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device="cpu")# sample collider properties from the given ranges and set into the physics simulation# -- rest offsetsifrest_offset_distribution_paramsisnotNone:rest_offset=asset.root_physx_view.get_rest_offsets().clone()rest_offset=_randomize_prop_by_op(rest_offset,rest_offset_distribution_params,None,slice(None),operation="abs",distribution=distribution,)asset.root_physx_view.set_rest_offsets(rest_offset,env_ids.cpu())# -- contact offsetsifcontact_offset_distribution_paramsisnotNone:contact_offset=asset.root_physx_view.get_contact_offsets().clone()contact_offset=_randomize_prop_by_op(contact_offset,contact_offset_distribution_params,None,slice(None),operation="abs",distribution=distribution,)asset.root_physx_view.set_contact_offsets(contact_offset,env_ids.cpu())
[docs]defrandomize_physics_scene_gravity(env:ManagerBasedEnv,env_ids:torch.Tensor|None,gravity_distribution_params:tuple[list[float],list[float]],operation:Literal["add","scale","abs"],distribution:Literal["uniform","log_uniform","gaussian"]="uniform",):"""Randomize gravity by adding, scaling, or setting random values. This function allows randomizing gravity of the physics scene. The function samples random values from the given distribution parameters and adds, scales, or sets the values into the physics simulation based on the operation. The distribution parameters are lists of two elements each, representing the lower and upper bounds of the distribution for the x, y, and z components of the gravity vector. The function samples random values for each component independently. .. attention:: This function applied the same gravity for all the environments. .. tip:: This function uses CPU tensors to assign gravity. """# get the current gravitygravity=torch.tensor(env.sim.cfg.gravity,device="cpu").unsqueeze(0)dist_param_0=torch.tensor(gravity_distribution_params[0],device="cpu")dist_param_1=torch.tensor(gravity_distribution_params[1],device="cpu")gravity=_randomize_prop_by_op(gravity,(dist_param_0,dist_param_1),None,slice(None),operation=operation,distribution=distribution,)# unbatch the gravity tensor into a listgravity=gravity[0].tolist()# set the gravity into the physics simulationphysics_sim_view:physx.SimulationView=sim_utils.SimulationContext.instance().physics_sim_viewphysics_sim_view.set_gravity(carb.Float3(*gravity))
[docs]defrandomize_actuator_gains(env:ManagerBasedEnv,env_ids:torch.Tensor|None,asset_cfg:SceneEntityCfg,stiffness_distribution_params:tuple[float,float]|None=None,damping_distribution_params:tuple[float,float]|None=None,operation:Literal["add","scale","abs"]="abs",distribution:Literal["uniform","log_uniform","gaussian"]="uniform",):"""Randomize the actuator gains in an articulation by adding, scaling, or setting random values. This function allows randomizing the actuator stiffness and damping gains. The function samples random values from the given distribution parameters and applies the operation to the joint properties. It then sets the values into the actuator models. If the distribution parameters are not provided for a particular property, the function does not modify the property. .. tip:: For implicit actuators, this function uses CPU tensors to assign the actuator gains into the simulation. In such cases, it is recommended to use this function only during the initialization of the environment. """# Extract the used quantities (to enable type-hinting)asset:Articulation=env.scene[asset_cfg.name]# Resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device=asset.device)defrandomize(data:torch.Tensor,params:tuple[float,float])->torch.Tensor:return_randomize_prop_by_op(data,params,dim_0_ids=None,dim_1_ids=actuator_indices,operation=operation,distribution=distribution)# Loop through actuators and randomize gainsforactuatorinasset.actuators.values():ifisinstance(asset_cfg.joint_ids,slice):# we take all the joints of the actuatoractuator_indices=slice(None)ifisinstance(actuator.joint_indices,slice):global_indices=slice(None)else:global_indices=torch.tensor(actuator.joint_indices,device=asset.device)elifisinstance(actuator.joint_indices,slice):# we take the joints defined in the asset configglobal_indices=actuator_indices=torch.tensor(asset_cfg.joint_ids,device=asset.device)else:# we take the intersection of the actuator joints and the asset config jointsactuator_joint_indices=torch.tensor(actuator.joint_indices,device=asset.device)asset_joint_ids=torch.tensor(asset_cfg.joint_ids,device=asset.device)# the indices of the joints in the actuator that have to be randomizedactuator_indices=torch.nonzero(torch.isin(actuator_joint_indices,asset_joint_ids)).view(-1)iflen(actuator_indices)==0:continue# maps actuator indices that have to be randomized to global joint indicesglobal_indices=actuator_joint_indices[actuator_indices]# Randomize stiffnessifstiffness_distribution_paramsisnotNone:stiffness=actuator.stiffness[env_ids].clone()stiffness[:,actuator_indices]=asset.data.default_joint_stiffness[env_ids][:,global_indices].clone()randomize(stiffness,stiffness_distribution_params)actuator.stiffness[env_ids]=stiffnessifisinstance(actuator,ImplicitActuator):asset.write_joint_stiffness_to_sim(stiffness,joint_ids=actuator.joint_indices,env_ids=env_ids)# Randomize dampingifdamping_distribution_paramsisnotNone:damping=actuator.damping[env_ids].clone()damping[:,actuator_indices]=asset.data.default_joint_damping[env_ids][:,global_indices].clone()randomize(damping,damping_distribution_params)actuator.damping[env_ids]=dampingifisinstance(actuator,ImplicitActuator):asset.write_joint_damping_to_sim(damping,joint_ids=actuator.joint_indices,env_ids=env_ids)
[docs]defrandomize_joint_parameters(env:ManagerBasedEnv,env_ids:torch.Tensor|None,asset_cfg:SceneEntityCfg,friction_distribution_params:tuple[float,float]|None=None,armature_distribution_params:tuple[float,float]|None=None,lower_limit_distribution_params:tuple[float,float]|None=None,upper_limit_distribution_params:tuple[float,float]|None=None,operation:Literal["add","scale","abs"]="abs",distribution:Literal["uniform","log_uniform","gaussian"]="uniform",):"""Randomize the simulated joint parameters of an articulation by adding, scaling, or setting random values. This function allows randomizing the joint parameters of the asset. These correspond to the physics engine joint properties that affect the joint behavior. The properties include the joint friction coefficient, armature, and joint position limits. The function samples random values from the given distribution parameters and applies the operation to the joint properties. It then sets the values into the physics simulation. If the distribution parameters are not provided for a particular property, the function does not modify the property. .. tip:: This function uses CPU tensors to assign the joint properties. It is recommended to use this function only during the initialization of the environment. """# extract the used quantities (to enable type-hinting)asset:Articulation=env.scene[asset_cfg.name]# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device=asset.device)# resolve joint indicesifasset_cfg.joint_ids==slice(None):joint_ids=slice(None)# for optimization purposeselse:joint_ids=torch.tensor(asset_cfg.joint_ids,dtype=torch.int,device=asset.device)# sample joint properties from the given ranges and set into the physics simulation# joint friction coefficientiffriction_distribution_paramsisnotNone:friction_coeff=_randomize_prop_by_op(asset.data.default_joint_friction_coeff.clone(),friction_distribution_params,env_ids,joint_ids,operation=operation,distribution=distribution,)asset.write_joint_friction_coefficient_to_sim(friction_coeff[env_ids[:,None],joint_ids],joint_ids=joint_ids,env_ids=env_ids)# joint armatureifarmature_distribution_paramsisnotNone:armature=_randomize_prop_by_op(asset.data.default_joint_armature.clone(),armature_distribution_params,env_ids,joint_ids,operation=operation,distribution=distribution,)asset.write_joint_armature_to_sim(armature[env_ids[:,None],joint_ids],joint_ids=joint_ids,env_ids=env_ids)# joint position limitsiflower_limit_distribution_paramsisnotNoneorupper_limit_distribution_paramsisnotNone:joint_pos_limits=asset.data.default_joint_pos_limits.clone()# -- randomize the lower limitsiflower_limit_distribution_paramsisnotNone:joint_pos_limits[...,0]=_randomize_prop_by_op(joint_pos_limits[...,0],lower_limit_distribution_params,env_ids,joint_ids,operation=operation,distribution=distribution,)# -- randomize the upper limitsifupper_limit_distribution_paramsisnotNone:joint_pos_limits[...,1]=_randomize_prop_by_op(joint_pos_limits[...,1],upper_limit_distribution_params,env_ids,joint_ids,operation=operation,distribution=distribution,)# extract the position limits for the concerned jointsjoint_pos_limits=joint_pos_limits[env_ids[:,None],joint_ids]if(joint_pos_limits[...,0]>joint_pos_limits[...,1]).any():raiseValueError("Randomization term 'randomize_joint_parameters' is setting lower joint limits that are greater than"" upper joint limits. Please check the distribution parameters for the joint position limits.")# set the position limits into the physics simulationasset.write_joint_position_limit_to_sim(joint_pos_limits,joint_ids=joint_ids,env_ids=env_ids,warn_limit_violation=False)
[docs]defrandomize_fixed_tendon_parameters(env:ManagerBasedEnv,env_ids:torch.Tensor|None,asset_cfg:SceneEntityCfg,stiffness_distribution_params:tuple[float,float]|None=None,damping_distribution_params:tuple[float,float]|None=None,limit_stiffness_distribution_params:tuple[float,float]|None=None,lower_limit_distribution_params:tuple[float,float]|None=None,upper_limit_distribution_params:tuple[float,float]|None=None,rest_length_distribution_params:tuple[float,float]|None=None,offset_distribution_params:tuple[float,float]|None=None,operation:Literal["add","scale","abs"]="abs",distribution:Literal["uniform","log_uniform","gaussian"]="uniform",):"""Randomize the simulated fixed tendon parameters of an articulation by adding, scaling, or setting random values. This function allows randomizing the fixed tendon parameters of the asset. These correspond to the physics engine tendon properties that affect the joint behavior. The function samples random values from the given distribution parameters and applies the operation to the tendon properties. It then sets the values into the physics simulation. If the distribution parameters are not provided for a particular property, the function does not modify the property. """# extract the used quantities (to enable type-hinting)asset:Articulation=env.scene[asset_cfg.name]# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device=asset.device)# resolve joint indicesifasset_cfg.fixed_tendon_ids==slice(None):tendon_ids=slice(None)# for optimization purposeselse:tendon_ids=torch.tensor(asset_cfg.fixed_tendon_ids,dtype=torch.int,device=asset.device)# sample tendon properties from the given ranges and set into the physics simulation# stiffnessifstiffness_distribution_paramsisnotNone:stiffness=_randomize_prop_by_op(asset.data.default_fixed_tendon_stiffness.clone(),stiffness_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)asset.set_fixed_tendon_stiffness(stiffness[env_ids[:,None],tendon_ids],tendon_ids,env_ids)# dampingifdamping_distribution_paramsisnotNone:damping=_randomize_prop_by_op(asset.data.default_fixed_tendon_damping.clone(),damping_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)asset.set_fixed_tendon_damping(damping[env_ids[:,None],tendon_ids],tendon_ids,env_ids)# limit stiffnessiflimit_stiffness_distribution_paramsisnotNone:limit_stiffness=_randomize_prop_by_op(asset.data.default_fixed_tendon_limit_stiffness.clone(),limit_stiffness_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)asset.set_fixed_tendon_limit_stiffness(limit_stiffness[env_ids[:,None],tendon_ids],tendon_ids,env_ids)# position limitsiflower_limit_distribution_paramsisnotNoneorupper_limit_distribution_paramsisnotNone:limit=asset.data.default_fixed_tendon_pos_limits.clone()# -- lower limitiflower_limit_distribution_paramsisnotNone:limit[...,0]=_randomize_prop_by_op(limit[...,0],lower_limit_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)# -- upper limitifupper_limit_distribution_paramsisnotNone:limit[...,1]=_randomize_prop_by_op(limit[...,1],upper_limit_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)# check if the limits are validtendon_limits=limit[env_ids[:,None],tendon_ids]if(tendon_limits[...,0]>tendon_limits[...,1]).any():raiseValueError("Randomization term 'randomize_fixed_tendon_parameters' is setting lower tendon limits that are greater"" than upper tendon limits.")asset.set_fixed_tendon_position_limit(tendon_limits,tendon_ids,env_ids)# rest lengthifrest_length_distribution_paramsisnotNone:rest_length=_randomize_prop_by_op(asset.data.default_fixed_tendon_rest_length.clone(),rest_length_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)asset.set_fixed_tendon_rest_length(rest_length[env_ids[:,None],tendon_ids],tendon_ids,env_ids)# offsetifoffset_distribution_paramsisnotNone:offset=_randomize_prop_by_op(asset.data.default_fixed_tendon_offset.clone(),offset_distribution_params,env_ids,tendon_ids,operation=operation,distribution=distribution,)asset.set_fixed_tendon_offset(offset[env_ids[:,None],tendon_ids],tendon_ids,env_ids)# write the fixed tendon properties into the simulationasset.write_fixed_tendon_properties_to_sim(tendon_ids,env_ids)
[docs]defapply_external_force_torque(env:ManagerBasedEnv,env_ids:torch.Tensor,force_range:tuple[float,float],torque_range:tuple[float,float],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Randomize the external forces and torques applied to the bodies. This function creates a set of random forces and torques sampled from the given ranges. The number of forces and torques is equal to the number of bodies times the number of environments. The forces and torques are applied to the bodies by calling ``asset.set_external_force_and_torque``. The forces and torques are only applied when ``asset.write_data_to_sim()`` is called in the environment. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# resolve environment idsifenv_idsisNone:env_ids=torch.arange(env.scene.num_envs,device=asset.device)# resolve number of bodiesnum_bodies=len(asset_cfg.body_ids)ifisinstance(asset_cfg.body_ids,list)elseasset.num_bodies# sample random forces and torquessize=(len(env_ids),num_bodies,3)forces=math_utils.sample_uniform(*force_range,size,asset.device)torques=math_utils.sample_uniform(*torque_range,size,asset.device)# set the forces and torques into the buffers# note: these are only applied when you call: `asset.write_data_to_sim()`asset.set_external_force_and_torque(forces,torques,env_ids=env_ids,body_ids=asset_cfg.body_ids)
[docs]defpush_by_setting_velocity(env:ManagerBasedEnv,env_ids:torch.Tensor,velocity_range:dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Push the asset by setting the root velocity to a random value within the given ranges. This creates an effect similar to pushing the asset with a random impulse that changes the asset's velocity. It samples the root velocity from the given ranges and sets the velocity into the physics simulation. The function takes a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a key, the velocity is set to zero for that axis. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# velocitiesvel_w=asset.data.root_vel_w[env_ids]# sample random velocitiesrange_list=[velocity_range.get(key,(0.0,0.0))forkeyin["x","y","z","roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)vel_w+=math_utils.sample_uniform(ranges[:,0],ranges[:,1],vel_w.shape,device=asset.device)# set the velocities into the physics simulationasset.write_root_velocity_to_sim(vel_w,env_ids=env_ids)
[docs]defreset_root_state_uniform(env:ManagerBasedEnv,env_ids:torch.Tensor,pose_range:dict[str,tuple[float,float]],velocity_range:dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the asset root state to a random position and velocity uniformly within the given ranges. This function randomizes the root position and velocity of the asset. * It samples the root position from the given ranges and adds them to the default root position, before setting them into the physics simulation. * It samples the root orientation from the given ranges and sets them into the physics simulation. * It samples the root velocity from the given ranges and sets them into the physics simulation. The function takes a dictionary of pose and velocity ranges for each axis and rotation. The keys of the dictionary are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a key, the position or velocity is set to zero for that axis. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# get default root stateroot_states=asset.data.default_root_state[env_ids].clone()# posesrange_list=[pose_range.get(key,(0.0,0.0))forkeyin["x","y","z","roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),6),device=asset.device)positions=root_states[:,0:3]+env.scene.env_origins[env_ids]+rand_samples[:,0:3]orientations_delta=math_utils.quat_from_euler_xyz(rand_samples[:,3],rand_samples[:,4],rand_samples[:,5])orientations=math_utils.quat_mul(root_states[:,3:7],orientations_delta)# velocitiesrange_list=[velocity_range.get(key,(0.0,0.0))forkeyin["x","y","z","roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),6),device=asset.device)velocities=root_states[:,7:13]+rand_samples# set into the physics simulationasset.write_root_pose_to_sim(torch.cat([positions,orientations],dim=-1),env_ids=env_ids)asset.write_root_velocity_to_sim(velocities,env_ids=env_ids)
[docs]defreset_root_state_with_random_orientation(env:ManagerBasedEnv,env_ids:torch.Tensor,pose_range:dict[str,tuple[float,float]],velocity_range:dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the asset root position and velocities sampled randomly within the given ranges and the asset root orientation sampled randomly from the SO(3). This function randomizes the root position and velocity of the asset. * It samples the root position from the given ranges and adds them to the default root position, before setting them into the physics simulation. * It samples the root orientation uniformly from the SO(3) and sets them into the physics simulation. * It samples the root velocity from the given ranges and sets them into the physics simulation. The function takes a dictionary of position and velocity ranges for each axis and rotation: * :attr:`pose_range` - a dictionary of position ranges for each axis. The keys of the dictionary are ``x``, ``y``, and ``z``. The orientation is sampled uniformly from the SO(3). * :attr:`velocity_range` - a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a particular key, the position is set to zero for that axis. """# extract the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]# get default root stateroot_states=asset.data.default_root_state[env_ids].clone()# posesrange_list=[pose_range.get(key,(0.0,0.0))forkeyin["x","y","z"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),3),device=asset.device)positions=root_states[:,0:3]+env.scene.env_origins[env_ids]+rand_samplesorientations=math_utils.random_orientation(len(env_ids),device=asset.device)# velocitiesrange_list=[velocity_range.get(key,(0.0,0.0))forkeyin["x","y","z","roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),6),device=asset.device)velocities=root_states[:,7:13]+rand_samples# set into the physics simulationasset.write_root_pose_to_sim(torch.cat([positions,orientations],dim=-1),env_ids=env_ids)asset.write_root_velocity_to_sim(velocities,env_ids=env_ids)
[docs]defreset_root_state_from_terrain(env:ManagerBasedEnv,env_ids:torch.Tensor,pose_range:dict[str,tuple[float,float]],velocity_range:dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the asset root state by sampling a random valid pose from the terrain. This function samples a random valid pose(based on flat patches) from the terrain and sets the root state of the asset to this position. The function also samples random velocities from the given ranges and sets them into the physics simulation. The function takes a dictionary of position and velocity ranges for each axis and rotation: * :attr:`pose_range` - a dictionary of pose ranges for each axis. The keys of the dictionary are ``roll``, ``pitch``, and ``yaw``. The position is sampled from the flat patches of the terrain. * :attr:`velocity_range` - a dictionary of velocity ranges for each axis and rotation. The keys of the dictionary are ``x``, ``y``, ``z``, ``roll``, ``pitch``, and ``yaw``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a particular key, the position is set to zero for that axis. Note: The function expects the terrain to have valid flat patches under the key "init_pos". The flat patches are used to sample the random pose for the robot. Raises: ValueError: If the terrain does not have valid flat patches under the key "init_pos". """# access the used quantities (to enable type-hinting)asset:RigidObject|Articulation=env.scene[asset_cfg.name]terrain:TerrainImporter=env.scene.terrain# obtain all flat patches corresponding to the valid posesvalid_positions:torch.Tensor=terrain.flat_patches.get("init_pos")ifvalid_positionsisNone:raiseValueError("The event term 'reset_root_state_from_terrain' requires valid flat patches under 'init_pos'."f" Found: {list(terrain.flat_patches.keys())}")# sample random valid posesids=torch.randint(0,valid_positions.shape[2],size=(len(env_ids),),device=env.device)positions=valid_positions[terrain.terrain_levels[env_ids],terrain.terrain_types[env_ids],ids]positions+=asset.data.default_root_state[env_ids,:3]# sample random orientationsrange_list=[pose_range.get(key,(0.0,0.0))forkeyin["roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),3),device=asset.device)# convert to quaternionsorientations=math_utils.quat_from_euler_xyz(rand_samples[:,0],rand_samples[:,1],rand_samples[:,2])# sample random velocitiesrange_list=[velocity_range.get(key,(0.0,0.0))forkeyin["x","y","z","roll","pitch","yaw"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),6),device=asset.device)velocities=asset.data.default_root_state[env_ids,7:13]+rand_samples# set into the physics simulationasset.write_root_pose_to_sim(torch.cat([positions,orientations],dim=-1),env_ids=env_ids)asset.write_root_velocity_to_sim(velocities,env_ids=env_ids)
[docs]defreset_joints_by_scale(env:ManagerBasedEnv,env_ids:torch.Tensor,position_range:tuple[float,float],velocity_range:tuple[float,float],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the robot joints by scaling the default position and velocity by the given ranges. This function samples random values from the given ranges and scales the default joint positions and velocities by these values. The scaled values are then set into the physics simulation. """# extract the used quantities (to enable type-hinting)asset:Articulation=env.scene[asset_cfg.name]# get default joint statejoint_pos=asset.data.default_joint_pos[env_ids].clone()joint_vel=asset.data.default_joint_vel[env_ids].clone()# scale these values randomlyjoint_pos*=math_utils.sample_uniform(*position_range,joint_pos.shape,joint_pos.device)joint_vel*=math_utils.sample_uniform(*velocity_range,joint_vel.shape,joint_vel.device)# clamp joint pos to limitsjoint_pos_limits=asset.data.soft_joint_pos_limits[env_ids]joint_pos=joint_pos.clamp_(joint_pos_limits[...,0],joint_pos_limits[...,1])# clamp joint vel to limitsjoint_vel_limits=asset.data.soft_joint_vel_limits[env_ids]joint_vel=joint_vel.clamp_(-joint_vel_limits,joint_vel_limits)# set into the physics simulationasset.write_joint_state_to_sim(joint_pos,joint_vel,env_ids=env_ids)
[docs]defreset_joints_by_offset(env:ManagerBasedEnv,env_ids:torch.Tensor,position_range:tuple[float,float],velocity_range:tuple[float,float],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the robot joints with offsets around the default position and velocity by the given ranges. This function samples random values from the given ranges and biases the default joint positions and velocities by these values. The biased values are then set into the physics simulation. """# extract the used quantities (to enable type-hinting)asset:Articulation=env.scene[asset_cfg.name]# get default joint statejoint_pos=asset.data.default_joint_pos[env_ids].clone()joint_vel=asset.data.default_joint_vel[env_ids].clone()# bias these values randomlyjoint_pos+=math_utils.sample_uniform(*position_range,joint_pos.shape,joint_pos.device)joint_vel+=math_utils.sample_uniform(*velocity_range,joint_vel.shape,joint_vel.device)# clamp joint pos to limitsjoint_pos_limits=asset.data.soft_joint_pos_limits[env_ids]joint_pos=joint_pos.clamp_(joint_pos_limits[...,0],joint_pos_limits[...,1])# clamp joint vel to limitsjoint_vel_limits=asset.data.soft_joint_vel_limits[env_ids]joint_vel=joint_vel.clamp_(-joint_vel_limits,joint_vel_limits)# set into the physics simulationasset.write_joint_state_to_sim(joint_pos,joint_vel,env_ids=env_ids)
[docs]defreset_nodal_state_uniform(env:ManagerBasedEnv,env_ids:torch.Tensor,position_range:dict[str,tuple[float,float]],velocity_range:dict[str,tuple[float,float]],asset_cfg:SceneEntityCfg=SceneEntityCfg("robot"),):"""Reset the asset nodal state to a random position and velocity uniformly within the given ranges. This function randomizes the nodal position and velocity of the asset. * It samples the root position from the given ranges and adds them to the default nodal position, before setting them into the physics simulation. * It samples the root velocity from the given ranges and sets them into the physics simulation. The function takes a dictionary of position and velocity ranges for each axis. The keys of the dictionary are ``x``, ``y``, ``z``. The values are tuples of the form ``(min, max)``. If the dictionary does not contain a key, the position or velocity is set to zero for that axis. """# extract the used quantities (to enable type-hinting)asset:DeformableObject=env.scene[asset_cfg.name]# get default root statenodal_state=asset.data.default_nodal_state_w[env_ids].clone()# positionrange_list=[position_range.get(key,(0.0,0.0))forkeyin["x","y","z"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),1,3),device=asset.device)nodal_state[...,:3]+=rand_samples# velocitiesrange_list=[velocity_range.get(key,(0.0,0.0))forkeyin["x","y","z"]]ranges=torch.tensor(range_list,device=asset.device)rand_samples=math_utils.sample_uniform(ranges[:,0],ranges[:,1],(len(env_ids),1,3),device=asset.device)nodal_state[...,3:]+=rand_samples# set into the physics simulationasset.write_nodal_state_to_sim(nodal_state,env_ids=env_ids)
[docs]defreset_scene_to_default(env:ManagerBasedEnv,env_ids:torch.Tensor):"""Reset the scene to the default state specified in the scene configuration."""# rigid bodiesforrigid_objectinenv.scene.rigid_objects.values():# obtain default and deal with the offset for env originsdefault_root_state=rigid_object.data.default_root_state[env_ids].clone()default_root_state[:,0:3]+=env.scene.env_origins[env_ids]# set into the physics simulationrigid_object.write_root_pose_to_sim(default_root_state[:,:7],env_ids=env_ids)rigid_object.write_root_velocity_to_sim(default_root_state[:,7:],env_ids=env_ids)# articulationsforarticulation_assetinenv.scene.articulations.values():# obtain default and deal with the offset for env originsdefault_root_state=articulation_asset.data.default_root_state[env_ids].clone()default_root_state[:,0:3]+=env.scene.env_origins[env_ids]# set into the physics simulationarticulation_asset.write_root_pose_to_sim(default_root_state[:,:7],env_ids=env_ids)articulation_asset.write_root_velocity_to_sim(default_root_state[:,7:],env_ids=env_ids)# obtain default joint positionsdefault_joint_pos=articulation_asset.data.default_joint_pos[env_ids].clone()default_joint_vel=articulation_asset.data.default_joint_vel[env_ids].clone()# set into the physics simulationarticulation_asset.write_joint_state_to_sim(default_joint_pos,default_joint_vel,env_ids=env_ids)# deformable objectsfordeformable_objectinenv.scene.deformable_objects.values():# obtain default and set into the physics simulationnodal_state=deformable_object.data.default_nodal_state_w[env_ids].clone()deformable_object.write_nodal_state_to_sim(nodal_state,env_ids=env_ids)
[docs]classrandomize_visual_texture_material(ManagerTermBase):"""Randomize the visual texture of bodies on an asset using Replicator API. This function randomizes the visual texture of the bodies of the asset using the Replicator API. The function samples random textures from the given texture paths and applies them to the bodies of the asset. The textures are projected onto the bodies and rotated by the given angles. .. note:: The function assumes that the asset follows the prim naming convention as: "{asset_prim_path}/{body_name}/visuals" where the body name is the name of the body to which the texture is applied. This is the default prim ordering when importing assets from the asset converters in Isaac Lab. .. note:: When randomizing the texture of individual assets, please make sure to set :attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics parser will parse the individual asset properties separately. """
[docs]def__init__(self,cfg:EventTermCfg,env:ManagerBasedEnv):"""Initialize the term. Args: cfg: The configuration of the event term. env: The environment instance. """super().__init__(cfg,env)# enable replicator extension if not already enabledenable_extension("omni.replicator.core")# we import the module here since we may not always need the replicatorimportomni.replicator.coreasrep# read parameters from the configurationasset_cfg:SceneEntityCfg=cfg.params.get("asset_cfg")texture_paths=cfg.params.get("texture_paths")event_name=cfg.params.get("event_name")texture_rotation=cfg.params.get("texture_rotation",(0.0,0.0))# check to make sure replicate_physics is set to False, else raise error# note: We add an explicit check here since texture randomization can happen outside of 'prestartup' mode# and the event manager doesn't check in that case.ifenv.cfg.scene.replicate_physics:raiseRuntimeError("Unable to randomize visual texture material with scene replication enabled."" For stable USD-level randomization, please disable scene replication"" by setting 'replicate_physics' to False in 'InteractiveSceneCfg'.")# convert from radians to degreestexture_rotation=tuple(math.degrees(angle)forangleintexture_rotation)# obtain the asset entityasset=env.scene[asset_cfg.name]# join all bodies in the assetbody_names=asset_cfg.body_namesifisinstance(body_names,str):body_names_regex=body_nameselifisinstance(body_names,list):body_names_regex="|".join(body_names)else:body_names_regex=".*"# create the affected prim path# TODO: Remove the hard-coded "/visuals" part.prim_path=f"{asset.cfg.prim_path}/{body_names_regex}/visuals"# Create the omni-graph node for the randomization termdefrep_texture_randomization():prims_group=rep.get.prims(path_pattern=prim_path)withprims_group:rep.randomizer.texture(textures=texture_paths,project_uvw=True,texture_rotate=rep.distribution.uniform(*texture_rotation))returnprims_group.node# Register the event to the replicatorwithrep.trigger.on_custom_event(event_name=event_name):rep_texture_randomization()
def__call__(self,env:ManagerBasedEnv,env_ids:torch.Tensor,event_name:str,asset_cfg:SceneEntityCfg,texture_paths:list[str],texture_rotation:tuple[float,float]=(0.0,0.0),):# import replicatorimportomni.replicator.coreasrep# only send the event to the replicator# note: This triggers the nodes for all the environments.# We need to investigate how to make it happen only for a subset based on env_ids.rep.utils.send_og_event(event_name)
[docs]classrandomize_visual_color(ManagerTermBase):"""Randomize the visual color of bodies on an asset using Replicator API. This function randomizes the visual color of the bodies of the asset using the Replicator API. The function samples random colors from the given colors and applies them to the bodies of the asset. The function assumes that the asset follows the prim naming convention as: "{asset_prim_path}/{mesh_name}" where the mesh name is the name of the mesh to which the color is applied. For instance, if the asset has a prim path "/World/asset" and a mesh named "body_0/mesh", the prim path for the mesh would be "/World/asset/body_0/mesh". The colors can be specified as a list of tuples of the form ``(r, g, b)`` or as a dictionary with the keys ``r``, ``g``, ``b`` and values as tuples of the form ``(low, high)``. If a dictionary is used, the function will sample random colors from the given ranges. .. note:: When randomizing the color of individual assets, please make sure to set :attr:`isaaclab.scene.InteractiveSceneCfg.replicate_physics` to False. This ensures that physics parser will parse the individual asset properties separately. """
[docs]def__init__(self,cfg:EventTermCfg,env:ManagerBasedEnv):"""Initialize the randomization term."""super().__init__(cfg,env)# enable replicator extension if not already enabledenable_extension("omni.replicator.core")# we import the module here since we may not always need the replicatorimportomni.replicator.coreasrep# read parameters from the configurationasset_cfg:SceneEntityCfg=cfg.params.get("asset_cfg")colors=cfg.params.get("colors")event_name=cfg.params.get("event_name")mesh_name:str=cfg.params.get("mesh_name","")# type: ignore# check to make sure replicate_physics is set to False, else raise error# note: We add an explicit check here since texture randomization can happen outside of 'prestartup' mode# and the event manager doesn't check in that case.ifenv.cfg.scene.replicate_physics:raiseRuntimeError("Unable to randomize visual color with scene replication enabled."" For stable USD-level randomization, please disable scene replication"" by setting 'replicate_physics' to False in 'InteractiveSceneCfg'.")# obtain the asset entityasset=env.scene[asset_cfg.name]# create the affected prim pathifnotmesh_name.startswith("/"):mesh_name="/"+mesh_namemesh_prim_path=f"{asset.cfg.prim_path}{mesh_name}"# TODO: Need to make it work for multiple meshes.# parse the colors into replicator formatifisinstance(colors,dict):# (r, g, b) - low, high --> (low_r, low_g, low_b) and (high_r, high_g, high_b)color_low=[colors[key][0]forkeyin["r","g","b"]]color_high=[colors[key][1]forkeyin["r","g","b"]]colors=rep.distribution.uniform(color_low,color_high)else:colors=list(colors)# Create the omni-graph node for the randomization termdefrep_texture_randomization():prims_group=rep.get.prims(path_pattern=mesh_prim_path)withprims_group:rep.randomizer.color(colors=colors)returnprims_group.node# Register the event to the replicatorwithrep.trigger.on_custom_event(event_name=event_name):rep_texture_randomization()
def__call__(self,env:ManagerBasedEnv,env_ids:torch.Tensor,event_name:str,asset_cfg:SceneEntityCfg,colors:list[tuple[float,float,float]]|dict[str,tuple[float,float]],mesh_name:str="",):# import replicatorimportomni.replicator.coreasrep# only send the event to the replicatorrep.utils.send_og_event(event_name)
"""Internal helper functions."""def_randomize_prop_by_op(data:torch.Tensor,distribution_parameters:tuple[float|torch.Tensor,float|torch.Tensor],dim_0_ids:torch.Tensor|None,dim_1_ids:torch.Tensor|slice,operation:Literal["add","scale","abs"],distribution:Literal["uniform","log_uniform","gaussian"],)->torch.Tensor:"""Perform data randomization based on the given operation and distribution. Args: data: The data tensor to be randomized. Shape is (dim_0, dim_1). distribution_parameters: The parameters for the distribution to sample values from. dim_0_ids: The indices of the first dimension to randomize. dim_1_ids: The indices of the second dimension to randomize. operation: The operation to perform on the data. Options: 'add', 'scale', 'abs'. distribution: The distribution to sample the random values from. Options: 'uniform', 'log_uniform'. Returns: The data tensor after randomization. Shape is (dim_0, dim_1). Raises: NotImplementedError: If the operation or distribution is not supported. """# resolve shape# -- dim 0ifdim_0_idsisNone:n_dim_0=data.shape[0]dim_0_ids=slice(None)else:n_dim_0=len(dim_0_ids)ifnotisinstance(dim_1_ids,slice):dim_0_ids=dim_0_ids[:,None]# -- dim 1ifisinstance(dim_1_ids,slice):n_dim_1=data.shape[1]else:n_dim_1=len(dim_1_ids)# resolve the distributionifdistribution=="uniform":dist_fn=math_utils.sample_uniformelifdistribution=="log_uniform":dist_fn=math_utils.sample_log_uniformelifdistribution=="gaussian":dist_fn=math_utils.sample_gaussianelse:raiseNotImplementedError(f"Unknown distribution: '{distribution}' for joint properties randomization."" Please use 'uniform', 'log_uniform', 'gaussian'.")# perform the operationifoperation=="add":data[dim_0_ids,dim_1_ids]+=dist_fn(*distribution_parameters,(n_dim_0,n_dim_1),device=data.device)elifoperation=="scale":data[dim_0_ids,dim_1_ids]*=dist_fn(*distribution_parameters,(n_dim_0,n_dim_1),device=data.device)elifoperation=="abs":data[dim_0_ids,dim_1_ids]=dist_fn(*distribution_parameters,(n_dim_0,n_dim_1),device=data.device)else:raiseNotImplementedError(f"Unknown operation: '{operation}' for property randomization. Please use 'add', 'scale', or 'abs'.")returndata