Source code for isaaclab.devices.openxr.openxr_device

# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""OpenXR-powered device for teleoperation and interaction."""
from __future__ import annotations

import contextlib
import logging
import numpy as np
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

import carb

# import logger
logger = logging.getLogger(__name__)

from isaaclab.devices.openxr.common import HAND_JOINT_NAMES
from isaaclab.devices.retargeter_base import RetargeterBase

from ..device_base import DeviceBase, DeviceCfg
from .xr_anchor_utils import XrAnchorSynchronizer
from .xr_cfg import XrCfg

# For testing purposes, we need to mock the XRCore, XRPoseValidityFlags classes
XRCore = None
XRPoseValidityFlags = None
XRCoreEventType = None

with contextlib.suppress(ModuleNotFoundError):
    from omni.kit.xr.core import XRCore, XRPoseValidityFlags, XRCoreEventType

from isaacsim.core.prims import SingleXFormPrim


[docs]class OpenXRDevice(DeviceBase): """An OpenXR-powered device for teleoperation and interaction. This device tracks hand joints using OpenXR and makes them available as: 1. A dictionary of tracking data (when used without retargeters) 2. Retargeted commands for robot control (when retargeters are provided) Raw data format (_get_raw_data output): * A dictionary with keys matching TrackingTarget enum values (HAND_LEFT, HAND_RIGHT, HEAD) * Each hand tracking entry contains a dictionary of joint poses * Each joint pose is a 7D vector (x, y, z, qw, qx, qy, qz) in meters and quaternion units * Joint names are defined in HAND_JOINT_NAMES from isaaclab.devices.openxr.common * Supported joints include palm, wrist, and joints for thumb, index, middle, ring and little fingers Teleop commands: The device responds to several teleop commands that can be subscribed to via add_callback(): * "START": Resume hand tracking data flow * "STOP": Pause hand tracking data flow * "RESET": Reset the tracking and signal simulation reset The device tracks the left hand, right hand, head position, or any combination of these based on the TrackingTarget enum values. When retargeters are provided, the raw tracking data is transformed into robot control commands suitable for teleoperation. """ TELEOP_COMMAND_EVENT_TYPE = "teleop_command"
[docs] def __init__( self, cfg: OpenXRDeviceCfg, retargeters: list[RetargeterBase] | None = None, ): """Initialize the OpenXR device. Args: cfg: Configuration object for OpenXR settings. retargeters: List of retargeter instances to use for transforming raw tracking data. """ super().__init__(retargeters) self._xr_cfg = cfg.xr_cfg or XrCfg() self._additional_callbacks = dict() self._xr_core = XRCore.get_singleton() if XRCore is not None else None self._vc_subscription = ( self._xr_core.get_message_bus().create_subscription_to_pop_by_type( carb.events.type_from_string(self.TELEOP_COMMAND_EVENT_TYPE), self._on_teleop_command ) if self._xr_core is not None else None ) # Initialize dictionaries instead of arrays default_pose = np.array([0, 0, 0, 1, 0, 0, 0], dtype=np.float32) self._previous_joint_poses_left = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_joint_poses_right = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_headpose = default_pose.copy() if self._xr_cfg.anchor_prim_path is not None: anchor_path = self._xr_cfg.anchor_prim_path if anchor_path.endswith("/"): anchor_path = anchor_path[:-1] self._xr_anchor_headset_path = f"{anchor_path}/XRAnchor" else: self._xr_anchor_headset_path = "/World/XRAnchor" _ = SingleXFormPrim( self._xr_anchor_headset_path, position=self._xr_cfg.anchor_pos, orientation=self._xr_cfg.anchor_rot ) if hasattr(carb, "settings"): carb.settings.get_settings().set_float( "/persistent/xr/profile/ar/render/nearPlane", self._xr_cfg.near_plane ) carb.settings.get_settings().set_string("/persistent/xr/profile/ar/anchorMode", "custom anchor") carb.settings.get_settings().set_string("/xrstage/profile/ar/customAnchor", self._xr_anchor_headset_path) # Button binding support self.__button_subscriptions: dict[str, dict] = {} # Optional anchor synchronizer self._anchor_sync: XrAnchorSynchronizer | None = None if self._xr_core is not None and self._xr_cfg.anchor_prim_path is not None: try: self._anchor_sync = XrAnchorSynchronizer( xr_core=self._xr_core, xr_cfg=self._xr_cfg, xr_anchor_headset_path=self._xr_anchor_headset_path, ) # Subscribe to pre_sync_update to keep anchor in sync if XRCoreEventType is not None: self._xr_pre_sync_update_subscription = ( self._xr_core.get_message_bus().create_subscription_to_pop_by_type( XRCoreEventType.pre_sync_update, lambda _: self._anchor_sync.sync_headset_to_anchor(), name="isaaclab_xr_pre_sync_update", ) ) except Exception as e: logger.warning(f"XR: Failed to initialize anchor synchronizer: {e}") # Default convenience binding: toggle anchor rotation with right controller 'a' button with contextlib.suppress(Exception): self._bind_button_press( "/user/hand/right", "a", "isaaclab_right_a", lambda ev: self._toggle_anchor_rotation(), )
def __del__(self): """Clean up resources when the object is destroyed. Properly unsubscribes from the XR message bus to prevent memory leaks and resource issues when the device is no longer needed. """ if hasattr(self, "_vc_subscription") and self._vc_subscription is not None: self._vc_subscription = None if hasattr(self, "_xr_pre_sync_update_subscription") and self._xr_pre_sync_update_subscription is not None: self._xr_pre_sync_update_subscription = None # clear button subscriptions if hasattr(self, "__button_subscriptions"): self._unbind_all_buttons() # No need to explicitly clean up OpenXR instance as it's managed by NVIDIA Isaac Sim def __str__(self) -> str: """Returns a string containing information about the OpenXR hand tracking device. This provides details about the device configuration, tracking settings, and available gesture commands. Returns: Formatted string with device information """ msg = f"OpenXR Hand Tracking Device: {self.__class__.__name__}\n" msg += f"\tAnchor Position: {self._xr_cfg.anchor_pos}\n" msg += f"\tAnchor Rotation: {self._xr_cfg.anchor_rot}\n" if self._xr_cfg.anchor_prim_path is not None: msg += f"\tAnchor Prim Path: {self._xr_cfg.anchor_prim_path} (Dynamic Anchoring)\n" else: msg += "\tAnchor Mode: Static (Root Level)\n" # Add retargeter information if self._retargeters: msg += "\tRetargeters:\n" for i, retargeter in enumerate(self._retargeters): msg += f"\t\t{i + 1}. {retargeter.__class__.__name__}\n" else: msg += "\tRetargeters: None (raw joint data output)\n" # Add available gesture commands msg += "\t----------------------------------------------\n" msg += "\tAvailable Gesture Commands:\n" # Check which callbacks are registered start_avail = "START" in self._additional_callbacks stop_avail = "STOP" in self._additional_callbacks reset_avail = "RESET" in self._additional_callbacks msg += f"\t\tStart Teleoperation: {'✓' if start_avail else '✗'}\n" msg += f"\t\tStop Teleoperation: {'✓' if stop_avail else '✗'}\n" msg += f"\t\tReset Environment: {'✓' if reset_avail else '✗'}\n" # Add joint tracking information msg += "\t----------------------------------------------\n" msg += "\tTracked Joints: All 26 XR hand joints including:\n" msg += "\t\t- Wrist, palm\n" msg += "\t\t- Thumb (tip, intermediate joints)\n" msg += "\t\t- Fingers (tip, distal, intermediate, proximal)\n" return msg """ Operations """
[docs] def reset(self): default_pose = np.array([0, 0, 0, 1, 0, 0, 0], dtype=np.float32) self._previous_joint_poses_left = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_joint_poses_right = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_headpose = default_pose.copy() if hasattr(self, "_anchor_sync") and self._anchor_sync is not None: self._anchor_sync.reset()
[docs] def add_callback(self, key: str, func: Callable): """Add additional functions to bind to client messages. Args: key: The message type to bind to. Valid values are "START", "STOP", and "RESET". func: The function to call when the message is received. The callback function should not take any arguments. """ self._additional_callbacks[key] = func
def _get_raw_data(self) -> Any: """Get the latest tracking data from the OpenXR runtime. Returns: Dictionary with TrackingTarget enum keys (HAND_LEFT, HAND_RIGHT, HEAD) containing: - Left hand joint poses: Dictionary of 26 joints with position and orientation - Right hand joint poses: Dictionary of 26 joints with position and orientation - Head pose: Single 7-element array with position and orientation Each pose is represented as a 7-element array: [x, y, z, qw, qx, qy, qz] where the first 3 elements are position and the last 4 are quaternion orientation. """ data = {} if RetargeterBase.Requirement.HAND_TRACKING in self._required_features: data[DeviceBase.TrackingTarget.HAND_LEFT] = self._calculate_joint_poses( XRCore.get_singleton().get_input_device("/user/hand/left"), self._previous_joint_poses_left, ) data[DeviceBase.TrackingTarget.HAND_RIGHT] = self._calculate_joint_poses( XRCore.get_singleton().get_input_device("/user/hand/right"), self._previous_joint_poses_right, ) if RetargeterBase.Requirement.HEAD_TRACKING in self._required_features: data[DeviceBase.TrackingTarget.HEAD] = self._calculate_headpose() if RetargeterBase.Requirement.MOTION_CONTROLLER in self._required_features: # Optionally include motion controller pose+inputs if devices are available try: left_dev = XRCore.get_singleton().get_input_device("/user/hand/left") right_dev = XRCore.get_singleton().get_input_device("/user/hand/right") left_ctrl = self._query_controller(left_dev) if left_dev is not None else np.array([]) right_ctrl = self._query_controller(right_dev) if right_dev is not None else np.array([]) if left_ctrl.size: data[DeviceBase.TrackingTarget.CONTROLLER_LEFT] = left_ctrl if right_ctrl.size: data[DeviceBase.TrackingTarget.CONTROLLER_RIGHT] = right_ctrl except Exception: # Ignore controller data if XRCore/controller APIs are unavailable pass return data """ Internal helpers. """ def _calculate_joint_poses( self, hand_device: Any, previous_joint_poses: dict[str, np.ndarray] ) -> dict[str, np.ndarray]: """Calculate and update joint poses for a hand device. This function retrieves the current joint poses from the OpenXR hand device and updates the previous joint poses with the new data. If a joint's position or orientation is not valid, it will use the previous values. Args: hand_device: The OpenXR input device for a hand (/user/hand/left or /user/hand/right). previous_joint_poses: Dictionary mapping joint names to their previous poses. Each pose is a 7-element array: [x, y, z, qw, qx, qy, qz]. Returns: Updated dictionary of joint poses with the same structure as previous_joint_poses. Each pose is represented as a 7-element numpy array: [x, y, z, qw, qx, qy, qz] where the first 3 elements are position and the last 4 are quaternion orientation. """ if hand_device is None: return previous_joint_poses joint_poses = hand_device.get_all_virtual_world_poses() # Update each joint that is present in the current data for joint_name, joint_pose in joint_poses.items(): if joint_name in HAND_JOINT_NAMES: # Extract translation and rotation if joint_pose.validity_flags & XRPoseValidityFlags.POSITION_VALID: position = joint_pose.pose_matrix.ExtractTranslation() else: position = previous_joint_poses[joint_name][:3] if joint_pose.validity_flags & XRPoseValidityFlags.ORIENTATION_VALID: quat = joint_pose.pose_matrix.ExtractRotationQuat() quati = quat.GetImaginary() quatw = quat.GetReal() else: quatw = previous_joint_poses[joint_name][3] quati = previous_joint_poses[joint_name][4:] # Directly update the dictionary with new data previous_joint_poses[joint_name] = np.array( [position[0], position[1], position[2], quatw, quati[0], quati[1], quati[2]], dtype=np.float32 ) # No need for conversion, just return the updated dictionary return previous_joint_poses def _calculate_headpose(self) -> np.ndarray: """Calculate the head pose from OpenXR. Returns: numpy.ndarray: 7-element array containing head position (xyz) and orientation (wxyz) """ head_device = XRCore.get_singleton().get_input_device("/user/head") if head_device: hmd = head_device.get_virtual_world_pose("") position = hmd.ExtractTranslation() quat = hmd.ExtractRotationQuat() quati = quat.GetImaginary() quatw = quat.GetReal() # Store in w, x, y, z order to match our convention self._previous_headpose = np.array([ position[0], position[1], position[2], quatw, quati[0], quati[1], quati[2], ]) return self._previous_headpose # ----------------------------- # Controller button binding utilities # ----------------------------- def _bind_button_press( self, device_path: str, button_name: str, event_name: str, on_button_press: Callable[[carb.events.IEvent], None], ) -> None: if self._xr_core is None: logger.warning("XR core not available; skipping button binding") return sub_key = f"{device_path}/{button_name}" self.__button_subscriptions[sub_key] = {} def try_emit_button_events(): if self.__button_subscriptions[sub_key].get("generator"): return device = self._xr_core.get_input_device(device_path) if not device: return names = {str(n) for n in (device.get_input_names() or ())} if button_name not in names: return gen = device.bind_event_generator(button_name, event_name, ("press",)) if gen is not None: logger.info(f"XR: Bound event generator for {sub_key}, {event_name}") self.__button_subscriptions[sub_key]["generator"] = gen def on_inputs_change(_ev: carb.events.IEvent) -> None: try_emit_button_events() def on_disable(_ev: carb.events.IEvent) -> None: self.__button_subscriptions[sub_key]["generator"] = None message_bus = self._xr_core.get_message_bus() event_suffix = device_path.strip("/").replace("/", "_") self.__button_subscriptions[sub_key]["press_sub"] = message_bus.create_subscription_to_pop_by_type( carb.events.type_from_string(f"{event_name}.press"), on_button_press ) self.__button_subscriptions[sub_key]["inputs_change_sub"] = message_bus.create_subscription_to_pop_by_type( carb.events.type_from_string(f"xr_input.{event_suffix}.inputs_change"), on_inputs_change ) self.__button_subscriptions[sub_key]["disable_sub"] = message_bus.create_subscription_to_pop_by_type( carb.events.type_from_string(f"xr_input.{event_suffix}.disable"), on_disable ) try_emit_button_events() def _unbind_all_buttons(self) -> None: for sub_key, subs in self.__button_subscriptions.items(): if "generator" in subs: subs["generator"] = None for key in ["press_sub", "inputs_change_sub", "disable_sub"]: if key in subs: subs[key] = None self.__button_subscriptions.clear() logger.info("XR: Unbound all button event handlers") def _toggle_anchor_rotation(self): if self._anchor_sync is not None: self._anchor_sync.toggle_anchor_rotation() def _query_controller(self, input_device) -> np.ndarray: """Query motion controller pose and inputs as a 2x7 array. Row 0 (POSE): [x, y, z, w, x, y, z] Row 1 (INPUTS): [thumbstick_x, thumbstick_y, trigger, squeeze, button_0, button_1, padding] """ if input_device is None: return np.array([]) pose = input_device.get_virtual_world_pose() position = pose.ExtractTranslation() quat = pose.ExtractRotationQuat() thumbstick_x = 0.0 thumbstick_y = 0.0 trigger = 0.0 squeeze = 0.0 button_0 = 0.0 button_1 = 0.0 if input_device.has_input_gesture("thumbstick", "x"): thumbstick_x = float(input_device.get_input_gesture_value("thumbstick", "x")) if input_device.has_input_gesture("thumbstick", "y"): thumbstick_y = float(input_device.get_input_gesture_value("thumbstick", "y")) if input_device.has_input_gesture("trigger", "value"): trigger = float(input_device.get_input_gesture_value("trigger", "value")) if input_device.has_input_gesture("squeeze", "value"): squeeze = float(input_device.get_input_gesture_value("squeeze", "value")) # Determine which button pair exists on this device if input_device.has_input_gesture("x", "click") or input_device.has_input_gesture("y", "click"): if input_device.has_input_gesture("x", "click"): button_0 = float(input_device.get_input_gesture_value("x", "click")) if input_device.has_input_gesture("y", "click"): button_1 = float(input_device.get_input_gesture_value("y", "click")) else: if input_device.has_input_gesture("a", "click"): button_0 = float(input_device.get_input_gesture_value("a", "click")) if input_device.has_input_gesture("b", "click"): button_1 = float(input_device.get_input_gesture_value("b", "click")) pose_row = [ position[0], position[1], position[2], quat.GetReal(), quat.GetImaginary()[0], quat.GetImaginary()[1], quat.GetImaginary()[2], ] input_row = [ thumbstick_x, thumbstick_y, trigger, squeeze, button_0, button_1, 0.0, ] return np.array([pose_row, input_row], dtype=np.float32) def _on_teleop_command(self, event: carb.events.IEvent): msg = event.payload["message"] if "start" in msg: if "START" in self._additional_callbacks: self._additional_callbacks["START"]() elif "stop" in msg: if "STOP" in self._additional_callbacks: self._additional_callbacks["STOP"]() elif "reset" in msg: if "RESET" in self._additional_callbacks: self._additional_callbacks["RESET"]() self.reset()
@dataclass class OpenXRDeviceCfg(DeviceCfg): """Configuration for OpenXR devices.""" xr_cfg: XrCfg | None = None class_type: type[DeviceBase] = OpenXRDevice