# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""IsaacTeleop-based teleoperation device for Isaac Lab."""
from __future__ import annotations
import logging
from collections.abc import Callable
from typing import TYPE_CHECKING
import numpy as np
import torch
from .command_handler import CommandHandler
from .isaac_teleop_cfg import IsaacTeleopCfg
from .session_lifecycle import TeleopSessionLifecycle
from .xr_anchor_manager import XrAnchorManager
if TYPE_CHECKING:
from .session_lifecycle import SupportsDLPack
logger = logging.getLogger(__name__)
[docs]
class IsaacTeleopDevice:
    """An IsaacTeleop-based teleoperation device for Isaac Lab.

    This device provides an interface between IsaacTeleop's retargeting pipeline
    and Isaac Lab environments. It composes three focused collaborators:

    * :class:`XrAnchorManager` -- XR anchor prim setup, synchronization,
      and coordinate-frame transform computation.
    * :class:`TeleopSessionLifecycle` -- pipeline building, OpenXR handle
      acquisition, session creation/destruction, and action-tensor extraction.
    * :class:`CommandHandler` -- callback registration and XR message-bus
      command dispatch.

    Together they manage:

    1. XR anchor configuration and synchronization
    2. IsaacTeleop session lifecycle
    3. Action tensor generation from the retargeting pipeline

    The device uses IsaacTeleop's TensorReorderer to flatten pipeline outputs
    into a single action tensor matching the environment's action space.

    Frame rebasing:
        By default, all output poses are expressed in the simulation world
        frame. When an application needs poses in a different frame (e.g.
        robot base link for IK), there are two options:

        * **Config-driven** (recommended): set
          :attr:`~IsaacTeleopCfg.target_frame_prim_path` to the USD prim
          whose frame the output should be expressed in. The device reads
          the prim's world transform each frame and applies the rebase
          automatically.
        * **Explicit**: pass a ``target_T_world`` matrix directly to
          :meth:`advance`.

        In both cases the device composes
        ``target_T_world @ world_T_anchor`` before feeding the matrix into
        the retargeting pipeline, so all resulting poses are expressed in the
        target frame.

    Teleop commands:
        The device supports callbacks for START, STOP, and RESET commands
        that can be triggered via XR controller buttons or the message bus.

    Cleanup:
        Leaving the ``with`` block always stops the session, even when the
        anchor cleanup that precedes it raises. Likewise, finalization always
        attempts the anchor cleanup even when the command-handler cleanup
        raises. In both cases the original exception still propagates.

    Example:

        .. code-block:: python

            cfg = IsaacTeleopCfg(
                pipeline_builder=my_pipeline_builder,
                sim_device="cuda:0",
            )

            # Poses in world frame (default)
            with IsaacTeleopDevice(cfg) as device:
                while running:
                    action = device.advance()
                    env.step(action.repeat(num_envs, 1))

            # Config-driven rebase into robot base frame
            cfg.target_frame_prim_path = "/World/Robot/base_link"
            with IsaacTeleopDevice(cfg) as device:
                while running:
                    action = device.advance()
                    env.step(action.repeat(num_envs, 1))

            # Explicit rebase into robot base frame
            with IsaacTeleopDevice(cfg) as device:
                while running:
                    robot_T_world = get_robot_base_transform()
                    action = device.advance(target_T_world=robot_T_world)
                    env.step(action.repeat(num_envs, 1))
    """

    def __init__(self, cfg: IsaacTeleopCfg):
        """Initialize the IsaacTeleop device.

        Args:
            cfg: Configuration object for IsaacTeleop settings.
        """
        self._cfg = cfg

        # Compose the three collaborators. The command handler shares the
        # anchor manager's XR core and resets the anchor on a reset command.
        self._anchor_manager = XrAnchorManager(cfg.xr_cfg)
        self._session_lifecycle = TeleopSessionLifecycle(cfg)
        self._command_handler = CommandHandler(
            xr_core=self._anchor_manager.xr_core,
            on_reset=self._anchor_manager.reset,
        )

        # Controller button polling state (edge detection for right 'A')
        self._prev_right_a_pressed = False

    def __del__(self):
        """Clean up resources when the object is destroyed.

        The ``hasattr`` guards cover instances whose ``__init__`` did not run
        to completion, in which case some collaborators do not exist.
        """
        try:
            if hasattr(self, "_command_handler"):
                self._command_handler.cleanup()
        finally:
            # Run even if the command-handler cleanup raised, so a failure
            # there cannot prevent the anchor manager from being cleaned up.
            if hasattr(self, "_anchor_manager"):
                self._anchor_manager.cleanup()

    def __str__(self) -> str:
        """Returns a string containing information about the IsaacTeleop device."""
        xr_cfg = self._cfg.xr_cfg
        callbacks = self._command_handler.callbacks

        def _status(command: str) -> str:
            # A command counts as available once a callback is stored under its key.
            return "registered" if command in callbacks else "not registered"

        if xr_cfg.anchor_prim_path is not None:
            anchor_line = f"\tAnchor Prim Path: {xr_cfg.anchor_prim_path} (Dynamic Anchoring)\n"
        else:
            anchor_line = "\tAnchor Mode: Static (Root Level)\n"

        return "".join(
            (
                f"IsaacTeleop Device: {self.__class__.__name__}\n",
                f"\tAnchor Position: {xr_cfg.anchor_pos}\n",
                f"\tAnchor Rotation: {xr_cfg.anchor_rot}\n",
                anchor_line,
                f"\tSim Device: {self._cfg.sim_device}\n",
                f"\tApp Name: {self._cfg.app_name}\n",
                "\t----------------------------------------------\n",
                "\tAvailable Commands:\n",
                f"\t\tStart Teleoperation: {_status('START')}\n",
                f"\t\tStop Teleoperation: {_status('STOP')}\n",
                f"\t\tReset Environment: {_status('RESET')}\n",
            )
        )

    def __enter__(self) -> IsaacTeleopDevice:
        """Enter the context manager and prepare the IsaacTeleop session.

        Builds the retargeting pipeline and attempts to acquire OpenXR handles
        from Kit's XR bridge extension. If the handles are not yet available
        (e.g. the user has not clicked "Start AR"), session creation is deferred
        and will be retried automatically on each :meth:`advance` call.

        Returns:
            Self for context manager protocol.
        """
        self._session_lifecycle.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager and clean up the IsaacTeleop session.

        The anchor manager is cleaned up first, then the session is stopped.
        The session stop runs in a ``finally`` clause so that a failing anchor
        cleanup cannot leave the session running.

        Args:
            exc_type: Exception type raised inside the ``with`` block, if any.
            exc_val: Exception instance raised inside the ``with`` block, if any.
            exc_tb: Traceback of that exception, if any.

        Returns:
            Always ``False``, so exceptions from the ``with`` block propagate.
        """
        try:
            self._anchor_manager.cleanup()
        finally:
            self._session_lifecycle.stop(exc_type, exc_val, exc_tb)
        return False

    def reset(self) -> None:
        """Reset the device state.

        Resets the XR anchor synchronizer if present.
        """
        self._anchor_manager.reset()

    def add_callback(self, key: str, func: Callable) -> None:
        """Add a callback function for teleop commands.

        Args:
            key: The command type to bind to. Valid values are "START", "STOP", "RESET", and "R".
            func: The function to call when the command is received. Should take no arguments.
        """
        self._command_handler.add_callback(key, func)

    def advance(self, target_T_world: np.ndarray | torch.Tensor | SupportsDLPack | None = None) -> torch.Tensor | None:
        """Process current device state and return control commands.

        If the IsaacTeleop session has not been started yet (because the OpenXR
        handles were not available at ``__enter__`` time), this method will
        attempt to start it on each call. Once the user clicks "Start AR" and
        the handles become available, the session is created transparently.

        Args:
            target_T_world: Optional 4x4 transform matrix that rebases all
                output poses into an arbitrary target coordinate frame. When
                provided, the matrix sent to the retargeting pipeline becomes
                ``target_T_world @ world_T_anchor`` instead of just
                ``world_T_anchor``, so all resulting poses are expressed in
                the target frame rather than the simulation world frame.

                Typical use case: pass ``robot_base_T_world`` so that an IK
                controller receives end-effector poses in the robot's base
                link frame.

                Accepts any object supporting the DLPack buffer protocol
                (``__dlpack__``), including :class:`numpy.ndarray`,
                :class:`torch.Tensor`, and ``wp.array``.

                When ``None`` and
                :attr:`~IsaacTeleopCfg.target_frame_prim_path` is set, the
                transform is computed automatically by reading the prim's
                world matrix from Fabric and inverting it. If that read
                fails, ``None`` is forwarded to the session unchanged.

        Returns:
            A flattened action :class:`torch.Tensor` ready for the Isaac Lab
            environment, or ``None`` if the session has not started yet
            (e.g. still waiting for the user to start AR).

        Raises:
            RuntimeError: If called outside of a context manager.
        """
        # Auto-compute target_T_world from config if not explicitly provided
        if target_T_world is None and self._cfg.target_frame_prim_path is not None:
            target_T_world = self._get_target_frame_T_world()

        # Step the session (handles lazy start and action extraction)
        action = self._session_lifecycle.step(
            anchor_world_matrix_fn=self._anchor_manager.get_world_matrix,
            target_T_world=target_T_world,
        )

        if action is not None:
            # Poll controller buttons (e.g. toggle anchor rotation on right 'A' press)
            self._poll_buttons()

        return action

    # ------------------------------------------------------------------
    # Target frame transform (config-driven rebase)
    # ------------------------------------------------------------------

    def _get_target_frame_T_world(self) -> np.ndarray | None:
        """Read the target-frame prim's world matrix from Fabric and return its inverse.

        Uses USDRT to read the prim's hierarchical world matrix, matching the
        pattern used by :class:`XrAnchorSynchronizer` for anchor prim reads.
        Only the translation and the rotation quaternion are extracted from
        that matrix; the inverse is then assembled as a rigid transform.

        Any exception raised along the way (including a failed import) is
        logged as a warning and turned into a ``None`` result.

        Returns:
            A (4, 4) float32 :class:`numpy.ndarray` representing the inverse
            of the prim's world transform (i.e. ``target_T_world``), or
            ``None`` if the prim cannot be read.
        """
        try:
            import omni.usd
            import usdrt
            from pxr import UsdUtils
            from usdrt import Rt

            stage = omni.usd.get_context().get_stage()
            stage_cache = UsdUtils.StageCache.Get()
            stage_id = stage_cache.GetId(stage).ToLongInt()
            if stage_id < 0:
                # Stage is not in the cache yet: insert it to obtain a usable id.
                stage_id = stage_cache.Insert(stage).ToLongInt()

            rt_stage = usdrt.Usd.Stage.Attach(stage_id)
            if rt_stage is None:
                return None

            rt_prim = rt_stage.GetPrimAtPath(self._cfg.target_frame_prim_path)
            if not rt_prim.IsValid():
                return None

            rt_xformable = Rt.Xformable(rt_prim)
            if not rt_xformable.GetPrim().IsValid():
                return None

            world_matrix_attr = rt_xformable.GetFabricHierarchyWorldMatrixAttr()
            if world_matrix_attr is None:
                return None

            rt_matrix = world_matrix_attr.Get()
            if rt_matrix is None:
                return None

            pos = rt_matrix.ExtractTranslation()
            rt_quat = rt_matrix.ExtractRotationQuat()

            from scipy.spatial.transform import Rotation

            # SciPy's default quaternion layout is scalar-last: (x, y, z, w).
            imag = rt_quat.GetImaginary()
            quat_xyzw = [float(imag[0]), float(imag[1]), float(imag[2]), float(rt_quat.GetReal())]
            R = Rotation.from_quat(quat_xyzw).as_matrix().astype(np.float32)
            t = np.array([float(pos[0]), float(pos[1]), float(pos[2])], dtype=np.float32)

            # Inverse of the rigid transform [R | t] is [R^T | -R^T t].
            inv_mat = np.eye(4, dtype=np.float32)
            inv_mat[:3, :3] = R.T
            inv_mat[:3, 3] = -(R.T @ t)
            return inv_mat

        except Exception as e:
            # Best effort: a failed read must not abort the teleop loop.
            logger.warning("Failed to read target frame prim '%s': %s", self._cfg.target_frame_prim_path, e)
            return None

    # ------------------------------------------------------------------
    # Controller button polling (glue between session and anchor manager)
    # ------------------------------------------------------------------

    def _poll_buttons(self) -> None:
        """Poll controller buttons and trigger actions on rising edges.

        Called once per :meth:`advance` frame, after ``session.step()`` has
        executed the pipeline so the controller ``TensorGroup`` is fresh.

        Currently handles:

        * Right controller primary button ("A") -- toggles anchor rotation.
        """
        from isaacteleop.retargeting_engine.tensor_types import ControllerInputIndex

        right_data = self._session_lifecycle.last_right_controller
        if right_data is None or right_data.is_none:
            # No controller data this frame; the stored button state is kept as-is.
            return

        current = float(right_data[ControllerInputIndex.PRIMARY_CLICK]) > 0.5
        # Toggle only on the released -> pressed transition, not while held.
        if current and not self._prev_right_a_pressed:
            self._anchor_manager.toggle_anchor_rotation()
        self._prev_right_a_pressed = current
def _enable_teleop_bridge() -> None:
    """Turn on the XR teleop bridge extension and apply its carb settings.

    Sets the persistent setting that disables OpenXR input bindings, marks the
    ``isaacsim.kit.xr.teleop.bridge`` OpenXR component as enabled, and then
    enables the extension of the same name through the extension manager.

    Must be called after the Omniverse AppLauncher has started.
    """
    # Imported lazily: these modules are only importable inside a running Kit app.
    import carb.settings
    import omni.kit.app

    settings = carb.settings.get_settings()
    settings.set("/persistent/xr/openxr/disableInputBindings", True)
    settings.set('/xr/openxr/components/"isaacsim.kit.xr.teleop.bridge"/enabled', True)

    extension_manager = omni.kit.app.get_app().get_extension_manager()
    extension_manager.set_extension_enabled_immediate("isaacsim.kit.xr.teleop.bridge", True)
[docs]
def create_isaac_teleop_device(
    cfg: IsaacTeleopCfg,
    sim_device: str | None = None,
    callbacks: dict[str, Callable] | None = None,
) -> IsaacTeleopDevice:
    """Build an :class:`IsaacTeleopDevice` after performing the Omniverse setup it needs.

    This helper gathers the steps a script has to run before constructing an
    :class:`IsaacTeleopDevice`:

    1. Disable default OpenXR input bindings (prevents conflicts).
    2. Enable the ``isaacsim.kit.xr.teleop.bridge`` extension.
    3. Optionally override :attr:`IsaacTeleopCfg.sim_device` so action tensors
       land on the same device the caller uses for the simulation.

    Note:
        When *sim_device* is provided, ``cfg.sim_device`` is mutated in place
        before the device is constructed.

    Args:
        cfg: IsaacTeleop configuration.
        sim_device: If provided, overrides ``cfg.sim_device`` so action tensors
            are placed on the requested torch device (e.g. ``"cuda:0"``).
        callbacks: Optional mapping of command keys (e.g. ``"START"``, ``"STOP"``,
            ``"RESET"``) to callables registered on the device.

    Returns:
        A fully configured :class:`IsaacTeleopDevice` ready for use in a
        ``with`` block.
    """
    # Extension and settings setup comes first, before the device exists.
    _enable_teleop_bridge()

    if sim_device is not None:
        cfg.sim_device = sim_device

    logger.info("Using IsaacTeleop stack for teleoperation")
    teleop_device = IsaacTeleopDevice(cfg)

    if callbacks is None:
        return teleop_device

    for command, handler in callbacks.items():
        teleop_device.add_callback(command, handler)
    return teleop_device