Source code for isaaclab_teleop.isaac_teleop_device

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""IsaacTeleop-based teleoperation device for Isaac Lab."""

from __future__ import annotations

import logging
from collections.abc import Callable
from typing import TYPE_CHECKING

import numpy as np
import torch

from .command_handler import CommandHandler
from .control_events import ControlEvents
from .isaac_teleop_cfg import IsaacTeleopCfg
from .session_lifecycle import TeleopSessionLifecycle
from .xr_anchor_manager import XrAnchorManager

if TYPE_CHECKING:
    from .session_lifecycle import SupportsDLPack

logger = logging.getLogger(__name__)


[docs] class IsaacTeleopDevice: """A IsaacTeleop-based teleoperation device for Isaac Lab. This device provides an interface between IsaacTeleop's retargeting pipeline and Isaac Lab environments. It composes three focused collaborators: * :class:`XrAnchorManager` -- XR anchor prim setup, synchronization, and coordinate-frame transform computation. * :class:`TeleopSessionLifecycle` -- pipeline building, OpenXR handle acquisition, session creation/destruction, and action-tensor extraction. * :class:`CommandHandler` -- callback registration for START / STOP / RESET commands, bridged from the pipeline-based control events. Together they manage: 1. XR anchor configuration and synchronization 2. IsaacTeleop session lifecycle 3. Action tensor generation from the retargeting pipeline The device uses IsaacTeleop's TensorReorderer to flatten pipeline outputs into a single action tensor matching the environment's action space. Frame rebasing: By default, all output poses are expressed in the simulation world frame. When an application needs poses in a different frame (e.g. robot base link for IK), there are two options: * **Config-driven** (recommended): set :attr:`~IsaacTeleopCfg.target_frame_prim_path` to the USD prim whose frame the output should be expressed in. The device reads the prim's world transform each frame and applies the rebase automatically. * **Explicit**: pass a ``target_T_world`` matrix directly to :meth:`advance`. In both cases the device composes ``target_T_world @ world_T_anchor`` before feeding the matrix into the retargeting pipeline, so all resulting poses are expressed in the target frame. Teleop commands: The device supports callbacks for START, STOP, and RESET commands that can be triggered via the message-channel control pipeline or registered directly via :meth:`add_callback`. Example: .. code-block:: python cfg = IsaacTeleopCfg( pipeline_builder=my_pipeline_builder, sim_device="cuda:0", ) # Poses in world frame (default) with IsaacTeleopDevice(cfg) as device: while running: action = device.advance() env.step(action.repeat(num_envs, 1)) # Config-driven rebase into robot base frame cfg.target_frame_prim_path = "/World/Robot/base_link" with IsaacTeleopDevice(cfg) as device: while running: action = device.advance() env.step(action.repeat(num_envs, 1)) # Explicit rebase into robot base frame with IsaacTeleopDevice(cfg) as device: while running: robot_T_world = get_robot_base_transform() action = device.advance(target_T_world=robot_T_world) env.step(action.repeat(num_envs, 1)) """
[docs] def __init__( self, cfg: IsaacTeleopCfg, cloudxr_env_file: str | None = None, auto_launch_cloudxr: bool = True, mcap_record_path: str | None = None, mcap_replay_path: str | None = None, ): """Initialize the IsaacTeleop device. Args: cfg: Configuration object for IsaacTeleop settings. cloudxr_env_file: Optional path to a CloudXR ``.env`` file. When provided and *auto_launch_cloudxr* is ``True``, the CloudXR runtime is launched automatically during session start. When ``None``, no CloudXR runtime is launched. auto_launch_cloudxr: Whether to auto-launch the CloudXR runtime when *cloudxr_env_file* is set. Ignored when *cloudxr_env_file* is ``None``. mcap_record_path: Optional MCAP file path to record the live teleop session into. Mutually exclusive with *mcap_replay_path*. Debug-grade only -- the produced file has no per-episode segmentation, no world-frame anchor, and no public Python decoder. mcap_replay_path: Optional MCAP file path to replay. When set, the device runs in :class:`SessionMode.REPLAY` with no live XR connection and feeds the recorded tracker stream through the pipeline. Mutually exclusive with *mcap_record_path*. """ self._cfg = cfg self._anchor_manager = XrAnchorManager(cfg.xr_cfg) self._command_handler = CommandHandler() self._session_lifecycle = TeleopSessionLifecycle( cfg, cloudxr_env_file=cloudxr_env_file, auto_launch_cloudxr=auto_launch_cloudxr, mcap_record_path=mcap_record_path, mcap_replay_path=mcap_replay_path, ) self._prev_right_a_pressed = False self._prev_control_is_active: bool | None = None
def __del__(self): """Clean up resources when the object is destroyed.""" if hasattr(self, "_command_handler"): self._command_handler.cleanup() if hasattr(self, "_anchor_manager"): self._anchor_manager.cleanup() def __str__(self) -> str: """Returns a string containing information about the IsaacTeleop device.""" xr_cfg = self._cfg.xr_cfg msg = f"IsaacTeleop Device: {self.__class__.__name__}\n" msg += f"\tAnchor Position: {xr_cfg.anchor_pos}\n" msg += f"\tAnchor Rotation: {xr_cfg.anchor_rot}\n" if xr_cfg.anchor_prim_path is not None: msg += f"\tAnchor Prim Path: {xr_cfg.anchor_prim_path} (Dynamic Anchoring)\n" else: msg += "\tAnchor Mode: Static (Root Level)\n" msg += f"\tSim Device: {self._cfg.sim_device}\n" msg += f"\tApp Name: {self._cfg.app_name}\n" msg += "\t----------------------------------------------\n" msg += "\tAvailable Commands:\n" callbacks = self._command_handler.callbacks start_avail = "START" in callbacks stop_avail = "STOP" in callbacks reset_avail = "RESET" in callbacks msg += f"\t\tStart Teleoperation: {'registered' if start_avail else 'not registered'}\n" msg += f"\t\tStop Teleoperation: {'registered' if stop_avail else 'not registered'}\n" msg += f"\t\tReset Environment: {'registered' if reset_avail else 'not registered'}\n" return msg def __enter__(self) -> IsaacTeleopDevice: """Enter the context manager and prepare the IsaacTeleop session. Builds the retargeting pipeline and attempts to acquire OpenXR handles from Kit's XR bridge extension. If the handles are not yet available (e.g. the user has not clicked "Start AR"), session creation is deferred and will be retried automatically on each :meth:`advance` call. Returns: Self for context manager protocol. """ self._session_lifecycle.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Exit the context manager and clean up the IsaacTeleop session.""" self._anchor_manager.cleanup() self._session_lifecycle.stop(exc_type, exc_val, exc_tb) return False
[docs] def reset(self) -> None: """Reset the device state. Resets the XR anchor synchronizer and schedules a ``reset`` :class:`~isaacteleop.retargeting_engine.interface.execution_events.ExecutionEvents` for the next pipeline step so that all retargeters reinitialize their cross-step state. """ self._anchor_manager.reset() self._session_lifecycle.request_reset()
@property def last_control_events(self) -> ControlEvents: """Control events from the most recent :meth:`advance`. Returns a :class:`ControlEvents` derived from the teleop control pipeline. When no control channel is configured, returns a default (no-op) :class:`ControlEvents`. """ return self._session_lifecycle.last_control_events
[docs] def add_callback(self, key: str, func: Callable) -> None: """Add a callback function for teleop commands. Args: key: The command type to bind to. Valid values are "START", "STOP", "RESET", and "R". func: The function to call when the command is received. Should take no arguments. """ self._command_handler.add_callback(key, func)
[docs] def advance(self, target_T_world: np.ndarray | torch.Tensor | SupportsDLPack | None = None) -> torch.Tensor | None: """Process current device state and return control commands. If the IsaacTeleop session has not been started yet (because the OpenXR handles were not available at ``__enter__`` time), this method will attempt to start it on each call. Once the user clicks "Start AR" and the handles become available, the session is created transparently. Args: target_T_world: Optional 4x4 transform matrix that rebases all output poses into an arbitrary target coordinate frame. When provided, the matrix sent to the retargeting pipeline becomes ``target_T_world @ world_T_anchor`` instead of just ``world_T_anchor``, so all resulting poses are expressed in the target frame rather than the simulation world frame. Typical use case: pass ``robot_base_T_world`` so that an IK controller receives end-effector poses in the robot's base link frame. Accepts any object supporting the DLPack buffer protocol (``__dlpack__``), including :class:`numpy.ndarray`, :class:`torch.Tensor`, and ``wp.array``. When ``None`` and :attr:`~IsaacTeleopCfg.target_frame_prim_path` is set, the transform is computed automatically by reading the prim's world matrix from Fabric and inverting it. Returns: A flattened action :class:`torch.Tensor` ready for the Isaac Lab environment, or ``None`` if the session has not started yet (e.g. still waiting for the user to start AR). Raises: RuntimeError: If called outside of a context manager. """ # Auto-compute target_T_world from config if not explicitly provided if target_T_world is None and self._cfg.target_frame_prim_path is not None: target_T_world = self._get_target_frame_T_world() # Step the session (handles lazy start and action extraction) action = self._session_lifecycle.step( anchor_world_matrix_fn=self._anchor_manager.get_world_matrix, target_T_world=target_T_world, ) if action is not None: # Poll controller buttons (e.g. toggle anchor rotation on right 'A' press) self._poll_buttons() self._dispatch_control_callbacks() return action
# ------------------------------------------------------------------ # Control event -> callback bridge # ------------------------------------------------------------------ def _dispatch_control_callbacks(self) -> None: """Fire legacy callbacks when control events indicate a state change. This bridges the pipeline-based :class:`ControlEvents` with the callback-based :class:`CommandHandler` so that scripts which registered callbacks via :meth:`add_callback` still receive dispatches. Only fires START/STOP when ``is_active`` transitions between ``True`` and ``False``; initial transitions from ``None`` are ignored to avoid spurious callbacks during ``DefaultTeleopStateManager``'s STOPPED -> PAUSED progression. """ from .control_events import _NO_OP_EVENTS events = self._session_lifecycle.last_control_events if events is _NO_OP_EVENTS: return if events.should_reset: self._command_handler.fire("RESET") self._anchor_manager.reset() if events.is_active is not None: if self._prev_control_is_active is not None and events.is_active != self._prev_control_is_active: self._command_handler.fire("START" if events.is_active else "STOP") self._prev_control_is_active = events.is_active # ------------------------------------------------------------------ # Target frame transform (config-driven rebase) # ------------------------------------------------------------------ def _get_target_frame_T_world(self) -> np.ndarray | None: """Read the target-frame prim's world matrix from Fabric and return its inverse. Uses USDRT to read the prim's hierarchical world matrix, matching the pattern used by :class:`XrAnchorSynchronizer` for anchor prim reads. Returns: A (4, 4) float32 :class:`numpy.ndarray` representing the inverse of the prim's world transform (i.e. ``target_T_world``), or ``None`` if the prim cannot be read. """ try: import omni.usd import usdrt from pxr import UsdUtils from usdrt import Rt stage = omni.usd.get_context().get_stage() stage_cache = UsdUtils.StageCache.Get() stage_id = stage_cache.GetId(stage).ToLongInt() if stage_id < 0: stage_id = stage_cache.Insert(stage).ToLongInt() rt_stage = usdrt.Usd.Stage.Attach(stage_id) if rt_stage is None: return None rt_prim = rt_stage.GetPrimAtPath(self._cfg.target_frame_prim_path) if not rt_prim.IsValid(): return None rt_xformable = Rt.Xformable(rt_prim) if not rt_xformable.GetPrim().IsValid(): return None world_matrix_attr = rt_xformable.GetFabricHierarchyWorldMatrixAttr() if world_matrix_attr is None: return None rt_matrix = world_matrix_attr.Get() if rt_matrix is None: return None pos = rt_matrix.ExtractTranslation() rt_quat = rt_matrix.ExtractRotationQuat() from scipy.spatial.transform import Rotation quat_xyzw = [ float(rt_quat.GetImaginary()[0]), float(rt_quat.GetImaginary()[1]), float(rt_quat.GetImaginary()[2]), float(rt_quat.GetReal()), ] R = Rotation.from_quat(quat_xyzw).as_matrix().astype(np.float32) t = np.array([float(pos[0]), float(pos[1]), float(pos[2])], dtype=np.float32) inv_mat = np.eye(4, dtype=np.float32) inv_mat[:3, :3] = R.T inv_mat[:3, 3] = -(R.T @ t) return inv_mat except Exception as e: logger.warning(f"Failed to read target frame prim '{self._cfg.target_frame_prim_path}': {e}") return None # ------------------------------------------------------------------ # Controller button polling (glue between session and anchor manager) # ------------------------------------------------------------------ def _poll_buttons(self) -> None: """Poll controller buttons and trigger actions on rising edges. Called once per :meth:`advance` frame, after ``session.step()`` has executed the pipeline so the controller ``TensorGroup`` is fresh. Currently handles: * Right controller primary button ("A") -- toggles anchor rotation. """ from isaacteleop.retargeting_engine.tensor_types import ControllerInputIndex right_data = self._session_lifecycle.last_right_controller if right_data is None or right_data.is_none: return current = float(right_data[ControllerInputIndex.PRIMARY_CLICK]) > 0.5 if current and not self._prev_right_a_pressed: self._anchor_manager.toggle_anchor_rotation() self._prev_right_a_pressed = current
def _enable_teleop_bridge() -> None: """Enable the XR teleop bridge extension and configure carb settings. Must be called after the Omniverse AppLauncher has started. """ import carb.settings import omni.kit.app carb.settings.get_settings().set("/persistent/xr/openxr/disableInputBindings", True) carb.settings.get_settings().set('/xr/openxr/components/"isaacsim.kit.xr.teleop.bridge"/enabled', True) ext_manager = omni.kit.app.get_app().get_extension_manager() ext_manager.set_extension_enabled_immediate("isaacsim.kit.xr.teleop.bridge", True)
[docs] def create_isaac_teleop_device( cfg: IsaacTeleopCfg, sim_device: str | None = None, callbacks: dict[str, Callable] | None = None, cloudxr_env_file: str | None = None, auto_launch_cloudxr: bool = True, mcap_record_path: str | None = None, mcap_replay_path: str | None = None, ) -> IsaacTeleopDevice: """Create an :class:`IsaacTeleopDevice` with required Omniverse extension setup. This helper centralises the boilerplate that every script must execute before constructing an :class:`IsaacTeleopDevice`: 1. Disable default OpenXR input bindings (prevents conflicts). 2. Enable the ``isaacsim.kit.xr.teleop.bridge`` extension (live mode only -- replay mode skips this since it never touches the XR runtime). 3. Optionally override :attr:`IsaacTeleopCfg.sim_device` so action tensors land on the same device the caller uses for the simulation. Note: When *sim_device* is provided, ``cfg.sim_device`` is mutated in place before the device is constructed. Args: cfg: IsaacTeleop configuration. sim_device: If provided, overrides ``cfg.sim_device`` so action tensors are placed on the requested torch device (e.g. ``"cuda:0"``). callbacks: Optional mapping of command keys (e.g. ``"START"``, ``"STOP"``, ``"RESET"``) to callables registered on the device. cloudxr_env_file: Optional path to a CloudXR ``.env`` file. When provided and *auto_launch_cloudxr* is ``True``, the CloudXR runtime and WSS proxy are launched automatically during session start. When ``None``, no CloudXR runtime is launched. auto_launch_cloudxr: Whether to auto-launch the CloudXR runtime when *cloudxr_env_file* is set. Set to ``False`` to skip the launch (e.g. when running the runtime externally). Ignored when *cloudxr_env_file* is ``None``. mcap_record_path: Optional MCAP file path to record the live teleop session into. Debug-grade only. Mutually exclusive with *mcap_replay_path*. mcap_replay_path: Optional MCAP file path to replay. When set, the returned device runs in :class:`SessionMode.REPLAY` and the XR teleop bridge is left untouched. Mutually exclusive with *mcap_record_path*. Returns: A fully configured :class:`IsaacTeleopDevice` ready for use in a ``with`` block. """ if mcap_record_path is not None and mcap_replay_path is not None: raise ValueError( "mcap_record_path and mcap_replay_path are mutually exclusive; " "set at most one to switch between LIVE recording and REPLAY playback." ) # Replay sessions never talk to Kit's XR bridge, so loading/enabling the # bridge extension would only add startup latency and noisy log lines. if mcap_replay_path is None: _enable_teleop_bridge() if sim_device is not None: cfg.sim_device = sim_device if mcap_replay_path is not None: logger.info("Using IsaacTeleop stack for teleoperation (REPLAY mode)") else: logger.info("Using IsaacTeleop stack for teleoperation") device = IsaacTeleopDevice( cfg, cloudxr_env_file=cloudxr_env_file, auto_launch_cloudxr=auto_launch_cloudxr, mcap_record_path=mcap_record_path, mcap_replay_path=mcap_replay_path, ) if callbacks is not None: for key, func in callbacks.items(): device.add_callback(key, func) return device