Source code for isaaclab.utils.timer

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Sub-module for a timer class that can be used for performance measurements.

Note:
    This module has a hard dependency on `warp` because the :class:`Timer` calls
    ``wp.synchronize()`` on stop to flush pending GPU work before sampling the clock.
    Since IsaacLab workloads are predominantly GPU-bound, an unsynchronized timer would
    under-report wall time by returning before device kernels have finished executing.
"""

from __future__ import annotations

import math
import time
from contextlib import ContextDecorator
from typing import Any, ClassVar, Literal

import warp as wp


[docs] class TimerError(Exception): """A custom exception used to report errors in use of :class:`Timer` class.""" pass
[docs] class Timer(ContextDecorator): """A timer for performance measurements. A class to keep track of time for performance measurement. It allows timing via context managers and decorators as well. It uses the `time.perf_counter` function to measure time. This function returns the number of seconds since the epoch as a float. It has the highest resolution available on the system. As a regular object: .. code-block:: python import time from isaaclab.utils.timer import Timer timer = Timer() timer.start() time.sleep(1) print(1 <= timer.time_elapsed <= 2) # Output: True time.sleep(1) timer.stop() print(2 <= timer.total_run_time) # Output: True As a context manager: .. code-block:: python import time from isaaclab.utils.timer import Timer with Timer() as timer: time.sleep(1) print(1 <= timer.time_elapsed <= 2) # Output: True Reference: https://gist.github.com/sumeet/1123871 """ timing_info: ClassVar[dict[str, dict[str, float]]] = dict() """Dictionary for storing the elapsed time per timer instances globally. This dictionary logs the timer information. The keys are the names given to the timer class at its initialization. If no :attr:`name` is passed to the constructor, no time is recorded in the dictionary. """ _welford_state: ClassVar[dict[str, float]] = dict() """Internal accumulator (m2) for Welford's online algorithm, keyed by timer name.""" enable: ClassVar[bool] = True """Whether to enable the timer.""" enable_display_output: ClassVar[bool] = True """Whether to enable the display output.""" _UNIT_MULTIPLIERS: ClassVar[dict[str, float]] = {"s": 1.0, "ms": 1e3, "us": 1e6, "ns": 1e9} """Mapping from time unit string to multiplier (seconds -> unit)."""
[docs] def __init__( self, msg: str | None = None, name: str | None = None, enable: bool = True, time_unit: Literal["s", "ms", "us", "ns"] = "s", ): """Initializes the timer. Args: msg: The message to display when using the timer class in a context manager. Defaults to None. name: The name to use for logging times in a global dictionary. Defaults to None. enable: Whether to enable the timer. Defaults to True. time_unit: The unit to use for the elapsed time. Defaults to "s". """ self._msg = msg self._name = name self._start_time = None self._stop_time = None self._elapsed_time = None self._enable = enable if Timer.enable else False if time_unit not in Timer._UNIT_MULTIPLIERS: raise ValueError(f"Invalid time_unit, {time_unit} is not in {list(Timer._UNIT_MULTIPLIERS)}") self._format = time_unit self._multiplier = Timer._UNIT_MULTIPLIERS[time_unit]
def __str__(self) -> str: """A string representation of the class object. Returns: A string containing the elapsed time. """ return f"{(self.total_run_time * self._multiplier):0.6f} {self._format}" """ Properties """ @property def time_elapsed(self) -> float: """The number of seconds that have elapsed since this timer started timing. Note: This always returns seconds regardless of the configured ``time_unit``. It is used for checking how much time has elapsed while the timer is still running. """ if self._start_time is None: return 0.0 return time.perf_counter() - self._start_time @property def total_run_time(self) -> float: """The number of seconds that elapsed from when the timer started to when it ended. Note: This always returns seconds regardless of the configured ``time_unit``. """ if self._elapsed_time is None: return 0.0 return self._elapsed_time """ Operations """
[docs] def start(self): """Start timing.""" if not self._enable: return if self._start_time is not None: raise TimerError("Timer is running. Use .stop() to stop it") self._start_time = time.perf_counter()
[docs] def stop(self): """Stop timing.""" if not self._enable: return if self._start_time is None: raise TimerError("Timer is not running. Use .start() to start it") # Synchronize the device to make sure we time the whole operation wp.synchronize() # Get the elapsed time self._stop_time = time.perf_counter() self._elapsed_time = self._stop_time - self._start_time self._start_time = None if self._name is not None: self._update_welford(self._elapsed_time)
def _update_welford(self, value: float): """Update the running statistics using Welford's online algorithm.""" info = Timer.timing_info.get(self._name) if info is None: Timer.timing_info[self._name] = {"mean": value, "std": 0.0, "n": 1, "last": value} Timer._welford_state[self._name] = 0.0 else: m2 = Timer._welford_state[self._name] n = info["n"] + 1 delta = value - info["mean"] mean = info["mean"] + delta / n delta2 = value - mean m2 = m2 + delta * delta2 std = math.sqrt(m2 / (n - 1)) if n > 1 else 0.0 Timer.timing_info[self._name] = {"mean": mean, "std": std, "n": n, "last": value} Timer._welford_state[self._name] = m2 """ Context managers """ def __enter__(self) -> Timer: """Start timing and return this `Timer` instance.""" self.start() return self def __exit__(self, *exc_info: Any): """Stop timing.""" self.stop() # print message if self._enable: if (self._msg is not None) and (Timer.enable_display_output): parts = [f"Last: {(self._elapsed_time * self._multiplier):0.6f} {self._format}"] if self._name is not None: info = Timer.timing_info[self._name] parts.append(f"Mean: {(info['mean'] * self._multiplier):0.6f} {self._format}") parts.append(f"Std: {(info['std'] * self._multiplier):0.6f} {self._format}") parts.append(f"N: {info['n']}") print(self._msg, ", ".join(parts)) """ Static Methods """
[docs] @staticmethod def reset(name: str | None = None): """Reset statistics for a named timer, or all timers if name is None. Args: name: Name of the timer to reset. If None, resets all timers. """ if name is None: Timer.timing_info.clear() Timer._welford_state.clear() else: Timer.timing_info.pop(name, None) Timer._welford_state.pop(name, None)
[docs] @staticmethod def get_timer_info(name: str) -> float: """Retrieves the time logged in the global dictionary based on name. Args: name: Name of the the entry to be retrieved. Raises: TimerError: If name doesn't exist in the log. Returns: A float containing the time logged if the name exists. """ if name not in Timer.timing_info: raise TimerError(f"Timer {name} does not exist") return Timer.timing_info.get(name)["last"]
[docs] @staticmethod def get_timer_statistics(name: str) -> dict[str, float]: """Retrieves the timer statistics logged in the global dictionary based on name. Args: name: Name of the entry to be retrieved. Raises: TimerError: If name doesn't exist in the log. Returns: A dictionary containing the mean, std, and n for the named timer. """ if name not in Timer.timing_info: raise TimerError(f"Timer {name} does not exist") keys = ["mean", "std", "n", "last"] return {k: Timer.timing_info[name][k] for k in keys}