Source code for omni.isaac.lab.sim.converters.asset_converter_base
# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import abc
import hashlib
import json
import os
import pathlib
import random
from datetime import datetime
from omni.isaac.lab.sim.converters.asset_converter_base_cfg import AssetConverterBaseCfg
from omni.isaac.lab.utils.assets import check_file_path
from omni.isaac.lab.utils.io import dump_yaml
[docs]class AssetConverterBase(abc.ABC):
"""Base class for converting an asset file from different formats into USD format.
This class provides a common interface for converting an asset file into USD. It does not
provide any implementation for the conversion. The derived classes must implement the
:meth:`_convert_asset` method to provide the actual conversion.
The file conversion is lazy if the output directory (:obj:`AssetConverterBaseCfg.usd_dir`) is provided.
In the lazy conversion, the USD file is re-generated only if:
* The asset file is modified.
* The configuration parameters are modified.
* The USD file does not exist.
To override this behavior to force conversion, the flag :obj:`AssetConverterBaseCfg.force_usd_conversion`
can be set to True.
When no output directory is defined, lazy conversion is deactivated and the generated USD file is
stored in folder ``/tmp/IsaacLab/usd_{date}_{time}_{random}``, where the parameters in braces are generated
at runtime. The random identifiers help avoid a race condition where two simultaneously triggered conversions
try to use the same directory for reading/writing the generated files.
.. note::
Changes to the parameters :obj:`AssetConverterBaseCfg.asset_path`, :obj:`AssetConverterBaseCfg.usd_dir`, and
:obj:`AssetConverterBaseCfg.usd_file_name` are not considered as modifications in the configuration instance that
trigger USD file re-generation.
"""
[docs] def __init__(self, cfg: AssetConverterBaseCfg):
"""Initializes the class.
Args:
cfg: The configuration instance for converting an asset file to USD format.
Raises:
ValueError: When provided asset file does not exist.
"""
# check if the asset file exists
if not check_file_path(cfg.asset_path):
raise ValueError(f"The asset path does not exist: {cfg.asset_path}")
# save the inputs
self.cfg = cfg
# resolve USD directory name
if cfg.usd_dir is None:
# a folder in "/tmp/IsaacLab" by the name: usd_{date}_{time}_{random}
time_tag = datetime.now().strftime("%Y%m%d_%H%M%S")
self._usd_dir = f"/tmp/IsaacLab/usd_{time_tag}_{random.randrange(10000)}"
else:
self._usd_dir = cfg.usd_dir
# resolve the file name from asset file name if not provided
if cfg.usd_file_name is None:
usd_file_name = pathlib.PurePath(cfg.asset_path).stem
else:
usd_file_name = cfg.usd_file_name
# add USD extension if not provided
if not (usd_file_name.endswith(".usd") or usd_file_name.endswith(".usda")):
self._usd_file_name = usd_file_name + ".usd"
else:
self._usd_file_name = usd_file_name
# create the USD directory
os.makedirs(self.usd_dir, exist_ok=True)
# check if usd files exist
self._usd_file_exists = os.path.isfile(self.usd_path)
# path to read/write asset hash file
self._dest_hash_path = os.path.join(self.usd_dir, ".asset_hash")
# create asset hash to check if the asset has changed
self._asset_hash = self._config_to_hash(cfg)
# read the saved hash
try:
with open(self._dest_hash_path) as f:
existing_asset_hash = f.readline()
self._is_same_asset = existing_asset_hash == self._asset_hash
except FileNotFoundError:
self._is_same_asset = False
# convert the asset to USD if the hash is different or USD file does not exist
if cfg.force_usd_conversion or not self._usd_file_exists or not self._is_same_asset:
# write the updated hash
with open(self._dest_hash_path, "w") as f:
f.write(self._asset_hash)
# convert the asset to USD
self._convert_asset(cfg)
# dump the configuration to a file
dump_yaml(os.path.join(self.usd_dir, "config.yaml"), cfg.to_dict())
# add comment to top of the saved config file with information about the converter
current_date = datetime.now().strftime("%Y-%m-%d")
current_time = datetime.now().strftime("%H:%M:%S")
generation_comment = (
f"##\n# Generated by {self.__class__.__name__} on {current_date} at {current_time}.\n##\n"
)
with open(os.path.join(self.usd_dir, "config.yaml"), "a") as f:
f.write(generation_comment)
"""
Properties.
"""
@property
def usd_dir(self) -> str:
"""The absolute path to the directory where the generated USD files are stored."""
return self._usd_dir
@property
def usd_file_name(self) -> str:
"""The file name of the generated USD file."""
return self._usd_file_name
@property
def usd_path(self) -> str:
"""The absolute path to the generated USD file."""
return os.path.join(self.usd_dir, self.usd_file_name)
@property
def usd_instanceable_meshes_path(self) -> str:
"""The relative path to the USD file with meshes.
The path is with respect to the USD directory :attr:`usd_dir`. This is to ensure that the
mesh references in the generated USD file are resolved relatively. Otherwise, it becomes
difficult to move the USD asset to a different location.
"""
return os.path.join(".", "Props", "instanceable_meshes.usd")
"""
Implementation specifics.
"""
@abc.abstractmethod
def _convert_asset(self, cfg: AssetConverterBaseCfg):
"""Converts the asset file to USD.
Args:
cfg: The configuration instance for the input asset to USD conversion.
"""
raise NotImplementedError()
"""
Private helpers.
"""
@staticmethod
def _config_to_hash(cfg: AssetConverterBaseCfg) -> str:
"""Converts the configuration object and asset file to an MD5 hash of a string.
.. warning::
It only checks the main asset file (:attr:`cfg.asset_path`).
Args:
config : The asset converter configuration object.
Returns:
An MD5 hash of a string.
"""
# convert to dict and remove path related info
config_dic = cfg.to_dict()
_ = config_dic.pop("asset_path")
_ = config_dic.pop("usd_dir")
_ = config_dic.pop("usd_file_name")
# convert config dic to bytes
config_bytes = json.dumps(config_dic).encode()
# hash config
md5 = hashlib.md5()
md5.update(config_bytes)
# read the asset file to observe changes
with open(cfg.asset_path, "rb") as f:
while True:
# read 64kb chunks to avoid memory issues for the large files!
data = f.read(65536)
if not data:
break
md5.update(data)
# return the hash
return md5.hexdigest()