isaaclab_mimic.datagen#
Sub-package with core implementation logic for Isaac Lab Mimic.
Classes
- DataGenerator – The main data generator object that loads a source dataset, parses it, and generates new trajectories.
- DatagenInfo – Defines the structure of information required from an environment for data generation processes.
- DataGenInfoPool – Pool of DatagenInfo for data generation.
- SelectionStrategy – Defines methods and functions for selection strategies to implement.
- RandomStrategy – Pick source demonstration randomly.
- NearestNeighborObjectStrategy – Pick source demonstration to be the one with the closest object pose to the object in the current scene.
- NearestNeighborRobotDistanceStrategy – Pick source demonstration to be the one that minimizes the distance the robot end effector will need to travel from the current pose to the first pose in the transformed segment.
- Waypoint – Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
- WaypointSequence – Represents a sequence of Waypoint objects.
- WaypointTrajectory – A sequence of WaypointSequence objects that corresponds to a full 6-DoF trajectory.
Data Generator#
- class isaaclab_mimic.datagen.DataGenerator[source]#
The main data generator object that loads a source dataset, parses it, and generates new trajectories.
Methods:
- __init__(env[, src_demo_datagen_info_pool, ...]) – Initialize the generator with the environment to use for data generation.
- randomize_subtask_boundaries() – Apply random offsets to sample subtask boundaries according to the task spec.
- select_source_demo(eef_pose, object_pose, ...) – Helper method to run source subtask segment selection.
- generate(env_id, success_term[, ...]) – Attempt to generate a new demonstration.
- __init__(env, src_demo_datagen_info_pool=None, dataset_path=None, demo_keys=None)[source]#
- Parameters:
env (Isaac Lab ManagerBasedEnv instance) – environment to use for data generation
src_demo_datagen_info_pool (DataGenInfoPool) – source demo datagen info pool
dataset_path (str) – path to hdf5 dataset to use for generation
demo_keys (list of str) – list of demonstration keys to use in file. If not provided, all demonstration keys will be used.
- randomize_subtask_boundaries()[source]#
Apply random offsets to sample subtask boundaries according to the task spec. Recall that each demonstration is segmented into a set of subtask segments, and the end index of each subtask can have a random offset.
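This offset sampling can be sketched in plain Python. The function and argument names below, including the per-subtask `(low, high)` offset ranges, are illustrative stand-ins for the actual task-spec API:

```python
import random

def randomize_boundaries(subtask_end_indices, offset_ranges, demo_length):
    """Apply a random offset to the end index of each subtask segment.

    subtask_end_indices: end index of each subtask segment, in order.
    offset_ranges: (low, high) offset bounds per subtask, from the task spec.
    demo_length: total number of steps in the demonstration.
    """
    randomized = []
    prev_end = 0
    for end, (low, high) in zip(subtask_end_indices, offset_ranges):
        end = end + random.randint(low, high)
        # keep boundaries strictly ordered and inside the demonstration
        end = max(prev_end + 1, min(end, demo_length))
        randomized.append(end)
        prev_end = end
    # the final subtask always ends at the demonstration's last step
    randomized[-1] = demo_length
    return randomized
```

Each call produces a slightly different segmentation of the same source demonstration, which is what lets the generator produce varied subtask segments from a fixed dataset.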
- select_source_demo(eef_pose, object_pose, subtask_ind, src_subtask_inds, subtask_object_name, selection_strategy_name, selection_strategy_kwargs=None)[source]#
Helper method to run source subtask segment selection.
- Parameters:
eef_pose (np.array) – current end effector pose
object_pose (np.array) – current object pose for this subtask
subtask_ind (int) – index of subtask
src_subtask_inds (np.array) – start and end indices for subtask segment in source demonstrations of shape (N, 2)
subtask_object_name (str) – name of reference object for this subtask
selection_strategy_name (str) – name of selection strategy
selection_strategy_kwargs (dict) – extra kwargs for running selection strategy
- Returns:
selected source demo index
- Return type:
selected_src_demo_ind (int)
- async generate(env_id, success_term, env_action_queue: Queue | None = None, select_src_per_subtask=False, transform_first_robot_pose=False, interpolate_from_last_target_pose=True, pause_subtask=False, export_demo=True)[source]#
Attempt to generate a new demonstration.
- Parameters:
env_id (int) – environment ID
success_term (TerminationTermCfg) – success function to check if the task is successful
env_action_queue (asyncio.Queue) – queue to store actions for each environment
select_src_per_subtask (bool) – if True, select a different source demonstration for each subtask during data generation, else keep the same one for the entire episode
transform_first_robot_pose (bool) – if True, each subtask segment will consist of the first robot pose and the target poses instead of just the target poses. Can sometimes help improve data generation quality as the interpolation segment will interpolate to where the robot started in the source segment instead of the first target pose. Note that the first subtask segment of each episode will always include the first robot pose, regardless of this argument.
interpolate_from_last_target_pose (bool) – if True, each interpolation segment will start from the last target pose in the previous subtask segment, instead of the current robot pose. Can sometimes improve data generation quality.
pause_subtask (bool) – if True, pause after every subtask during generation, for debugging.
- Returns:
- dictionary with the following items:
initial_state (dict): initial simulator state for the executed trajectory
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (np.array): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
src_demo_inds (list): list of selected source demonstration indices for each subtask
src_demo_labels (np.array): same as @src_demo_inds, but repeated to have a label for each timestep of the trajectory
- Return type:
results (dict)
Datagen Info#
- class isaaclab_mimic.datagen.DatagenInfo[source]#
Defines the structure of information required from an environment for data generation processes. The DatagenInfo class centralizes all essential data elements needed for data generation in one place, reducing the overhead and complexity of repeatedly querying the environment whenever this information is needed.
To allow for flexibility, not all information must be present.
Core Elements:
- eef_pose: Captures the current 6-DoF pose of the robot’s end-effector.
- object_poses: Captures the 6-DoF poses of relevant objects in the scene.
- subtask_term_signals: Captures subtask completion signals.
- target_eef_pose: Captures the target 6-DoF pose for the robot’s end effector at each time step.
- gripper_action: Captures the gripper’s state.
Methods:
- __init__([eef_pose, object_poses, ...]) – Initialize the fields, e.g. robot end effector poses of shape [..., 4, 4].
- __init__(eef_pose=None, object_poses=None, subtask_term_signals=None, target_eef_pose=None, gripper_action=None)[source]#
- Parameters:
eef_pose (torch.Tensor or None) – robot end effector poses of shape […, 4, 4]
object_poses (dict or None) – dictionary mapping object name to object poses of shape […, 4, 4]
subtask_term_signals (dict or None) – dictionary mapping subtask name to a binary indicator (0 or 1) on whether subtask has been completed. Each value in the dictionary could be an int, float, or torch.Tensor of shape […, 1].
target_eef_pose (torch.Tensor or None) – target end effector poses of shape […, 4, 4]
gripper_action (torch.Tensor or None) – gripper actions of shape […, D] where D is the dimension of the gripper actuation action for the robot arm
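The shape of this record can be sketched with a stand-in dataclass. `DatagenInfoSketch` below is a hypothetical illustration that uses nested lists in place of torch.Tensors; it mirrors the fields listed above but is not the actual class:

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class DatagenInfoSketch:
    """Stand-in for DatagenInfo: same fields, nested lists for tensors."""
    eef_pose: Optional[list] = None                # [..., 4, 4] eef pose
    object_poses: dict = field(default_factory=dict)        # name -> 4x4 pose
    subtask_term_signals: dict = field(default_factory=dict)  # name -> 0/1
    target_eef_pose: Optional[list] = None         # [..., 4, 4] target pose
    gripper_action: Optional[list] = None          # [..., D] gripper actuation

# identity pose: no rotation, zero translation
identity = [[1.0 if i == j else 0.0 for j in range(4)] for i in range(4)]
info = DatagenInfoSketch(
    eef_pose=identity,
    object_poses={"cube": identity},
    subtask_term_signals={"grasp": 0},
    gripper_action=[0.04],
)
```

Note that every field is optional, matching the statement above that not all information must be present.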
Datagen Info Pool#
- class isaaclab_mimic.datagen.DataGenInfoPool[source]#
Pool of DatagenInfo for data generation.
This class is a container for storing DatagenInfo objects that are extracted from episodes. The pool supports the use of an asyncio lock to safely add new episodes to the pool while consuming the data, so it can be shared across multiple mimic data generators.
Methods:
- __init__(env, env_cfg, device[, asyncio_lock]) – Initialize the pool from the environment configuration.
- add_episode(episode) – Add a datagen info from the given episode.
- load_from_dataset_file(file_path[, ...]) – Load from a dataset file.
Attributes:
- datagen_infos – Returns the datagen infos.
- subtask_indices – Returns the subtask indices.
- asyncio_lock – Returns the asyncio lock.
- num_datagen_infos – Returns the number of datagen infos.
- __init__(env, env_cfg, device, asyncio_lock: Lock | None = None)[source]#
- Parameters:
env_cfg (dict) – environment configuration
device (torch.device) – device to store the data
asyncio_lock (asyncio.Lock or None) – asyncio lock to use for thread safety
- property datagen_infos#
Returns the datagen infos.
- property subtask_indices#
Returns the subtask indices.
- property asyncio_lock#
Returns the asyncio lock.
- property num_datagen_infos#
Returns the number of datagen infos.
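The locking pattern the pool supports (new episodes being added while multiple generators consume the data) can be sketched with a plain list standing in for the pool; `add_episode_safely` is an illustrative name, not the pool's API:

```python
import asyncio

async def add_episode_safely(pool_lock, pool, episode):
    """Add an episode under the shared lock so concurrent
    consumers never observe the pool mid-update."""
    async with pool_lock:
        pool.append(episode)

async def main():
    lock = asyncio.Lock()   # one lock shared by all producers/consumers
    pool = []
    # several generators adding episodes concurrently
    await asyncio.gather(*(add_episode_safely(lock, pool, i) for i in range(3)))
    return pool

result = asyncio.run(main())
```

Sharing a single asyncio.Lock this way is what allows one DataGenInfoPool instance to back several mimic data generators at once.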
Random Strategy#
- class isaaclab_mimic.datagen.RandomStrategy[source]#
Pick source demonstration randomly.
Methods:
- select_source_demo(eef_pose, object_pose, ...) – Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- __init__()
- select_source_demo(eef_pose, object_pose, src_subtask_datagen_infos)[source]#
Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- Parameters:
eef_pose (torch.Tensor) – current 4x4 eef pose
object_pose (torch.Tensor) – current 4x4 object pose, for the object in this subtask
src_subtask_datagen_infos (list) – DatagenInfo instance for the relevant subtask segment in the source demonstrations
- Returns:
index of source demonstration - indicates which source subtask segment to use
- Return type:
source_demo_ind (int)
- __init__()#
Nearest Neighbor Object Strategy#
- class isaaclab_mimic.datagen.NearestNeighborObjectStrategy[source]#
Pick source demonstration to be the one with the closest object pose to the object in the current scene.
Methods:
- select_source_demo(eef_pose, object_pose, ...) – Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- __init__()
- select_source_demo(eef_pose, object_pose, src_subtask_datagen_infos, pos_weight=1.0, rot_weight=1.0, nn_k=3)[source]#
Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- Parameters:
eef_pose (torch.Tensor) – current 4x4 eef pose
object_pose (torch.Tensor) – current 4x4 object pose, for the object in this subtask
src_subtask_datagen_infos (list) – DatagenInfo instance for the relevant subtask segment in the source demonstrations
pos_weight (float) – weight on position for minimizing pose distance
rot_weight (float) – weight on rotation for minimizing pose distance
nn_k (int) – pick source demo index uniformly at random from the top @nn_k nearest neighbors
- Returns:
index of source demonstration - indicates which source subtask segment to use
- Return type:
source_demo_ind (int)
- __init__()#
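A minimal sketch of this selection, assuming the weighted pose distance combines Euclidean translation distance with the relative rotation angle (recovered from the trace of the relative rotation). Function names and the nested-list pose representation are illustrative, not the library's API:

```python
import math
import random

def pose_distance(pose_a, pose_b, pos_weight=1.0, rot_weight=1.0):
    """Weighted distance between two 4x4 homogeneous poses (nested lists)."""
    # position term: Euclidean distance between the translation columns
    pos = math.sqrt(sum((pose_a[i][3] - pose_b[i][3]) ** 2 for i in range(3)))
    # rotation term: angle of R_a^T R_b, via trace(R_a^T R_b) = 1 + 2 cos(angle)
    trace = sum(pose_a[r][c] * pose_b[r][c] for r in range(3) for c in range(3))
    angle = math.acos(max(-1.0, min(1.0, (trace - 1.0) / 2.0)))
    return pos_weight * pos + rot_weight * angle

def select_nearest(object_pose, src_object_poses, nn_k=3):
    """Pick uniformly at random among the nn_k closest source demos."""
    dists = [(pose_distance(object_pose, p), i)
             for i, p in enumerate(src_object_poses)]
    top_k = sorted(dists)[:nn_k]
    return random.choice(top_k)[1]
```

Randomizing over the top @nn_k neighbors rather than always taking the single closest demo trades a little optimality for diversity in the generated data.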
Nearest Neighbor Robot Distance Strategy#
- class isaaclab_mimic.datagen.NearestNeighborRobotDistanceStrategy[source]#
Pick source demonstration to be the one that minimizes the distance the robot end effector will need to travel from the current pose to the first pose in the transformed segment.
Methods:
- select_source_demo(eef_pose, object_pose, ...) – Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- __init__()
- select_source_demo(eef_pose, object_pose, src_subtask_datagen_infos, pos_weight=1.0, rot_weight=1.0, nn_k=3)[source]#
Selects source demonstration index using the current robot pose, relevant object pose for the current subtask, and relevant information from the source demonstrations for the current subtask.
- Parameters:
eef_pose (torch.Tensor) – current 4x4 eef pose
object_pose (torch.Tensor) – current 4x4 object pose, for the object in this subtask
src_subtask_datagen_infos (list) – DatagenInfo instance for the relevant subtask segment in the source demonstrations
pos_weight (float) – weight on position for minimizing pose distance
rot_weight (float) – weight on rotation for minimizing pose distance
nn_k (int) – pick source demo index uniformly at random from the top @nn_k nearest neighbors
- Returns:
index of source demonstration - indicates which source subtask segment to use
- Return type:
source_demo_ind (int)
- __init__()#
Waypoint#
- class isaaclab_mimic.datagen.Waypoint[source]#
Represents a single desired 6-DoF waypoint, along with corresponding gripper actuation for this point.
Methods:
- __init__(eef_names, pose, gripper_action[, ...]) – Initialize the waypoint with a 4x4 pose target for the robot controller.
- __init__(eef_names, pose, gripper_action, noise=None)[source]#
- Parameters:
pose (torch.Tensor) – 4x4 pose target for robot controller
gripper_action (torch.Tensor) – gripper action for robot controller
noise (float or None) – action noise amplitude to apply during execution at this timestep (for arm actions, not gripper actions)
Waypoint Sequence#
- class isaaclab_mimic.datagen.WaypointSequence[source]#
Represents a sequence of Waypoint objects.
Methods:
- __init__([sequence]) – Initialize from an optional list of Waypoint objects.
- from_poses(eef_names, poses, ...) – Instantiate a WaypointSequence object given a sequence of poses, gripper actions, and action noise.
- split(ind) – Splits this sequence into 2 pieces, the part up to time index @ind, and the rest.
Attributes:
- last_waypoint – Return last waypoint in sequence.
- __init__(sequence=None)[source]#
- Parameters:
sequence (list or None) – if provided, should be a list of Waypoint objects
- classmethod from_poses(eef_names, poses, gripper_actions, action_noise)[source]#
Instantiate a WaypointSequence object given a sequence of poses, gripper actions, and action noise.
- Parameters:
poses (torch.Tensor) – sequence of pose matrices of shape (T, 4, 4)
gripper_actions (torch.Tensor) – sequence of gripper actions that should be applied at each timestep of shape (T, D).
action_noise (float or torch.Tensor) – sequence of action noise magnitudes that should be applied at each timestep. If a single float is provided, the noise magnitude will be constant over the trajectory.
- property last_waypoint#
Return last waypoint in sequence.
- Returns:
waypoint (Waypoint instance)
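from_poses can be pictured as zipping the per-step inputs into waypoint records. The sketch below uses plain tuples in place of Waypoint objects and illustrates the constant-noise case, where a single float is broadcast over the trajectory; the function name is illustrative:

```python
def waypoints_from_poses(poses, gripper_actions, action_noise):
    """Build one (pose, gripper_action, noise) record per timestep.

    action_noise may be a single number (held constant over the
    trajectory) or a per-step sequence of the same length as poses.
    """
    T = len(poses)
    if isinstance(action_noise, (int, float)):
        # broadcast a scalar noise magnitude to every timestep
        action_noise = [action_noise] * T
    assert len(gripper_actions) == T and len(action_noise) == T
    return list(zip(poses, gripper_actions, action_noise))
```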
Waypoint Trajectory#
- class isaaclab_mimic.datagen.WaypointTrajectory[source]#
A sequence of WaypointSequence objects that corresponds to a full 6-DoF trajectory.
Methods:
- __init__()
- add_waypoint_sequence(sequence) – Directly append sequence to list (no interpolation).
- add_waypoint_sequence_for_target_pose(...[, ...]) – Adds a new waypoint sequence corresponding to a desired target pose.
- pop_first() – Removes first waypoint in first waypoint sequence and returns it.
- merge(other, eef_names[, num_steps_interp, ...]) – Merge this trajectory with another (@other).
- execute(env, env_id, success_term[, ...]) – Main function to execute the trajectory.
Attributes:
- last_waypoint – Return last waypoint in sequence.
- property last_waypoint#
Return last waypoint in sequence.
- Returns:
waypoint (Waypoint instance)
- add_waypoint_sequence(sequence)[source]#
Directly append sequence to list (no interpolation).
- Parameters:
sequence (WaypointSequence instance) – sequence to add
- add_waypoint_sequence_for_target_pose(eef_names, pose, gripper_action, num_steps, skip_interpolation=False, action_noise=0.0)[source]#
Adds a new waypoint sequence corresponding to a desired target pose. A new WaypointSequence will be constructed consisting of @num_steps intermediate Waypoint objects. These can either be constructed with linear interpolation from the last waypoint (default) or be a constant set of target poses (set @skip_interpolation to True).
- Parameters:
pose (torch.Tensor) – 4x4 target pose
gripper_action (torch.Tensor) – value for gripper action
num_steps (int) – number of action steps when trying to reach this waypoint. Will add intermediate linearly interpolated points between the last pose on this trajectory and the target pose, so that the total number of steps is @num_steps.
skip_interpolation (bool) – if True, keep the target pose fixed and repeat it @num_steps times instead of using linearly interpolated targets.
action_noise (float) – scale of random gaussian noise to add during action execution (e.g. when @execute is called)
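The default interpolation behavior can be sketched as follows. This is a position-only linear interpolation between homogeneous poses, a simplification of what the method does (rotation would also need to be interpolated, e.g. with slerp), and the function name is illustrative:

```python
def interpolate_positions(last_pose, target_pose, num_steps):
    """Linearly interpolate translation from last_pose to target_pose.

    Poses are 4x4 nested lists; rotation is copied from target_pose
    for simplicity. Returns num_steps poses ending at target_pose.
    """
    waypoints = []
    for step in range(1, num_steps + 1):
        alpha = step / num_steps
        pose = [row[:] for row in target_pose]  # copy target rotation
        for i in range(3):
            # blend the translation column between the two endpoints
            pose[i][3] = (1 - alpha) * last_pose[i][3] + alpha * target_pose[i][3]
        waypoints.append(pose)
    return waypoints
```

With @skip_interpolation set to True, the method instead repeats target_pose @num_steps times, which corresponds to alpha fixed at 1 in this sketch.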
- pop_first()[source]#
Removes first waypoint in first waypoint sequence and returns it. If the first waypoint sequence is now empty, it is also removed.
- Returns:
waypoint (Waypoint instance)
- merge(other, eef_names, num_steps_interp=None, num_steps_fixed=None, action_noise=0.0)[source]#
Merge this trajectory with another (@other).
- Parameters:
other (WaypointTrajectory object) – the other trajectory to merge into this one
num_steps_interp (int or None) – if not None, add a waypoint sequence that interpolates between the end of the current trajectory and the start of @other
num_steps_fixed (int or None) – if not None, add a waypoint sequence that has constant target poses corresponding to the first target pose in @other
action_noise (float) – noise to use during the interpolation segment
- async execute(env, env_id, success_term, env_action_queue: Queue | None = None)[source]#
Main function to execute the trajectory. Will use env_interface.target_eef_pose_to_action to convert each target pose at each waypoint to an action command, and pass that along to env.step.
- Parameters:
env (Isaac Lab ManagerBasedEnv instance) – environment to use for executing trajectory
env_id (int) – environment index
success_term – success term to check if the task is successful
env_action_queue (asyncio.Queue) – queue for sending actions to the environment
- Returns:
- dictionary with the following items for the executed trajectory:
states (list): simulator state at each timestep
observations (list): observation dictionary at each timestep
datagen_infos (list): datagen_info at each timestep
actions (list): action executed at each timestep
success (bool): whether the trajectory successfully solved the task or not
- Return type:
results (dict)