Ray Job Dispatch and Tuning#

Isaac Lab supports Ray to streamline the dispatch of multiple training jobs (in parallel and in series) and hyperparameter tuning, on both local and remote configurations.

This independent, community-contributed walkthrough video demonstrates some of the core functionality of the Ray integration covered in this overview. Although the codebase has changed somewhat since the video was created (for example, some file names have been shortened), the general workflow is the same.

Attention

This functionality is experimental, and has been tested only on Linux.

Overview#

The Ray integration is useful for the following:

  • Dispatching several training jobs in parallel or sequentially with minimal interaction

  • Tuning hyperparameters in parallel or sequentially, with support for multiple GPUs and/or multiple GPU nodes

  • Using the same training setup everywhere (on cloud and local) with minimal overhead

  • Resource Isolation for training jobs

The core functionality of the Ray workflow consists of two main scripts that enable the orchestration of resource-wrapped and tuning aggregate jobs. These scripts facilitate the decomposition of aggregate jobs (overarching experiments) into individual jobs, which are discrete commands executed on the cluster. An aggregate job can include multiple individual jobs. For clarity, this guide refers to the jobs one layer below the topmost aggregate level as sub-jobs.

Both resource-wrapped and tuning aggregate jobs dispatch individual jobs to a designated Ray cluster, which leverages the cluster’s resources (e.g., a single workstation node or multiple nodes) to execute these jobs with workers in parallel and/or sequentially. By default, aggregate jobs use all available resources on each available GPU-enabled node for each sub-job worker. This can be changed by specifying the --num_workers argument, which is especially critical for parallel aggregate job processing on local or virtual multi-GPU machines.

In resource-wrapped aggregate jobs, each sub-job and its resource requirements are defined manually, enabling resource isolation. For tuning aggregate jobs, individual jobs are generated automatically based on a hyperparameter sweep configuration. This assumes homogeneous node resource composition for nodes with GPUs.
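
For example, on a single multi-GPU workstation the two kinds of aggregate jobs might be dispatched as follows. This is an illustrative sketch that reuses the cartpole tasks and the example sweep configuration shipped with the Ray workflow; the full argument descriptions are in the script docstrings below.

# Illustrative sketch: resource-wrapped aggregate job, splitting the node between two training sub-jobs
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_workers 2 --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras
# Illustrative sketch: tuning aggregate job run locally against the example cartpole sweep
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local --cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py --cfg_class CartpoleRGBNoTuneJobCfg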

source/standalone/workflows/ray/wrap_resources.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

import source.standalone.workflows.ray.util as util

"""
This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of a resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
If the desired resources for each sub-job are specified,
the maximum number of workers possible with the desired resources is created for each node
with GPU(s) in the cluster. It is also possible to split the available resources of each node
into the desired number of workers with the ``--num_workers`` flag, to be able to easily
parallelize sub-jobs on multi-GPU nodes. Because Isaac Lab requires a GPU,
this script ignores all CPU-only nodes such as loggers.

Sub-jobs are matched with node(s) in a cluster via the following relation:
sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node ID
node_submitted_to = sorted_nodes[job_index % total_node_count]

To check the ordering of sorted nodes, supply the ``--test`` argument and run the script.

Sub-jobs are separated by the + delimiter. The ``--sub_jobs`` argument must be the last
argument supplied to the script.

If there is more than one available worker, and more than one sub-job,
sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs will
be dispatched to workers as they become available. There is no limit on the number
of sub-jobs that can be near-simultaneously submitted.

This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.
To submit aggregate cluster jobs such as this script to one or more remote clusters,
see :file:`../submit_job.py`.

KubeRay clusters on Google GKE can be created with :file:`../launch.py`

Usage:

.. code-block:: bash
    # **Ensure that sub-jobs are separated by the ``+`` delimiter.**
    # Generic Templates-----------------------------------
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
    # No resource isolation; no parallelization:
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py
    --sub_jobs <JOB0>+<JOB1>+<JOB2>
    # Automatic Resource Isolation; Example A: needed for parallelization
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py \
    --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \
    --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example B:  needed for parallelization
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # to see all arguments
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
"""


def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
    """
    Provided a list of jobs, dispatch jobs to one worker per available node,
    unless otherwise specified by resource constraints.

    Args:
        jobs: bash commands to execute on a Ray cluster
        args: The arguments for resource allocation

    """
    if not ray.is_initialized():
        ray.init(address=args.ray_address, log_to_driver=True)
    job_results = []
    gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)

    if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
        raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")

    num_nodes = len(gpu_node_resources)
    # Populate arguments
    formatted_node_resources = {
        "gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
        "cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
        "ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
        "num_workers": args.num_workers,  # By default, 1 worker por node
    }
    args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
    print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
    if args.test:
        jobs = ["nvidia-smi"] * num_nodes
    for i, job in enumerate(jobs):
        gpu_node = gpu_node_resources[i % num_nodes]
        print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
        print(
            f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
            f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
        )
        print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
        num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
        num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
        memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
        print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
        job = util.remote_execute_job.options(
            num_gpus=num_gpus,
            num_cpus=num_cpus,
            memory=memory,
            scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
        ).remote(job, f"Job {i}", args.test)
        job_results.append(job)

    results = ray.get(job_results)
    for i, result in enumerate(results):
        print(f"[INFO]: Job {i} result: {result}")
    print("[INFO]: All jobs completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.")
    parser = util.add_resource_arguments(arg_parser=parser)
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--test",
        action="store_true",
        help=(
            "Run nvidia-smi test instead of the arbitrary job,"
            "can use as a sanity check prior to any jobs to check "
            "that GPU resources are correctly isolated."
        ),
    )
    parser.add_argument(
        "--sub_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.",
    )
    args = parser.parse_args()
    if args.sub_jobs is not None:
        jobs = " ".join(args.sub_jobs)
        formatted_jobs = jobs.split("+")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
source/standalone/workflows/ray/tuner.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import importlib.util
import os
import sys
from time import sleep

import ray
import util
from ray import air, tune
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.repeater import Repeater

"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
per GPU-enabled node in the cluster for each individual job.

Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.

This assumes that all workers in a cluster are homogeneous. For heterogeneous workloads,
create several heterogeneous clusters (with homogeneous nodes in each cluster),
then submit several overall-cluster jobs with :file:`../submit_job.py`.
KubeRay clusters on Google GKE can be created with :file:`../launch.py`

To report tune metrics on clusters, a running MLFlow server with a known URI that the cluster has
access to is required. For KubeRay clusters configured with :file:`../launch.py`, this is included
automatically, and the URI can be easily found with :file:`grok_cluster_with_kubectl.py`.

Usage:

.. code-block:: bash

    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h

    # Examples
    # Local (not within a docker container; when within a local docker container, do not supply the run_mode argument)
    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
    --cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg
    # Local docker: start the ray server and run above command in the same running container without run_mode arg
    # Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
    --aggregate_jobs tuner.py \
    --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>

"""

DOCKER_PREFIX = "/workspace/isaaclab/"
BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "source/standalone/workflows/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1  # needed for local parallelism


class IsaacLabTuneTrainable(tune.Trainable):
    """The Isaac Lab Ray Tune Trainable.
    This class uses the standalone workflows to start jobs, along with the hydra integration.
    This class achieves Ray-based logging through reading the tensorboard logs from
    the standalone workflows. This depends on a config generated in the format of
    :class:`JobCfg`
    """

    def setup(self, config: dict) -> None:
        """Get the invocation command, return quick for easy scheduling."""
        self.data = None
        self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW)
        print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
        self.experiment = None

    def reset_config(self, new_config):
        """Allow environments to be re-used by fetching a new invocation command"""
        self.setup(new_config)
        return True

    def step(self) -> dict:
        if self.experiment is None:  # start experiment
            # When including this as first step instead of setup, experiments get scheduled faster
            # Don't want to block the scheduler while the experiment spins up
            print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
            experiment = util.execute_job(
                self.invoke_cmd,
                identifier_string="",
                extract_experiment=True,
                persistent_dir=BASE_DIR,
            )
            self.experiment = experiment
            print(f"[INFO]: Tuner recovered experiment info {experiment}")
            self.proc = experiment["proc"]
            self.experiment_name = experiment["experiment_name"]
            self.isaac_logdir = experiment["logdir"]
            self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
            self.done = False

        if self.proc is None:
            raise ValueError("Could not start trial.")

        if self.proc.poll() is not None:  # process finished, signal finish
            self.data["done"] = True
            print("[INFO]: Process finished, returning...")
        else:  # wait until the logs are ready or fresh
            data = util.load_tensorboard_logs(self.tensorboard_logdir)

            while data is None:
                data = util.load_tensorboard_logs(self.tensorboard_logdir)
                sleep(2)  # Lazy report metrics to avoid performance overhead

            if self.data is not None:
                while util._dicts_equal(data, self.data):
                    data = util.load_tensorboard_logs(self.tensorboard_logdir)
                    sleep(2)  # Lazy report metrics to avoid performance overhead

            self.data = data
            self.data["done"] = False
        return self.data

    def default_resource_request(self):
        """How many resources each trainable uses. Assumes homogeneous resources across gpu nodes,
        and that each trainable is meant for one node, where it uses all available resources."""
        resources = util.get_gpu_node_resources(one_node_only=True)
        if NUM_WORKERS_PER_NODE != 1:
            print("[WARNING]: Splitting node into more than one worker")
        return tune.PlacementGroupFactory(
            [{"CPU": resources["CPU"] / NUM_WORKERS_PER_NODE, "GPU": resources["GPU"] / NUM_WORKERS_PER_NODE}],
            strategy="STRICT_PACK",
        )


def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
    """Invoke an Isaac-Ray tuning run.

    Log either to a local directory or to MLFlow.
    Args:
        cfg: Configuration dictionary extracted from job setup
        args: Command-line arguments related to tuning.
    """
    # Allow for early exit
    os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"

    print("[WARNING]: Not saving checkpoints, just running experiment...")
    print("[INFO]: Model parameters and metrics will be preserved.")
    print("[WARNING]: For homogeneous cluster resources only...")
    # Get available resources
    resources = util.get_gpu_node_resources()
    print(f"[INFO]: Available resources {resources}")

    if not ray.is_initialized():
        ray.init(
            address=args.ray_address,
            log_to_driver=True,
            num_gpus=len(resources),
        )

    print(f"[INFO]: Using config {cfg}")

    # Configure the search algorithm and the repeater
    searcher = OptunaSearch(
        metric=args.metric,
        mode=args.mode,
    )
    repeat_search = Repeater(searcher, repeat=args.repeat_run_count)

    if args.run_mode == "local":  # Standard config, to file
        run_config = air.RunConfig(
            storage_path="/tmp/ray",
            name=f"IsaacRay-{args.cfg_class}-tune",
            verbose=1,
            checkpoint_config=air.CheckpointConfig(
                checkpoint_frequency=0,  # Disable periodic checkpointing
                checkpoint_at_end=False,  # Disable final checkpoint
            ),
        )

    elif args.run_mode == "remote":  # MLFlow, to MLFlow server
        mlflow_callback = MLflowLoggerCallback(
            tracking_uri=args.mlflow_uri,
            experiment_name=f"IsaacRay-{args.cfg_class}-tune",
            save_artifact=False,
            tags={"run_mode": "remote", "cfg_class": args.cfg_class},
        )

        run_config = ray.train.RunConfig(
            name="mlflow",
            storage_path="/tmp/ray",
            callbacks=[mlflow_callback],
            checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
        )
    else:
        raise ValueError("Unrecognized run mode.")

    # Configure the tuning job
    tuner = tune.Tuner(
        IsaacLabTuneTrainable,
        param_space=cfg,
        tune_config=tune.TuneConfig(
            search_alg=repeat_search,
            num_samples=args.num_samples,
            reuse_actors=True,
        ),
        run_config=run_config,
    )

    # Execute the tuning
    tuner.fit()

    # Save results to mounted volume
    if args.run_mode == "local":
        print("[DONE!]: Check results with tensorboard dashboard")
    else:
        print("[DONE!]: Check results with MLFlow dashboard")


class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg):
        assert "runner_args" in cfg, "No runner arguments specified."
        assert "--task" in cfg["runner_args"], "No task specified."
        assert "hydra_args" in cfg, "No hypeparameters specified."
        self.cfg = cfg


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tune Isaac Lab hyperparameters.")
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--cfg_file",
        type=str,
        default="hyperparameter_tuning/vision_cartpole_cfg.py",
        required=False,
        help="The relative filepath where a hyperparameter sweep is defined",
    )
    parser.add_argument(
        "--cfg_class",
        type=str,
        default="CartpoleRGBNoTuneJobCfg",
        required=False,
        help="Name of the hyperparameter sweep class to use",
    )
    parser.add_argument(
        "--run_mode",
        choices=["local", "remote"],
        default="remote",
        help=(
            "Set to local to use ./isaaclab.sh -p python, set to "
            "remote to use /workspace/isaaclab/isaaclab.sh -p python"
        ),
    )
    parser.add_argument(
        "--workflow",
        default=None,  # populated with RL Games
        help="The absolute path of the workflow to use for the experiment. By default, RL Games is used.",
    )
    parser.add_argument(
        "--mlflow_uri",
        type=str,
        default=None,
        required=False,
        help="The MLFlow Uri.",
    )
    parser.add_argument(
        "--num_workers_per_node",
        type=int,
        default=1,
        help="Number of workers to run on each GPU node. Only supply for parallelism on multi-gpu nodes",
    )

    parser.add_argument("--metric", type=str, default="rewards/time", help="What metric to tune for.")

    parser.add_argument(
        "--mode",
        choices=["max", "min"],
        default="max",
        help="What to optimize the metric to while tuning",
    )
    parser.add_argument(
        "--num_samples",
        type=int,
        default=100,
        help="How many hyperparameter runs to try total.",
    )
    parser.add_argument(
        "--repeat_run_count",
        type=int,
        default=3,
        help="How many times to repeat each hyperparameter config.",
    )

    args = parser.parse_args()
    NUM_WORKERS_PER_NODE = args.num_workers_per_node
    print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
    if args.run_mode == "remote":
        BASE_DIR = DOCKER_PREFIX  # ensure logs are dumped to persistent location
        PYTHON_EXEC = DOCKER_PREFIX + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = DOCKER_PREFIX + WORKFLOW
        else:
            WORKFLOW = args.workflow
        print(f"[INFO]: Using remote mode {PYTHON_EXEC=} {WORKFLOW=}")

        if args.mlflow_uri is not None:
            import mlflow

            mlflow.set_tracking_uri(args.mlflow_uri)
            from ray.air.integrations.mlflow import MLflowLoggerCallback
        else:
            raise ValueError("Please provide a result MLFLow URI server.")
    else:  # local
        PYTHON_EXEC = os.getcwd() + "/" + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = os.getcwd() + "/" + WORKFLOW
        else:
            WORKFLOW = args.workflow
        BASE_DIR = os.getcwd()
        print(f"[INFO]: Using local mode {PYTHON_EXEC=} {WORKFLOW=}")
    file_path = args.cfg_file
    class_name = args.cfg_class
    print(f"[INFO]: Attempting to use sweep config from {file_path=} {class_name=}")
    module_name = os.path.splitext(os.path.basename(file_path))[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    print(f"[INFO]: Successfully imported {module_name} from {file_path}")
    if hasattr(module, class_name):
        ClassToInstantiate = getattr(module, class_name)
        print(f"[INFO]: Found correct class {ClassToInstantiate}")
        instance = ClassToInstantiate()
        print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
        cfg = instance.cfg
        print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
        invoke_tuning_run(cfg, args)

    else:
        raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")

The following script can be used to submit aggregate jobs to one or more Ray clusters, whether to run jobs on a remote cluster or to run simultaneous jobs with heterogeneous resource requirements:

source/standalone/workflows/ray/submit_job.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor

from ray import job_submission

"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:`grok_cluster_with_kubectl.py`

Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_count

Aggregate jobs are separated by the * delimiter. The ``--aggregate_jobs`` argument must be
the last argument supplied to the script.

An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:`../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.

If there are more aggregate jobs than clusters, aggregate jobs will be submitted
as clusters become available via the relation defined above. If there are fewer aggregate jobs
than clusters, some clusters will not receive aggregate jobs. The maximum number of
aggregate jobs that can be run simultaneously is equal to the number of workers that a
ThreadPoolExecutor creates by default on the machine submitting jobs (because log output is
fetched after jobs finish), which is unlikely to constrain overall job submission.

Usage:

.. code-block:: bash

    # Example; submitting a tuning job
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
    --aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
        --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
        --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>

    # Example: Submitting resource wrapped job
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150

    # For all command line arguments
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}


def read_cluster_spec(fn: str | None = None) -> list[dict]:
    if fn is None:
        cluster_spec_path = os.path.expanduser("~/.cluster_config")
    else:
        cluster_spec_path = os.path.expanduser(fn)

    if not os.path.exists(cluster_spec_path):
        raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")

    clusters = []
    with open(cluster_spec_path) as f:
        for line in f:
            parts = line.strip().split(" ")
            http_address = parts[3]
            cluster_info = {"name": parts[1], "address": http_address}
            print(f"[INFO] Setting {cluster_info['name']}")  # with {cluster_info['num_gpu']} GPUs.")
            clusters.append(cluster_info)

    return clusters


def submit_job(cluster: dict, job_command: str) -> None:
    """
    Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end.
    """
    address = cluster["address"]
    cluster_name = cluster["name"]
    print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}")  # with {num_gpus} GPUs.")
    client = job_submission.JobSubmissionClient(address)
    runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]}
    print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}")
    try:
        dir_contents = os.listdir(CONFIG["working_dir"])
        print(f"[INFO]: Directory contents: {dir_contents}")
    except Exception as e:
        print(f"[INFO]: Failed to list directory contents: {str(e)}")
    entrypoint = f"{CONFIG['executable']} {job_command}"
    print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}")
    job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env)
    status = client.get_job_status(job_id)
    while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]:
        time.sleep(5)
        status = client.get_job_status(job_id)

    final_logs = client.get_job_logs(job_id)
    print("----------------------------------------------------")
    print(f"[INFO]: Cluster {cluster_name} Logs: \n")
    print(final_logs)
    print("----------------------------------------------------")


def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None:
    """
    Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters.
    """
    if not clusters:
        raise ValueError("No clusters available for job submission.")

    if len(jobs) < len(clusters):
        print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
    elif len(jobs) == len(clusters):
        print("[INFO]: Exactly one job per cluster")
    else:
        print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
    with ThreadPoolExecutor() as executor:
        for idx, job_command in enumerate(jobs):
            # Cycle through clusters using modulus to wrap around if there are more jobs than clusters
            cluster = clusters[idx % len(clusters)]
            executor.submit(submit_job, cluster, job_command)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.")
    parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.")
    parser.add_argument(
        "--aggregate_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.",
    )
    args = parser.parse_args()
    if args.aggregate_jobs is not None:
        jobs = " ".join(args.aggregate_jobs)
        formatted_jobs = jobs.split("*")
        if len(formatted_jobs) > 1:
            print("Warning; Split jobs by cluster with the * delimiter")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    clusters = read_cluster_spec(args.config_file)
    submit_jobs_to_clusters(formatted_jobs, clusters)
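
The cluster configuration file read by this script is expected to contain one line per cluster in the ``name: <NAME> address: http://<IP>:<PORT>`` format described above. The following is a hypothetical sketch with placeholder names and addresses, submitting a tuning aggregate job that is matched to the first cluster.

# Hypothetical ~/.cluster_config contents (placeholder names and addresses):
# name: isaacray-0 address: http://10.0.0.10:8265
# name: isaacray-1 address: http://10.0.0.11:8265
# Submit a tuning aggregate job; additional aggregate jobs separated by * are matched to the remaining clusters
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs tuner.py --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI>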

The following script can be used to extract KubeRay Cluster information for aggregate job submission.

source/standalone/workflows/ray/grok_cluster_with_kubectl.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import re
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

"""
This script requires that kubectl is installed and KubeRay was used to create the cluster.

Creates a config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster, and also fetches the MLFlow URI.

Usage:

.. code-block:: bash

    ./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
    # For options, supply -h arg
"""


def get_namespace() -> str:
    """Get the current Kubernetes namespace from the context, fallback to default if not set"""
    try:
        namespace = (
            subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"])
            .decode()
            .strip()
        )
        if not namespace:
            namespace = "default"
    except subprocess.CalledProcessError:
        namespace = "default"
    return namespace


def get_pods(namespace: str = "default") -> list[tuple]:
    """Get a list of all of the pods in the namespace"""
    cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
    output = subprocess.check_output(cmd).decode()
    pods = []
    for line in output.strip().split("\n"):
        fields = line.split()
        pod_name = fields[0]
        status = fields[2]
        pods.append((pod_name, status))
    return pods


def get_clusters(pods: list, cluster_name_prefix: str) -> set:
    """
    Get unique cluster name(s). Works for one or more clusters, based on the number of head nodes.
    Excludes MLflow deployments.
    """
    clusters = set()
    for pod_name, _ in pods:
        # Skip MLflow pods
        if "-mlflow" in pod_name:
            continue

        match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
        if match:
            # Get base name without head/worker suffix
            base_name = match.group(1).split("-head")[0].split("-worker")[0]
            clusters.add(base_name)
    return sorted(clusters)


def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str:
    """
    Get MLflow service information if it exists in the namespace with the given prefix.
    Only works for a single cluster instance.
    Args:
        namespace: Kubernetes namespace
        cluster_prefix: Base cluster name (without -head/-worker suffixes)
    Returns:
        MLflow service URL
    """
    # Strip any -head or -worker suffixes to get base name
    if namespace is None:
        namespace = get_namespace()
    pods = get_pods(namespace=namespace)
    clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
    if len(clusters) > 1:
        raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")

    base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
    mlflow_name = f"{base_name}-mlflow"

    cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
    try:
        output = subprocess.check_output(cmd).decode()
        fields = output.strip().split()

        # Get cluster IP
        cluster_ip = fields[2]
        port = "5000"  # Default MLflow port

        return f"http://{cluster_ip}:{port}"
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Could not grok MLflow: {e}")  # Fixed f-string


def check_clusters_running(pods: list, clusters: set) -> bool:
    """
    Check whether all of the pods in at least one of the provided clusters are running.

    Args:
        pods (list): A list of tuples where each tuple contains the pod name and its status.
        clusters (set): A set of cluster names to check.

    Returns:
        bool: True if all pods in any of the clusters are running, False otherwise.
    """
    clusters_running = False
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        total_pods = len(cluster_pods)
        running_pods = len([p for p in cluster_pods if p[1] == "Running"])
        if running_pods == total_pods and running_pods > 0:
            clusters_running = True
            break
    return clusters_running


def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
    """
    Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.

    Args:
        head_pod (str): The name of the head pod.
        namespace (str, optional): The Kubernetes namespace. Defaults to "default".
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: The ray address if found, None otherwise.

    Raises:
        ValueError: If the logs cannot be retrieved or the ray address is not found.
    """
    cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
    try:
        output = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as e:
        raise ValueError(
            f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
        )
    match = re.search(r"RAY_ADDRESS='([^']+)'", output)
    if match:
        return match.group(1)
    else:
        return None


def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
    """
    For each cluster, check that it is running, and get the Ray head address that will accept jobs.

    Args:
        cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'.
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found.
    """
    cluster, pods, namespace = cluster_info
    head_pod = None
    for pod_name, status in pods:
        if pod_name.startswith(cluster + "-head"):
            head_pod = pod_name
            break
    if not head_pod:
        return f"Error: Could not find head pod for cluster {cluster}\n"

    # Get RAY_ADDRESS and status
    ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
    if not ray_address:
        return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"

    # Return only cluster and ray address
    return f"name: {cluster} address: {ray_address}\n"


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
    parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
    parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
    parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
    parser.add_argument(
        "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context."
    )
    args = parser.parse_args()

    # Get namespace from args or detect it
    current_namespace = args.namespace if args.namespace else get_namespace()
    print(f"Using namespace: {current_namespace}")

    cluster_name_prefix = args.prefix
    cluster_spec_file = os.path.expanduser(args.output)

    # Get all pods
    pods = get_pods(namespace=current_namespace)

    # Get clusters
    clusters = get_clusters(pods, cluster_name_prefix)
    if not clusters:
        print(f"No clusters found with prefix {cluster_name_prefix}")
        return

    # Wait for clusters to be running
    while True:
        pods = get_pods(namespace=current_namespace)
        if check_clusters_running(pods, clusters):
            break
        print("Waiting for all clusters to spin up...")
        time.sleep(5)

    print("Checking for MLflow:")
    # Check MLflow status for each cluster
    for cluster in clusters:
        try:
            mlflow_address = get_mlflow_info(current_namespace, cluster)
            print(f"MLflow address for {cluster}: {mlflow_address}")
        except ValueError as e:
            print(f"ML Flow not located: {e}")
    print()

    # Prepare cluster info for parallel processing
    cluster_infos = []
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        cluster_infos.append((cluster, cluster_pods, current_namespace))

    # Use ThreadPoolExecutor to process clusters in parallel
    results = []
    results_lock = threading.Lock()

    with ThreadPoolExecutor() as executor:
        future_to_cluster = {
            executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
        }
        for future in as_completed(future_to_cluster):
            cluster_name = future_to_cluster[future]
            try:
                result = future.result()
                with results_lock:
                    results.append(result)
            except Exception as exc:
                print(f"{cluster_name} generated an exception: {exc}")

    # Sort results alphabetically by cluster name
    results.sort()

    # Write sorted results to the output file (Ray info only)
    with open(cluster_spec_file, "w") as f:
        for result in results:
            f.write(result)

    print(f"Cluster spec information saved to {cluster_spec_file}")
    # Display the contents of the config file
    with open(cluster_spec_file) as f:
        print(f.read())


if __name__ == "__main__":
    main()
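
A rough sketch of how this script feeds into job submission on an existing KubeRay cluster: the MLflow URI printed by the script is supplied manually, and the generated ~/.cluster_config is picked up by submit_job.py by default.

# Sketch: extract cluster and MLflow information, then submit a tuning aggregate job
./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs tuner.py --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK>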

The following script can be used to easily create clusters on Google GKE.

source/standalone/workflows/ray/launch.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import pathlib
import subprocess
import yaml

from jinja2 import Environment, FileSystemLoader
from kubernetes import config

import source.standalone.workflows.ray.util as util

"""This script helps create one or more KubeRay clusters.

Usage:

.. code-block:: bash
    # If the head node is stuck on container creating, make sure to create a secret
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h

    # Examples

    # The following creates 8 GPUx1 nvidia l4 workers
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1

    # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
    # and 2 GPUx4 nvidia-tesla-t4 GPU workers
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 1 2 --num_clusters 1 \
    --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
"""
RAY_DIR = pathlib.Path(__file__).parent


def apply_manifest(args: argparse.Namespace) -> None:
    """Provided a Jinja templated ray.io/v1alpha1 file,
    populate the arguments and create the cluster. Additionally, create
    kubernetes containers for resources separated by '---' from the rest
    of the file.

    Args:
        args: Possible arguments concerning cluster parameters.
    """
    # Load Kubernetes configuration
    config.load_kube_config()

    # Set up Jinja2 environment for loading templates
    templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
    file_loader = FileSystemLoader(str(templates_dir))
    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)

    # Define template filename
    template_file = "kuberay.yaml.jinja"

    # Convert args namespace to a dictionary
    template_params = vars(args)

    # Load and render the template
    template = jinja_env.get_template(template_file)
    file_contents = template.render(template_params)

    # Parse all YAML documents in the rendered template
    all_yamls = []
    for doc in yaml.safe_load_all(file_contents):
        all_yamls.append(doc)

    # Convert back to YAML string, preserving multiple documents
    cleaned_yaml_string = ""
    for i, doc in enumerate(all_yamls):
        if i > 0:
            cleaned_yaml_string += "\n---\n"
        cleaned_yaml_string += yaml.dump(doc)

    # Apply the Kubernetes manifest using kubectl
    try:
        subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
    except subprocess.CalledProcessError as e:
        exit(f"An error occurred while running `kubectl`: {e}")


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments for Kubernetes deployment script.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    arg_parser = argparse.ArgumentParser(
        description="Script to apply manifests to create Kubernetes objects for Ray clusters.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    arg_parser.add_argument(
        "--cluster_host",
        type=str,
        default="google_cloud",
        choices=["google_cloud"],
        help=(
            "In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
            "file exists defining the KubeRay config. Currently only google_cloud is supported."
        ),
    )

    arg_parser.add_argument(
        "--name",
        type=str,
        required=False,
        default="isaacray",
        help="Name of the Kubernetes deployment.",
    )

    arg_parser.add_argument(
        "--namespace",
        type=str,
        required=False,
        default="default",
        help="Kubernetes namespace to deploy the Ray cluster.",
    )

    arg_parser.add_argument(
        "--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
    )

    arg_parser.add_argument(
        "--image",
        type=str,
        required=True,
        help="Docker image for the Ray cluster pods.",
    )

    arg_parser.add_argument(
        "--worker_accelerator",
        nargs="+",
        type=str,
        default=["nvidia-l4"],
        help="GPU accelerator name. Supply more than one for heterogeneous resources.",
    )

    arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)

    arg_parser.add_argument(
        "--num_clusters",
        type=int,
        default=1,
        help="How many Ray Clusters to create.",
    )
    arg_parser.add_argument(
        "--num_head_cpu",
        type=float,  # to be able to schedule partial CPU heads
        default=8,
        help="The number of CPUs to give the Ray head.",
    )

    arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
    args = arg_parser.parse_args()
    return util.fill_in_missing_resources(args, cluster_creation_flag=True)


def main():
    args = parse_args()

    if "head" in args.name:
        raise ValueError("For compatibility with other scripts, do not include head in the name")
    if args.num_clusters == 1:
        apply_manifest(args)
    else:
        default_name = args.name
        for i in range(args.num_clusters):
            args.name = default_name + "-" + str(i)
            apply_manifest(args)


if __name__ == "__main__":
    main()

Installation#

The Ray functionality requires additional dependencies to be installed.

To use Ray without Kubernetes, such as on a local computer or VM, kubectl is not required. For use on Kubernetes clusters with KubeRay, such as Google Kubernetes Engine or Amazon Elastic Kubernetes Service, kubectl is required and can be installed via the Kubernetes website.

The pythonic dependencies can be installed with:

# For multi-run support and resource isolation
./isaaclab.sh -p -m pip install ray[default]==2.31.0
# For hyperparameter tuning
./isaaclab.sh -p -m pip install ray[tune]==2.31.0
./isaaclab.sh -p -m pip install optuna bayesian-optimization
# MLFlow is needed only for fetching logs on clusters, not needed for local
./isaaclab.sh -p -m pip install mlflow

If using KubeRay clusters on Google GKE with the batteries-included cluster launch file, the following dependencies are also needed.

./isaaclab.sh -p -m pip install kubernetes Jinja2

Setup Overview: Cluster Configuration#

Select one of the following methods to create a Ray Cluster to accept and execute dispatched jobs.

KubeRay Clusters#

Attention

The ray command should be modified to use Isaac Python, which could be achieved in a fashion similar to sed -i "1i $(echo "#!/workspace/isaaclab/_isaac_sim/python.sh")" /isaac-sim/kit/python/bin/ray && ln -s /isaac-sim/kit/python/bin/ray /usr/local/bin/ray.

Google Cloud is currently the only platform tested, although any cloud provider should work if one configures the following:

  • A container registry (NGC, GCS artifact registry, AWS ECR, etc.) with an Isaac Lab image configured to support Ray. See cluster_configs/Dockerfile to see how to modify the isaac-lab-base container for Ray compatibility. Ray should use the Isaac Sim Python shebang, and nvidia-smi should work within the container. Be careful with the setup here, as paths need to be configured correctly for everything to work. It is likely that the example Dockerfile will work out of the box and can be pushed to the registry, as long as the base image has already been built as in the container guide.

  • A Kubernetes setup with available NVIDIA RTX (likely l4, l40, tesla-t4, or a10) GPU-passthrough node-pool resources that has access to your container registry/storage bucket and has the Ray operator enabled with correct IAM permissions. This can be easily achieved with services such as Google GKE or AWS EKS, provided that your account or organization has been granted a GPU budget. For cost-effective experimentation, it is recommended to use manual Kubernetes services as opposed to “autopilot” services, as this way clusters can be completely shut down when not in use, although this may require installing the NVIDIA GPU Operator.

  • An MLFlow server that your cluster has access to.

  • A kuberay.yaml.jinja file that describes how to allocate resources (already included for Google Cloud, which can be referenced for the format and MLFlow integration); see the example launch command after this list.
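
Once these prerequisites are in place, a cluster can be created with the launch script. The following sketch is taken from the launch.py usage examples; the namespace and image values are placeholders.

# Example: create one KubeRay cluster with 8 single-GPU nvidia-l4 workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1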

Ray Clusters (Without Kubernetes)#

Attention

Modify the ray command to use Isaac Python as described in KubeRay Clusters above, and follow the same steps for creating an image, cluster permissions, and bucket access.

See the Ray Clusters Overview or Anyscale for more information.
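
For a manual (non-Kubernetes) cluster, the standard Ray CLI can be used to start the head and worker processes. This is a minimal sketch, assuming the ray executable has already been pointed at Isaac Python as described above and that the chosen port is reachable between nodes.

# On the head node
ray start --head --port=6379
# On each GPU worker node, joining the head node (placeholder address)
ray start --address=<HEAD_NODE_IP>:6379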

Dispatching Jobs and Tuning#

Select one of the following guides that matches your desired Cluster configuration.

Simple Ray Cluster (Local/VM)#

This guide assumes that there is a Ray cluster already running, and that the dispatch scripts are run locally on the cluster (for example, on the head node), or that the cluster's job submission address is known.

1.) Testing that the cluster works can be done as follows.

./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --test

2.) Submitting resource-wrapped sub-jobs can be done as described in the following file:

source/standalone/workflows/ray/wrap_resources.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

import source.standalone.workflows.ray.util as util

"""
This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of a resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
If the desired resources for each sub-job are specified,
the maximum number of workers possible with the desired resources is created for each node
with GPU(s) in the cluster. It is also possible to split the available resources of each node
into the desired number of workers with the ``--num_workers`` flag, to be able to easily
parallelize sub-jobs on multi-GPU nodes. Because Isaac Lab requires a GPU,
this script ignores all CPU-only nodes such as loggers.

Sub-jobs are matched with node(s) in a cluster via the following relation:
sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node ID
node_submitted_to = sorted_nodes[job_index % total_node_count]

To check the ordering of sorted nodes, supply the ``--test`` argument and run the script.

Sub-jobs are separated by the + delimiter. The ``--sub_jobs`` argument must be the last
argument supplied to the script.

If there is more than one available worker, and more than one sub-job,
sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs will
be dispatched to workers as they become available. There is no limit on the number
of sub-jobs that can be near-simultaneously submitted.

This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.
To submit aggregate cluster jobs such as this script to one or more remote clusters,
see :file:`../submit_job.py`.

KubeRay clusters on Google GKE can be created with :file:`../launch.py`

Usage:

.. code-block:: bash
    # **Ensure that sub-jobs are separated by the ``+`` delimiter.**
    # Generic Templates-----------------------------------
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
    # No resource isolation; no parallelization:
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py
    --sub_jobs <JOB0>+<JOB1>+<JOB2>
    # Automatic Resource Isolation; Example A: needed for parallelization
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py \
    --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \
    --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example B:  needed for parallelization
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # to see all arguments
    ./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py -h
"""


def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
    """
    Provided a list of jobs, dispatch jobs to one worker per available node,
    unless otherwise specified by resource constraints.

    Args:
        jobs: bash commands to execute on a Ray cluster
        args: The arguments for resource allocation

    """
    if not ray.is_initialized():
        ray.init(address=args.ray_address, log_to_driver=True)
    job_results = []
    gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)

    if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
        raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")

    num_nodes = len(gpu_node_resources)
    # Populate arguments
    formatted_node_resources = {
        "gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
        "cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
        "ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
        "num_workers": args.num_workers,  # By default, 1 worker por node
    }
    args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
    print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
    if args.test:
        jobs = ["nvidia-smi"] * num_nodes
    for i, job in enumerate(jobs):
        gpu_node = gpu_node_resources[i % num_nodes]
        print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
        print(
            f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
            f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
        )
        print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
        num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
        num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
        memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
        print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
        job = util.remote_execute_job.options(
            num_gpus=num_gpus,
            num_cpus=num_cpus,
            memory=memory,
            scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
        ).remote(job, f"Job {i}", args.test)
        job_results.append(job)

    results = ray.get(job_results)
    for i, result in enumerate(results):
        print(f"[INFO]: Job {i} result: {result}")
    print("[INFO]: All jobs completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.")
    parser = util.add_resource_arguments(arg_parser=parser)
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--test",
        action="store_true",
        help=(
            "Run nvidia-smi test instead of the arbitrary job,"
            "can use as a sanity check prior to any jobs to check "
            "that GPU resources are correctly isolated."
        ),
    )
    parser.add_argument(
        "--sub_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.",
    )
    args = parser.parse_args()
    if args.sub_jobs is not None:
        jobs = " ".join(args.sub_jobs)
        formatted_jobs = jobs.split("+")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
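
For instance, a concrete resource-wrapped submission with manual resource isolation might look like the following sketch; the CPU, GPU, and RAM values are placeholders to be adjusted to the node, and the tasks follow the cartpole examples used throughout this guide.

# Illustrative sketch: give each training sub-job its own slice of the node
./isaaclab.sh -p source/standalone/workflows/ray/wrap_resources.py --num_cpu_per_worker 8 --gpu_per_worker 1 --ram_gb_per_worker 16 --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras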

3.) For tuning jobs, specify the hyperparameter sweep in a manner similar to the following two files.

source/standalone/workflows/ray/hyperparameter_tuning/vision_cfg.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import pathlib
import sys

# Allow for import of items from the ray workflow.
UTIL_DIR = pathlib.Path(__file__).parent.parent.parent
sys.path.append(str(UTIL_DIR))
import tuner
import util
from ray import tune


class CameraJobCfg(tuner.JobCfg):
    """In order to be compatible with :meth: invoke_tuning_run, and
    :class:IsaacLabTuneTrainable , configurations should
    be in a similar format to this class. This class can vary env count/horizon length,
    CNN structure, and MLP structure. Broad possible ranges are set, the specific values
    that work can be found via tuning. Tuning results can inform better ranges for a second tuning run.
    These ranges were selected for demonstration purposes. Best ranges are run/task specific."""

    @staticmethod
    def _get_batch_size_divisors(batch_size: int, min_size: int = 128) -> list[int]:
        """Get valid batch divisors to combine with num_envs and horizon length"""
        divisors = [i for i in range(min_size, batch_size + 1) if batch_size % i == 0]
        return divisors if divisors else [min_size]

    def __init__(self, cfg={}, vary_env_count: bool = False, vary_cnn: bool = False, vary_mlp: bool = False):
        cfg = util.populate_isaac_ray_cfg_args(cfg)

        # Basic configuration
        cfg["runner_args"]["headless_singleton"] = "--headless"
        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
        cfg["hydra_args"]["agent.params.config.max_epochs"] = 200

        if vary_env_count:  # Vary the env count and horizon length, and select a compatible minibatch size
            # Check from 512 to 4096 envs in powers of 2
            # Check horizon lengths of 8 to 128
            # More envs should be better, but different batch sizes can improve gradient estimation
            env_counts = [2**x for x in range(9, 13)]
            horizon_lengths = [2**x for x in range(3, 8)]

            selected_env_count = tune.choice(env_counts)
            selected_horizon = tune.choice(horizon_lengths)

            cfg["runner_args"]["--num_envs"] = selected_env_count
            cfg["hydra_args"]["agent.params.config.horizon_length"] = selected_horizon

            def get_valid_batch_size(config):
                num_envs = config["runner_args"]["--num_envs"]
                horizon_length = config["hydra_args"]["agent.params.config.horizon_length"]
                total_batch = horizon_length * num_envs
                divisors = self._get_batch_size_divisors(total_batch)
                return divisors[0]

            cfg["hydra_args"]["agent.params.config.minibatch_size"] = tune.sample_from(get_valid_batch_size)

        if vary_cnn:  # Vary the depth and size of the layers in the CNN part of the agent
            # Also varies kernel size, and stride.
            num_layers = tune.randint(2, 3)
            cfg["hydra_args"]["agent.params.network.cnn.type"] = "conv2d"
            cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
            cfg["hydra_args"]["agent.params.network.cnn.initializer"] = "{name:default}"
            cfg["hydra_args"]["agent.params.network.cnn.regularizer"] = "{name:None}"

            def get_cnn_layers(_):
                layers = []
                size = 64  # Initial input size

                for _ in range(num_layers.sample()):
                    # Get valid kernel sizes for current size
                    valid_kernels = [k for k in [3, 4, 6, 8, 10, 12] if k <= size]
                    if not valid_kernels:
                        break

                    kernel = int(tune.choice([str(k) for k in valid_kernels]).sample())
                    stride = int(tune.choice(["1", "2", "3", "4"]).sample())
                    padding = int(tune.choice(["0", "1"]).sample())

                    # Calculate next size
                    next_size = ((size + 2 * padding - kernel) // stride) + 1
                    if next_size <= 0:
                        break

                    layers.append({
                        "filters": tune.randint(16, 32).sample(),
                        "kernel_size": str(kernel),
                        "strides": str(stride),
                        "padding": str(padding),
                    })
                    size = next_size

                return layers

            cfg["hydra_args"]["agent.params.network.cnn.convs"] = tune.sample_from(get_cnn_layers)

        if vary_mlp:  # Vary the MLP structure: neurons (units) per layer, number of layers, and activation

            max_num_layers = 6
            max_neurons_per_layer = 128
            if "env.observations.policy.image.params.model_name" in cfg["hydra_args"]:
                # Decreasing the MLP size when using a pretrained model helps prevent out-of-memory errors on L4 GPUs
                max_num_layers = 3
                max_neurons_per_layer = 32
            if "agent.params.network.cnn.convs" in cfg["hydra_args"]:
                # decrease MLP size to prevent running out of memory on L4
                max_num_layers = 2
                max_neurons_per_layer = 32

            num_layers = tune.randint(1, max_num_layers)

            def get_mlp_layers(_):
                return [tune.randint(4, max_neurons_per_layer).sample() for _ in range(num_layers.sample())]

            cfg["hydra_args"]["agent.params.network.mlp.units"] = tune.sample_from(get_mlp_layers)
            cfg["hydra_args"]["agent.params.network.mlp.initializer.name"] = tune.choice(["default"]).sample()
            cfg["hydra_args"]["agent.params.network.mlp.activation"] = tune.choice(
                ["relu", "tanh", "sigmoid", "elu"]
            ).sample()

        super().__init__(cfg)


class ResNetCameraJob(CameraJobCfg):
    """Try different ResNet sizes."""

    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["hydra_args"]["env.observations.policy.image.params.model_name"] = tune.choice(
            ["resnet18", "resnet34", "resnet50", "resnet101"]
        )
        super().__init__(cfg, vary_env_count=True, vary_cnn=False, vary_mlp=True)


class TheiaCameraJob(CameraJobCfg):
    """Try different Theia sizes."""

    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["hydra_args"]["env.observations.policy.image.params.model_name"] = tune.choice([
            "theia-tiny-patch16-224-cddsv",
            "theia-tiny-patch16-224-cdiv",
            "theia-small-patch16-224-cdiv",
            "theia-base-patch16-224-cdiv",
            "theia-small-patch16-224-cddsv",
            "theia-base-patch16-224-cddsv",
        ])
        super().__init__(cfg, vary_env_count=True, vary_cnn=False, vary_mlp=True)
source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys

# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
import vision_cfg
from ray import tune


class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)


class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)


class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)


class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"])
        super().__init__(cfg)


class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        super().__init__(cfg)
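
For reference, the following is a minimal, illustrative sketch of a custom sweep configuration that follows the same pattern as the classes above. It is not part of the repository: it assumes placement next to vision_cartpole_cfg.py in the hyperparameter_tuning directory, and the task name and swept hydra parameters are placeholders to be replaced with values from your own agent configuration.

# hypothetical_custom_cfg.py - illustrative sketch only, not an actual repository file
import pathlib
import sys

# Allow for import of items from the ray workflow (same pattern as vision_cartpole_cfg.py).
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import tuner
import util
from ray import tune


class MyTaskSweepJobCfg(tuner.JobCfg):
    """Minimal example: fix the task and sweep only a couple of agent hyperparameters."""

    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["headless_singleton"] = "--headless"
        # Placeholder task name; substitute any task registered in your installation.
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-v0"])
        # Placeholder hydra parameters to sweep; adjust the keys to match your agent config.
        cfg["hydra_args"]["agent.params.config.learning_rate"] = tune.loguniform(1e-5, 1e-2)
        cfg["hydra_args"]["agent.params.config.max_epochs"] = 150
        super().__init__(cfg)

Such a class can then be selected with the --cfg_file and --cfg_class arguments of tuner.py, exactly like the Cartpole examples above.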

Then, see the local examples in the following file for how to start a tuning run.

source/standalone/workflows/ray/tuner.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import importlib.util
import os
import sys
from time import sleep

import ray
import util
from ray import air, tune
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.repeater import Repeater

"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default, (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
for each GPU-enabled node in the cluster for each individual job.

Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.

This assumes that all workers in a cluster are homogeneous. For heterogeneous workloads,
create several heterogeneous clusters (with homogeneous nodes in each cluster),
then submit several overall-cluster jobs with :file:`../submit_job.py`.
KubeRay clusters on Google GKE can be created with :file:`../launch.py`

To report tune metrics on clusters, a running MLFlow server with a known URI that the cluster has
access to is required. For KubeRay clusters configured with :file:`../launch.py`, this is included
automatically, and can be easily found with :file:`grok_cluster_with_kubectl.py`.

Usage:

.. code-block:: bash

    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h

    # Examples
    # Local (outside a docker container; when inside a local docker container, do not supply the run_mode argument)
    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
    --cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg
    # Local docker: start the ray server and run above command in the same running container without run_mode arg
    # Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
    --aggregate_jobs tuner.py \
    --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>

"""

DOCKER_PREFIX = "/workspace/isaaclab/"
BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "source/standalone/workflows/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1  # needed for local parallelism


class IsaacLabTuneTrainable(tune.Trainable):
    """The Isaac Lab Ray Tune Trainable.
    This class uses the standalone workflows to start jobs, along with the hydra integration.
    This class achieves Ray-based logging through reading the tensorboard logs from
    the standalone workflows. This depends on a config generated in the format of
    :class:`JobCfg`
    """

    def setup(self, config: dict) -> None:
        """Get the invocation command, return quick for easy scheduling."""
        self.data = None
        self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW)
        print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
        self.experiment = None

    def reset_config(self, new_config):
        """Allow environments to be re-used by fetching a new invocation command"""
        self.setup(new_config)
        return True

    def step(self) -> dict:
        if self.experiment is None:  # start experiment
            # When including this as first step instead of setup, experiments get scheduled faster
            # Don't want to block the scheduler while the experiment spins up
            print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
            experiment = util.execute_job(
                self.invoke_cmd,
                identifier_string="",
                extract_experiment=True,
                persistent_dir=BASE_DIR,
            )
            self.experiment = experiment
            print(f"[INFO]: Tuner recovered experiment info {experiment}")
            self.proc = experiment["proc"]
            self.experiment_name = experiment["experiment_name"]
            self.isaac_logdir = experiment["logdir"]
            self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
            self.done = False

        if self.proc is None:
            raise ValueError("Could not start trial.")

        if self.proc.poll() is not None:  # process finished, signal finish
            self.data["done"] = True
            print("[INFO]: Process finished, returning...")
        else:  # wait until the logs are ready or fresh
            data = util.load_tensorboard_logs(self.tensorboard_logdir)

            while data is None:
                data = util.load_tensorboard_logs(self.tensorboard_logdir)
                sleep(2)  # Lazy report metrics to avoid performance overhead

            if self.data is not None:
                while util._dicts_equal(data, self.data):
                    data = util.load_tensorboard_logs(self.tensorboard_logdir)
                    sleep(2)  # Lazy report metrics to avoid performance overhead

            self.data = data
            self.data["done"] = False
        return self.data

    def default_resource_request(self):
        """How many resources each trainable uses. Assumes homogeneous resources across gpu nodes,
        and that each trainable is meant for one node, where it uses all available resources."""
        resources = util.get_gpu_node_resources(one_node_only=True)
        if NUM_WORKERS_PER_NODE != 1:
            print("[WARNING]: Splitting node into more than one worker")
        return tune.PlacementGroupFactory(
            [{"CPU": resources["CPU"] / NUM_WORKERS_PER_NODE, "GPU": resources["GPU"] / NUM_WORKERS_PER_NODE}],
            strategy="STRICT_PACK",
        )


def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
    """Invoke an Isaac-Ray tuning run.

    Log either to a local directory or to MLFlow.
    Args:
        cfg: Configuration dictionary extracted from job setup
        args: Command-line arguments related to tuning.
    """
    # Allow for early exit
    os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"

    print("[WARNING]: Not saving checkpoints, just running experiment...")
    print("[INFO]: Model parameters and metrics will be preserved.")
    print("[WARNING]: For homogeneous cluster resources only...")
    # Get available resources
    resources = util.get_gpu_node_resources()
    print(f"[INFO]: Available resources {resources}")

    if not ray.is_initialized():
        ray.init(
            address=args.ray_address,
            log_to_driver=True,
            num_gpus=len(resources),
        )

    print(f"[INFO]: Using config {cfg}")

    # Configure the search algorithm and the repeater
    searcher = OptunaSearch(
        metric=args.metric,
        mode=args.mode,
    )
    repeat_search = Repeater(searcher, repeat=args.repeat_run_count)

    if args.run_mode == "local":  # Standard config, to file
        run_config = air.RunConfig(
            storage_path="/tmp/ray",
            name=f"IsaacRay-{args.cfg_class}-tune",
            verbose=1,
            checkpoint_config=air.CheckpointConfig(
                checkpoint_frequency=0,  # Disable periodic checkpointing
                checkpoint_at_end=False,  # Disable final checkpoint
            ),
        )

    elif args.run_mode == "remote":  # MLFlow, to MLFlow server
        mlflow_callback = MLflowLoggerCallback(
            tracking_uri=args.mlflow_uri,
            experiment_name=f"IsaacRay-{args.cfg_class}-tune",
            save_artifact=False,
            tags={"run_mode": "remote", "cfg_class": args.cfg_class},
        )

        run_config = ray.train.RunConfig(
            name="mlflow",
            storage_path="/tmp/ray",
            callbacks=[mlflow_callback],
            checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
        )
    else:
        raise ValueError("Unrecognized run mode.")

    # Configure the tuning job
    tuner = tune.Tuner(
        IsaacLabTuneTrainable,
        param_space=cfg,
        tune_config=tune.TuneConfig(
            search_alg=repeat_search,
            num_samples=args.num_samples,
            reuse_actors=True,
        ),
        run_config=run_config,
    )

    # Execute the tuning
    tuner.fit()

    # Save results to mounted volume
    if args.run_mode == "local":
        print("[DONE!]: Check results with tensorboard dashboard")
    else:
        print("[DONE!]: Check results with MLFlow dashboard")


class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg):
        assert "runner_args" in cfg, "No runner arguments specified."
        assert "--task" in cfg["runner_args"], "No task specified."
        assert "hydra_args" in cfg, "No hypeparameters specified."
        self.cfg = cfg


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tune Isaac Lab hyperparameters.")
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--cfg_file",
        type=str,
        default="hyperparameter_tuning/vision_cartpole_cfg.py",
        required=False,
        help="The relative filepath where a hyperparameter sweep is defined",
    )
    parser.add_argument(
        "--cfg_class",
        type=str,
        default="CartpoleRGBNoTuneJobCfg",
        required=False,
        help="Name of the hyperparameter sweep class to use",
    )
    parser.add_argument(
        "--run_mode",
        choices=["local", "remote"],
        default="remote",
        help=(
            "Set to local to use ./isaaclab.sh -p python, set to "
            "remote to use /workspace/isaaclab/isaaclab.sh -p python"
        ),
    )
    parser.add_argument(
        "--workflow",
        default=None,  # populated with RL Games
        help="The absolute path of the workflow to use for the experiment. By default, RL Games is used.",
    )
    parser.add_argument(
        "--mlflow_uri",
        type=str,
        default=None,
        required=False,
        help="The MLFlow Uri.",
    )
    parser.add_argument(
        "--num_workers_per_node",
        type=int,
        default=1,
        help="Number of workers to run on each GPU node. Only supply for parallelism on multi-gpu nodes",
    )

    parser.add_argument("--metric", type=str, default="rewards/time", help="What metric to tune for.")

    parser.add_argument(
        "--mode",
        choices=["max", "min"],
        default="max",
        help="What to optimize the metric to while tuning",
    )
    parser.add_argument(
        "--num_samples",
        type=int,
        default=100,
        help="How many hyperparameter runs to try total.",
    )
    parser.add_argument(
        "--repeat_run_count",
        type=int,
        default=3,
        help="How many times to repeat each hyperparameter config.",
    )

    args = parser.parse_args()
    NUM_WORKERS_PER_NODE = args.num_workers_per_node
    print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
    if args.run_mode == "remote":
        BASE_DIR = DOCKER_PREFIX  # ensure logs are dumped to persistent location
        PYTHON_EXEC = DOCKER_PREFIX + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = DOCKER_PREFIX + WORKFLOW
        else:
            WORKFLOW = args.workflow
        print(f"[INFO]: Using remote mode {PYTHON_EXEC=} {WORKFLOW=}")

        if args.mlflow_uri is not None:
            import mlflow

            mlflow.set_tracking_uri(args.mlflow_uri)
            from ray.air.integrations.mlflow import MLflowLoggerCallback
        else:
            raise ValueError("Please provide a result MLFLow URI server.")
    else:  # local
        PYTHON_EXEC = os.getcwd() + "/" + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = os.getcwd() + "/" + WORKFLOW
        else:
            WORKFLOW = args.workflow
        BASE_DIR = os.getcwd()
        print(f"[INFO]: Using local mode {PYTHON_EXEC=} {WORKFLOW=}")
    file_path = args.cfg_file
    class_name = args.cfg_class
    print(f"[INFO]: Attempting to use sweep config from {file_path=} {class_name=}")
    module_name = os.path.splitext(os.path.basename(file_path))[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    print(f"[INFO]: Successfully imported {module_name} from {file_path}")
    if hasattr(module, class_name):
        ClassToInstantiate = getattr(module, class_name)
        print(f"[INFO]: Found correct class {ClassToInstantiate}")
        instance = ClassToInstantiate()
        print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
        cfg = instance.cfg
        print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
        invoke_tuning_run(cfg, args)

    else:
        raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")

To view the logs, simply run the following:

tensorboard --logdir=<LOCAL_STORAGE_PATH_READ_FROM_OUTPUT>

Remote Ray Cluster Setup and Use#

This guide assumes that you want to create a cluster on a remote host or server. It contains steps shared between KubeRay and pure Ray, as well as configuration-specific steps. Follow Shared Steps Part I first, then the KubeRay-specific or Ray-specific steps for your desired configuration, and finally Shared Steps Part II.

Shared Steps Between KubeRay and Pure Ray Part I#

1.) Build the Isaac Ray image, and upload it to your container registry of choice.

# Login with NGC (nvcr.io) registry first, see docker steps in repo.
./isaaclab.sh -p docker/container.py start
# Build the special Isaac Lab Ray Image
docker build -t <REGISTRY/IMAGE_NAME> -f source/standalone/workflows/ray/cluster_configs/Dockerfile .
# Push the image to your registry of choice.
docker push <REGISTRY/IMAGE_NAME>

KubeRay Specific#

k9s is a great tool for monitoring your clusters; it can easily be installed with snap install k9s --devmode.

1.) Verify cluster access, and that the correct operators are installed.

# Verify cluster access
kubectl cluster-info
# If using a manually managed cluster (not Autopilot or the like)
# verify that there are node pools
kubectl get nodes
# Check that the ray operator is installed on the cluster
# should list rayclusters.ray.io , rayjobs.ray.io , and rayservices.ray.io
kubectl get crds | grep ray
# Check that the NVIDIA Driver Operator is installed on the cluster
# should list clusterpolicies.nvidia.com
kubectl get crds | grep nvidia

2.) Create the KubeRay cluster and an MLFlow server for receiving logs that your cluster has access to. This can be done automatically for Google GKE, where instructions are included in the following creation file. More than one cluster can be created at once, and each cluster can have heterogeneous resources if so desired. For other cloud services, the kuberay.yaml.jinja will be similar to that of Google's.

source/standalone/workflows/ray/launch.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import pathlib
import subprocess
import yaml

from jinja2 import Environment, FileSystemLoader
from kubernetes import config

import source.standalone.workflows.ray.util as util

"""This script helps create one or more KubeRay clusters.

Usage:

.. code-block:: bash
    # If the head node is stuck on container creating, make sure to create a secret
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py -h

    # Examples

    # The following creates 8 GPUx1 nvidia l4 workers
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1

    # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
    # and 2 GPUx4 nvidia-tesla-t4 GPU workers
    ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 1 2 --num_clusters 1 \
    --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
"""
RAY_DIR = pathlib.Path(__file__).parent


def apply_manifest(args: argparse.Namespace) -> None:
    """Provided a Jinja templated ray.io/v1alpha1 file,
    populate the arguments and create the cluster. Additionally, create
    kubernetes containers for resources separated by '---' from the rest
    of the file.

    Args:
        args: Possible arguments concerning cluster parameters.
    """
    # Load Kubernetes configuration
    config.load_kube_config()

    # Set up Jinja2 environment for loading templates
    templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
    file_loader = FileSystemLoader(str(templates_dir))
    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True)

    # Define template filename
    template_file = "kuberay.yaml.jinja"

    # Convert args namespace to a dictionary
    template_params = vars(args)

    # Load and render the template
    template = jinja_env.get_template(template_file)
    file_contents = template.render(template_params)

    # Parse all YAML documents in the rendered template
    all_yamls = []
    for doc in yaml.safe_load_all(file_contents):
        all_yamls.append(doc)

    # Convert back to YAML string, preserving multiple documents
    cleaned_yaml_string = ""
    for i, doc in enumerate(all_yamls):
        if i > 0:
            cleaned_yaml_string += "\n---\n"
        cleaned_yaml_string += yaml.dump(doc)

    # Apply the Kubernetes manifest using kubectl
    try:
        subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
    except subprocess.CalledProcessError as e:
        exit(f"An error occurred while running `kubectl`: {e}")


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments for Kubernetes deployment script.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    arg_parser = argparse.ArgumentParser(
        description="Script to apply manifests to create Kubernetes objects for Ray clusters.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    arg_parser.add_argument(
        "--cluster_host",
        type=str,
        default="google_cloud",
        choices=["google_cloud"],
        help=(
            "In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
            "file exists defining the KubeRay config. Currently only google_cloud is supported."
        ),
    )

    arg_parser.add_argument(
        "--name",
        type=str,
        required=False,
        default="isaacray",
        help="Name of the Kubernetes deployment.",
    )

    arg_parser.add_argument(
        "--namespace",
        type=str,
        required=False,
        default="default",
        help="Kubernetes namespace to deploy the Ray cluster.",
    )

    arg_parser.add_argument(
        "--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
    )

    arg_parser.add_argument(
        "--image",
        type=str,
        required=True,
        help="Docker image for the Ray cluster pods.",
    )

    arg_parser.add_argument(
        "--worker_accelerator",
        nargs="+",
        type=str,
        default=["nvidia-l4"],
        help="GPU accelerator name. Supply more than one for heterogeneous resources.",
    )

    arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)

    arg_parser.add_argument(
        "--num_clusters",
        type=int,
        default=1,
        help="How many Ray Clusters to create.",
    )
    arg_parser.add_argument(
        "--num_head_cpu",
        type=float,  # to be able to schedule partial CPU heads
        default=8,
        help="The number of CPUs to give the Ray head.",
    )

    arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
    args = arg_parser.parse_args()
    return util.fill_in_missing_resources(args, cluster_creation_flag=True)


def main():
    args = parse_args()

    if "head" in args.name:
        raise ValueError("For compatibility with other scripts, do not include head in the name")
    if args.num_clusters == 1:
        apply_manifest(args)
    else:
        default_name = args.name
        for i in range(args.num_clusters):
            args.name = default_name + "-" + str(i)
            apply_manifest(args)


if __name__ == "__main__":
    main()

3.) Fetch the KubeRay cluster IP addresses and the MLFlow server IP. This can be done automatically for KubeRay clusters, where instructions are included in the following fetching file. The KubeRay cluster addresses are saved to a file, while the MLFlow server IP is printed to the console.

source/standalone/workflows/ray/grok_cluster_with_kubectl.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import re
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

"""
This script requires that kubectl is installed and KubeRay was used to create the cluster.

Creates a config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster, and also fetches the MLFlow URI.

Usage:

.. code-block:: bash

    ./isaaclab.sh -p source/standalone/workflows/ray/grok_cluster_with_kubectl.py
    # For options, supply -h arg
"""


def get_namespace() -> str:
    """Get the current Kubernetes namespace from the context, fallback to default if not set"""
    try:
        namespace = (
            subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"])
            .decode()
            .strip()
        )
        if not namespace:
            namespace = "default"
    except subprocess.CalledProcessError:
        namespace = "default"
    return namespace


def get_pods(namespace: str = "default") -> list[tuple]:
    """Get a list of all of the pods in the namespace"""
    cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
    output = subprocess.check_output(cmd).decode()
    pods = []
    for line in output.strip().split("\n"):
        fields = line.split()
        pod_name = fields[0]
        status = fields[2]
        pods.append((pod_name, status))
    return pods


def get_clusters(pods: list, cluster_name_prefix: str) -> set:
    """
    Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes.
    Excludes MLflow deployments.
    """
    clusters = set()
    for pod_name, _ in pods:
        # Skip MLflow pods
        if "-mlflow" in pod_name:
            continue

        match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
        if match:
            # Get base name without head/worker suffix
            base_name = match.group(1).split("-head")[0].split("-worker")[0]
            clusters.add(base_name)
    return sorted(clusters)


def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str:
    """
    Get MLflow service information if it exists in the namespace with the given prefix.
    Only works for a single cluster instance.
    Args:
        namespace: Kubernetes namespace
        cluster_prefix: Base cluster name (without -head/-worker suffixes)
    Returns:
        MLflow service URL
    """
    # Strip any -head or -worker suffixes to get base name
    if namespace is None:
        namespace = get_namespace()
    pods = get_pods(namespace=namespace)
    clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
    if len(clusters) > 1:
        raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")

    base_name = cluster_prefix.split("-head")[0].split("-worker")[0]
    mlflow_name = f"{base_name}-mlflow"

    cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
    try:
        output = subprocess.check_output(cmd).decode()
        fields = output.strip().split()

        # Get cluster IP
        cluster_ip = fields[2]
        port = "5000"  # Default MLflow port

        return f"http://{cluster_ip}:{port}"
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Could not grok MLflow: {e}")  # Fixed f-string


def check_clusters_running(pods: list, clusters: set) -> bool:
    """
    Check that all of the pods in all provided clusters are running.

    Args:
        pods (list): A list of tuples where each tuple contains the pod name and its status.
        clusters (set): A set of cluster names to check.

    Returns:
        bool: True if all pods in any of the clusters are running, False otherwise.
    """
    clusters_running = False
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        total_pods = len(cluster_pods)
        running_pods = len([p for p in cluster_pods if p[1] == "Running"])
        if running_pods == total_pods and running_pods > 0:
            clusters_running = True
            break
    return clusters_running


def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
    """
    Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.

    Args:
        head_pod (str): The name of the head pod.
        namespace (str, optional): The Kubernetes namespace. Defaults to "default".
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: The ray address if found, None otherwise.

    Raises:
        ValueError: If the logs cannot be retrieved or the ray address is not found.
    """
    cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
    try:
        output = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as e:
        raise ValueError(
            f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
        )
    match = re.search(r"RAY_ADDRESS='([^']+)'", output)
    if match:
        return match.group(1)
    else:
        return None


def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
    """
    For each cluster, check that it is running, and get the Ray head address that will accept jobs.

    Args:
        cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'.
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found.
    """
    cluster, pods, namespace = cluster_info
    head_pod = None
    for pod_name, status in pods:
        if pod_name.startswith(cluster + "-head"):
            head_pod = pod_name
            break
    if not head_pod:
        return f"Error: Could not find head pod for cluster {cluster}\n"

    # Get RAY_ADDRESS and status
    ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
    if not ray_address:
        return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"

    # Return only cluster and ray address
    return f"name: {cluster} address: {ray_address}\n"


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
    parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
    parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
    parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
    parser.add_argument(
        "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context."
    )
    args = parser.parse_args()

    # Get namespace from args or detect it
    current_namespace = args.namespace if args.namespace else get_namespace()
    print(f"Using namespace: {current_namespace}")

    cluster_name_prefix = args.prefix
    cluster_spec_file = os.path.expanduser(args.output)

    # Get all pods
    pods = get_pods(namespace=current_namespace)

    # Get clusters
    clusters = get_clusters(pods, cluster_name_prefix)
    if not clusters:
        print(f"No clusters found with prefix {cluster_name_prefix}")
        return

    # Wait for clusters to be running
    while True:
        pods = get_pods(namespace=current_namespace)
        if check_clusters_running(pods, clusters):
            break
        print("Waiting for all clusters to spin up...")
        time.sleep(5)

    print("Checking for MLflow:")
    # Check MLflow status for each cluster
    for cluster in clusters:
        try:
            mlflow_address = get_mlflow_info(current_namespace, cluster)
            print(f"MLflow address for {cluster}: {mlflow_address}")
        except ValueError as e:
            print(f"ML Flow not located: {e}")
    print()

    # Prepare cluster info for parallel processing
    cluster_infos = []
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        cluster_infos.append((cluster, cluster_pods, current_namespace))

    # Use ThreadPoolExecutor to process clusters in parallel
    results = []
    results_lock = threading.Lock()

    with ThreadPoolExecutor() as executor:
        future_to_cluster = {
            executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
        }
        for future in as_completed(future_to_cluster):
            cluster_name = future_to_cluster[future]
            try:
                result = future.result()
                with results_lock:
                    results.append(result)
            except Exception as exc:
                print(f"{cluster_name} generated an exception: {exc}")

    # Sort results alphabetically by cluster name
    results.sort()

    # Write sorted results to the output file (Ray info only)
    with open(cluster_spec_file, "w") as f:
        for result in results:
            f.write(result)

    print(f"Cluster spec information saved to {cluster_spec_file}")
    # Display the contents of the config file
    with open(cluster_spec_file) as f:
        print(f.read())


if __name__ == "__main__":
    main()

Ray Specific#

1.) Verify cluster access.

2.) Create a ~/.cluster_config file, where name: <NAME> address: http://<IP>:<PORT> is on a new line for each unique cluster. For one cluster, there should only be one line in this file; an illustrative sketch of such a file follows these steps.

3.) Start an MLFlow server to receive the logs, ensuring the Ray cluster has access to it, and determine the server URI.
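
For illustration only, a single-cluster ~/.cluster_config can be created as in the sketch below; the cluster name, IP, and port are placeholders and should be replaced with the actual address of your Ray head (the same address that accepts job submissions).

# Illustrative sketch: write a one-line ~/.cluster_config for a single cluster.
# The name and address below are placeholders; substitute your own Ray head's values.
from pathlib import Path

config_line = "name: my-cluster address: http://192.0.2.10:8265\n"  # placeholder name, IP, and port
Path.home().joinpath(".cluster_config").write_text(config_line)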

Shared Steps Between KubeRay and Pure Ray Part II#

1.) Test that your cluster is operational with the following.

# Test that NVIDIA GPUs are visible and that Ray is operational with the following command:
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

2.) Jobs can be submitted to the cluster(s) with the following script.

source/standalone/workflows/ray/submit_job.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor

from ray import job_submission

"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:`grok_cluster_with_kubectl.py`

Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_count

Aggregate jobs are separated by the * delimiter. The ``--aggregate_jobs`` argument must be
the last argument supplied to the script.

An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:'../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.

If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are less aggregate job(s)
than clusters, some clusters will not receive aggregate job(s). The maximum number of
aggregate jobs that can be run simultaneously is equal to the number of workers created by
default by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output after
jobs finish, which is unlikely to constrain overall-job submission.

Usage:

.. code-block:: bash

    # Example; submitting a tuning job
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
    --aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
        --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
        --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <ML_FLOW_URI>

    # Example: Submitting resource wrapped job
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py --aggregate_jobs wrap_resources.py --sub_jobs ./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-v0 --headless+./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task Isaac-Cartpole-RGB-Camera-Direct-v0 --headless --enable_cameras agent.params.config.max_epochs=150

    # For all command line arguments
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}


def read_cluster_spec(fn: str | None = None) -> list[dict]:
    if fn is None:
        cluster_spec_path = os.path.expanduser("~/.cluster_config")
    else:
        cluster_spec_path = os.path.expanduser(fn)

    if not os.path.exists(cluster_spec_path):
        raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")

    clusters = []
    with open(cluster_spec_path) as f:
        for line in f:
            parts = line.strip().split(" ")
            http_address = parts[3]
            cluster_info = {"name": parts[1], "address": http_address}
            print(f"[INFO] Setting {cluster_info['name']}")  # with {cluster_info['num_gpu']} GPUs.")
            clusters.append(cluster_info)

    return clusters


def submit_job(cluster: dict, job_command: str) -> None:
    """
    Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end.
    """
    address = cluster["address"]
    cluster_name = cluster["name"]
    print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}")  # with {num_gpus} GPUs.")
    client = job_submission.JobSubmissionClient(address)
    runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]}
    print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}")
    try:
        dir_contents = os.listdir(CONFIG["working_dir"])
        print(f"[INFO]: Directory contents: {dir_contents}")
    except Exception as e:
        print(f"[INFO]: Failed to list directory contents: {str(e)}")
    entrypoint = f"{CONFIG['executable']} {job_command}"
    print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}")
    job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env)
    status = client.get_job_status(job_id)
    while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]:
        time.sleep(5)
        status = client.get_job_status(job_id)

    final_logs = client.get_job_logs(job_id)
    print("----------------------------------------------------")
    print(f"[INFO]: Cluster {cluster_name} Logs: \n")
    print(final_logs)
    print("----------------------------------------------------")


def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None:
    """
    Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters.
    """
    if not clusters:
        raise ValueError("No clusters available for job submission.")

    if len(jobs) < len(clusters):
        print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
    elif len(jobs) == len(clusters):
        print("[INFO]: Exactly one job per cluster")
    else:
        print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
    with ThreadPoolExecutor() as executor:
        for idx, job_command in enumerate(jobs):
            # Cycle through clusters using modulus to wrap around if there are more jobs than clusters
            cluster = clusters[idx % len(clusters)]
            executor.submit(submit_job, cluster, job_command)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.")
    parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.")
    parser.add_argument(
        "--aggregate_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.",
    )
    args = parser.parse_args()
    if args.aggregate_jobs is not None:
        jobs = " ".join(args.aggregate_jobs)
        formatted_jobs = jobs.split("*")
        if len(formatted_jobs) > 1:
            print("Warning; Split jobs by cluster with the * delimiter")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    clusters = read_cluster_spec(args.config_file)
    submit_jobs_to_clusters(formatted_jobs, clusters)

3.) For tuning jobs, specify the hyperparameter sweep similar to CameraJobCfg in the hyperparameter sweep configuration files shown earlier, then start the tuning run through the following file:

source/standalone/workflows/ray/tuner.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import importlib.util
import os
import sys
from time import sleep

import ray
import util
from ray import air, tune
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.repeater import Repeater

"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default, (unless combined as a sub-job in a resource-wrapped aggregate job), one worker is created
for each GPU-enabled node in the cluster for each individual job.

Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.

This assumes that all workers in a cluster are homogeneous. For heterogeneous workloads,
create several heterogeneous clusters (with homogeneous nodes in each cluster),
then submit several overall-cluster jobs with :file:`../submit_job.py`.
KubeRay clusters on Google GKE can be created with :file:`../launch.py`

To report tune metrics on clusters, a running MLFlow server with a known URI that the cluster has
access to is required. For KubeRay clusters configured with :file:`../launch.py`, this is included
automatically, and can be easily found with :file:`grok_cluster_with_kubectl.py`.

Usage:

.. code-block:: bash

    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py -h

    # Examples
    # Local (outside a docker container; when inside a local docker container, do not supply the run_mode argument)
    ./isaaclab.sh -p source/standalone/workflows/ray/tuner.py --run_mode local \
    --cfg_file source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg
    # Local docker: start the ray server and run above command in the same running container without run_mode arg
    # Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
    ./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
    --aggregate_jobs tuner.py \
    --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleRGBNoTuneJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>

"""

DOCKER_PREFIX = "/workspace/isaaclab/"
BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "source/standalone/workflows/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1  # needed for local parallelism


class IsaacLabTuneTrainable(tune.Trainable):
    """The Isaac Lab Ray Tune Trainable.
    This class uses the standalone workflows to start jobs, along with the hydra integration.
    This class achieves Ray-based logging through reading the tensorboard logs from
    the standalone workflows. This depends on a config generated in the format of
    :class:`JobCfg`
    """

    def setup(self, config: dict) -> None:
        """Get the invocation command, return quick for easy scheduling."""
        self.data = None
        self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW)
        print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
        self.experiment = None

    def reset_config(self, new_config):
        """Allow environments to be re-used by fetching a new invocation command"""
        self.setup(new_config)
        return True

    def step(self) -> dict:
        if self.experiment is None:  # start experiment
            # When including this as first step instead of setup, experiments get scheduled faster
            # Don't want to block the scheduler while the experiment spins up
            print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
            experiment = util.execute_job(
                self.invoke_cmd,
                identifier_string="",
                extract_experiment=True,
                persistent_dir=BASE_DIR,
            )
            self.experiment = experiment
            print(f"[INFO]: Tuner recovered experiment info {experiment}")
            self.proc = experiment["proc"]
            self.experiment_name = experiment["experiment_name"]
            self.isaac_logdir = experiment["logdir"]
            self.tensorboard_logdir = self.isaac_logdir + f"/{self.experiment_name}/summaries"
            self.done = False

        if self.proc is None:
            raise ValueError("Could not start trial.")

        if self.proc.poll() is not None:  # process finished, signal finish
            self.data["done"] = True
            print("[INFO]: Process finished, returning...")
        else:  # wait until the logs are ready or fresh
            data = util.load_tensorboard_logs(self.tensorboard_logdir)

            while data is None:
                data = util.load_tensorboard_logs(self.tensorboard_logdir)
                sleep(2)  # Lazy report metrics to avoid performance overhead

            if self.data is not None:
                while util._dicts_equal(data, self.data):
                    data = util.load_tensorboard_logs(self.tensorboard_logdir)
                    sleep(2)  # Lazy report metrics to avoid performance overhead

            self.data = data
            self.data["done"] = False
        return self.data

    def default_resource_request(self):
        """How many resources each trainable uses. Assumes homogeneous resources across gpu nodes,
        and that each trainable is meant for one node, where it uses all available resources."""
        resources = util.get_gpu_node_resources(one_node_only=True)
        if NUM_WORKERS_PER_NODE != 1:
            print("[WARNING]: Splitting node into more than one worker")
        return tune.PlacementGroupFactory(
            [{"CPU": resources["CPU"] / NUM_WORKERS_PER_NODE, "GPU": resources["GPU"] / NUM_WORKERS_PER_NODE}],
            strategy="STRICT_PACK",
        )


def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
    """Invoke an Isaac-Ray tuning run.

    Log either to a local directory or to MLFlow.
    Args:
        cfg: Configuration dictionary extracted from job setup
        args: Command-line arguments related to tuning.
    """
    # Allow for early exit
    os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"

    print("[WARNING]: Not saving checkpoints, just running experiment...")
    print("[INFO]: Model parameters and metrics will be preserved.")
    print("[WARNING]: For homogeneous cluster resources only...")
    # Get available resources
    resources = util.get_gpu_node_resources()
    print(f"[INFO]: Available resources {resources}")

    if not ray.is_initialized():
        ray.init(
            address=args.ray_address,
            log_to_driver=True,
            num_gpus=len(resources),
        )

    print(f"[INFO]: Using config {cfg}")

    # Configure the search algorithm and the repeater
    searcher = OptunaSearch(
        metric=args.metric,
        mode=args.mode,
    )
    repeat_search = Repeater(searcher, repeat=args.repeat_run_count)

    if args.run_mode == "local":  # Standard config, to file
        run_config = air.RunConfig(
            storage_path="/tmp/ray",
            name=f"IsaacRay-{args.cfg_class}-tune",
            verbose=1,
            checkpoint_config=air.CheckpointConfig(
                checkpoint_frequency=0,  # Disable periodic checkpointing
                checkpoint_at_end=False,  # Disable final checkpoint
            ),
        )

    elif args.run_mode == "remote":  # MLFlow, to MLFlow server
        mlflow_callback = MLflowLoggerCallback(
            tracking_uri=args.mlflow_uri,
            experiment_name=f"IsaacRay-{args.cfg_class}-tune",
            save_artifact=False,
            tags={"run_mode": "remote", "cfg_class": args.cfg_class},
        )

        run_config = ray.train.RunConfig(
            name="mlflow",
            storage_path="/tmp/ray",
            callbacks=[mlflow_callback],
            checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
        )
    else:
        raise ValueError("Unrecognized run mode.")

    # Configure the tuning job
    tuner = tune.Tuner(
        IsaacLabTuneTrainable,
        param_space=cfg,
        tune_config=tune.TuneConfig(
            search_alg=repeat_search,
            num_samples=args.num_samples,
            reuse_actors=True,
        ),
        run_config=run_config,
    )

    # Execute the tuning
    tuner.fit()

    # Save results to mounted volume
    if args.run_mode == "local":
        print("[DONE!]: Check results with tensorboard dashboard")
    else:
        print("[DONE!]: Check results with MLFlow dashboard")


class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg):
        assert "runner_args" in cfg, "No runner arguments specified."
        assert "--task" in cfg["runner_args"], "No task specified."
        assert "hydra_args" in cfg, "No hypeparameters specified."
        self.cfg = cfg


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tune Isaac Lab hyperparameters.")
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--cfg_file",
        type=str,
        default="hyperparameter_tuning/vision_cartpole_cfg.py",
        required=False,
        help="The relative filepath where a hyperparameter sweep is defined",
    )
    parser.add_argument(
        "--cfg_class",
        type=str,
        default="CartpoleRGBNoTuneJobCfg",
        required=False,
        help="Name of the hyperparameter sweep class to use",
    )
    parser.add_argument(
        "--run_mode",
        choices=["local", "remote"],
        default="remote",
        help=(
            "Set to local to use ./isaaclab.sh -p python, set to "
            "remote to use /workspace/isaaclab/isaaclab.sh -p python"
        ),
    )
    parser.add_argument(
        "--workflow",
        default=None,  # populated with RL Games
        help="The absolute path of the workflow to use for the experiment. By default, RL Games is used.",
    )
    parser.add_argument(
        "--mlflow_uri",
        type=str,
        default=None,
        required=False,
        help="The MLFlow Uri.",
    )
    parser.add_argument(
        "--num_workers_per_node",
        type=int,
        default=1,
        help="Number of workers to run on each GPU node. Only supply for parallelism on multi-gpu nodes",
    )

    parser.add_argument("--metric", type=str, default="rewards/time", help="What metric to tune for.")

    parser.add_argument(
        "--mode",
        choices=["max", "min"],
        default="max",
        help="What to optimize the metric to while tuning",
    )
    parser.add_argument(
        "--num_samples",
        type=int,
        default=100,
        help="How many hyperparameter runs to try total.",
    )
    parser.add_argument(
        "--repeat_run_count",
        type=int,
        default=3,
        help="How many times to repeat each hyperparameter config.",
    )

    args = parser.parse_args()
    NUM_WORKERS_PER_NODE = args.num_workers_per_node
    print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
    if args.run_mode == "remote":
        BASE_DIR = DOCKER_PREFIX  # ensure logs are dumped to persistent location
        PYTHON_EXEC = DOCKER_PREFIX + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = DOCKER_PREFIX + WORKFLOW
        else:
            WORKFLOW = args.workflow
        print(f"[INFO]: Using remote mode {PYTHON_EXEC=} {WORKFLOW=}")

        if args.mlflow_uri is not None:
            import mlflow

            mlflow.set_tracking_uri(args.mlflow_uri)
            from ray.air.integrations.mlflow import MLflowLoggerCallback
        else:
            raise ValueError("Please provide a result MLFLow URI server.")
    else:  # local
        PYTHON_EXEC = os.getcwd() + "/" + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = os.getcwd() + "/" + WORKFLOW
        else:
            WORKFLOW = args.workflow
        BASE_DIR = os.getcwd()
        print(f"[INFO]: Using local mode {PYTHON_EXEC=} {WORKFLOW=}")
    file_path = args.cfg_file
    class_name = args.cfg_class
    print(f"[INFO]: Attempting to use sweep config from {file_path=} {class_name=}")
    module_name = os.path.splitext(os.path.basename(file_path))[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    print(f"[INFO]: Successfully imported {module_name} from {file_path}")
    if hasattr(module, class_name):
        ClassToInstantiate = getattr(module, class_name)
        print(f"[INFO]: Found correct class {ClassToInstantiate}")
        instance = ClassToInstantiate()
        print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
        cfg = instance.cfg
        print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
        invoke_tuning_run(cfg, args)

    else:
        raise AttributeError(f"[ERROR]: Class '{class_name}' not found in {file_path}")

For example, see the Cartpole Example configurations.

source/standalone/workflows/ray/hyperparameter_tuning/vision_cartpole_cfg.py
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys

# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
import vision_cfg
from ray import tune


class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)


class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)


class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)


class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"])
        super().__init__(cfg)


class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        super().__init__(cfg)
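For local testing, a tuning run with one of the Cartpole sweep classes above could be launched roughly as follows. This is only a sketch: it assumes the tuning script shown above lives at source/standalone/workflows/ray/tuner.py and that a local Ray cluster is already running (the default --ray_address is auto); the flags are those of the tuning script's argument parser.

# Sketch: the tuner.py path is an assumption; the flags match the parser shown above.
./isaaclab.sh -p source/standalone/workflows/ray/tuner.py \
  --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
  --cfg_class CartpoleRGBNoTuneJobCfg \
  --run_mode local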

Tuning jobs can also be submitted to a remote Ray cluster via submit_job.py, as in the sketch below.
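This is only a sketch: the --aggregate_jobs argument of submit_job.py and the in-container path of the tuning script are assumptions; the remaining flags belong to the tuning script's argument parser shown above, and remote runs require an MLFlow tracking URI.

# Sketch: --aggregate_jobs and the tuner.py path are assumptions; <MLFLOW_URI> is a placeholder.
./isaaclab.sh -p source/standalone/workflows/ray/submit_job.py \
  --aggregate_jobs /workspace/isaaclab/source/standalone/workflows/ray/tuner.py \
  --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
  --cfg_class CartpoleRGBNoTuneJobCfg --run_mode remote --mlflow_uri <MLFLOW_URI>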

To view the tuning results, open the MLFlow dashboard of the server that you created. For KubeRay, this can be done by port-forwarding the MLFlow service with

kubectl port-forward service/isaacray-mlflow 5000:5000

and visiting the following address in a browser.

localhost:5000

If the MLFlow port is forwarded as above, the logged experiment data can be converted into local TensorBoard logs with the following command.

./isaaclab.sh -p source/standalone/workflows/ray/mlflow_to_local_tensorboard.py \
  --uri http://localhost:5000 --experiment-name IsaacRay-<CLASS_JOB_CFG>-tune --download-dir test
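The downloaded logs can then be viewed with a standard TensorBoard invocation, for example (assuming TensorBoard is installed and the --download-dir above was left as test):

# Assumes the logs were downloaded into the "test" directory as above.
tensorboard --logdir test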

Cluster Cleanup#

To conserve resources, and to free precious GPUs for other people to use on shared compute platforms, please destroy the Ray cluster after use. It can easily be recreated! For KubeRay clusters, this can be done as follows.

kubectl get raycluster | egrep 'isaacray' | awk '{print $1}' | xargs kubectl delete raycluster &&
kubectl get deployments | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete deployment &&
kubectl get services | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete service