diff --git a/configs/gym/agent/pour_water_agent/fast_gym_config.json b/configs/gym/agent/pour_water_agent/fast_gym_config.json index 15c011ec..e56d77b1 100644 --- a/configs/gym/agent/pour_water_agent/fast_gym_config.json +++ b/configs/gym/agent/pour_water_agent/fast_gym_config.json @@ -251,14 +251,8 @@ "mode": "save", "params": { "robot_meta": { - "arm_dofs": 12, "control_freq": 25, - "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"], - "observation": { - "vision": {}, - "states": ["qpos"] - }, - "min_len_steps": 5 + "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"] }, "instruction": { "lang": "Pour water from the bottle into the mug." diff --git a/configs/gym/agent/rearrangement_agent/fast_gym_config.json b/configs/gym/agent/rearrangement_agent/fast_gym_config.json index 0d03ba65..ec94fb1f 100644 --- a/configs/gym/agent/rearrangement_agent/fast_gym_config.json +++ b/configs/gym/agent/rearrangement_agent/fast_gym_config.json @@ -236,14 +236,8 @@ "mode": "save", "params": { "robot_meta": { - "arm_dofs": 12, "control_freq": 25, - "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"], - "observation": { - "vision": {}, - "states": ["qpos"] - }, - "min_len_steps": 125 + "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"] }, "instruction": { "lang": "Place the spoon and fork neatly into the plate on the table." diff --git a/configs/gym/pour_water/gym_config.json b/configs/gym/pour_water/gym_config.json index 3e0bcafb..840c3726 100644 --- a/configs/gym/pour_water/gym_config.json +++ b/configs/gym/pour_water/gym_config.json @@ -264,20 +264,8 @@ "params": { "robot_meta": { "robot_type": "CobotMagic", - "arm_dofs": 12, "control_freq": 25, - "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"], - "observation": { - "vision": { - "cam_high": ["mask"], - "cam_right_wrist": ["mask"], - "cam_left_wrist": ["mask"] - }, - "states": ["qpos"], - "exteroception": ["cam_high", "cam_right_wrist", "cam_left_wrist"] - }, - "action": "qpos_with_eef_pose", - "min_len_steps": 5 + "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"] }, "instruction": { "lang": "Pour water from bottle to cup" diff --git a/configs/gym/pour_water/gym_config_simple.json b/configs/gym/pour_water/gym_config_simple.json index 3b709145..5cf1b217 100644 --- a/configs/gym/pour_water/gym_config_simple.json +++ b/configs/gym/pour_water/gym_config_simple.json @@ -213,18 +213,8 @@ "params": { "robot_meta": { "robot_type": "CobotMagic", - "arm_dofs": 12, "control_freq": 25, - "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"], - "observation": { - "vision": { - "cam_high": ["mask"] - }, - "states": ["qpos"], - "exteroception": ["cam_high", "cam_right_wrist", "cam_left_wrist"] - }, - "action": "qpos_with_eef_pose", - "min_len_steps": 5 + "control_parts": ["left_arm", "left_eef", "right_arm", "right_eef"] }, "instruction": { "lang": "Pour water from bottle to cup" diff --git a/configs/gym/special/simple_task_ur10.json b/configs/gym/special/simple_task_ur10.json index 8596d7a0..1c722b1a 100644 --- a/configs/gym/special/simple_task_ur10.json +++ b/configs/gym/special/simple_task_ur10.json @@ -33,16 +33,7 @@ "params": { "robot_meta": { "robot_type": "UR10", - "arm_dofs": 6, - "control_freq": 3, - "observation": { - "vision": { - "cam_high": [] - }, - "states": ["qpos"] - }, - "action": "qpos", - "min_len_steps": 5 + "control_freq": 3 }, "instruction": { "lang": "Acting with Oscillatory motion" diff --git a/embodichain/lab/gym/envs/base_env.py b/embodichain/lab/gym/envs/base_env.py index 99f4c211..a637dc7d 100644 --- a/embodichain/lab/gym/envs/base_env.py +++ b/embodichain/lab/gym/envs/base_env.py @@ -164,21 +164,15 @@ def device(self) -> torch.device: @cached_property def single_observation_space(self) -> gym.spaces.Space: - if self.num_envs == 1: - return gym_utils.convert_observation_to_space(self._init_raw_obs) - else: - return gym_utils.convert_observation_to_space( - self._init_raw_obs, unbatched=True - ) + return gym_utils.convert_observation_to_space( + self._init_raw_obs, unbatched=True + ) @cached_property def observation_space(self) -> gym.spaces.Space: - if self.num_envs == 1: - return self.single_observation_space - else: - return gym.vector.utils.batch_space( - self.single_observation_space, n=self.num_envs - ) + return gym_utils.convert_observation_to_space( + self._init_raw_obs, unbatched=False + ) @cached_property def flattened_observation_space(self) -> gym.spaces.Box: @@ -208,6 +202,11 @@ def action_space(self) -> gym.spaces.Space: def elapsed_steps(self) -> Union[int, torch.Tensor]: return self._elapsed_steps + @property + def has_sensors(self) -> bool: + """Return whether the environment has sensors.""" + return len(self.sensors) > 0 + def get_sensor(self, name: str, **kwargs) -> BaseSensor: """Get the sensor instance by name. @@ -576,10 +575,10 @@ def reset( self.sim.reset_objects_state( env_ids=reset_ids, excluded_uids=self._detached_uids_for_reset ) - self._elapsed_steps[reset_ids] = 0 # Reset hook for user to perform any custom reset logic. self._initialize_episode(reset_ids, **options) + self._elapsed_steps[reset_ids] = 0 return self.get_obs(**options), self.get_info(**options) diff --git a/embodichain/lab/gym/envs/embodied_env.py b/embodichain/lab/gym/envs/embodied_env.py index b412babb..5db51660 100644 --- a/embodichain/lab/gym/envs/embodied_env.py +++ b/embodichain/lab/gym/envs/embodied_env.py @@ -178,6 +178,9 @@ def __init__(self, cfg: EmbodiedEnvCfg, **kwargs): super().__init__(cfg, **kwargs) + if self.cfg.dataset and not self.cfg.filter_dataset_saving: + self.dataset_manager = DatasetManager(self.cfg.dataset, self) + self.episode_obs_buffer: Dict[int, List[EnvObs]] = { i: [] for i in range(self.num_envs) } @@ -208,9 +211,6 @@ def _init_sim_state(self, **kwargs): if self.cfg.rewards: self.reward_manager = RewardManager(self.cfg.rewards, self) - if self.cfg.dataset and not self.cfg.filter_dataset_saving: - self.dataset_manager = DatasetManager(self.cfg.dataset, self) - def _apply_functor_filter(self) -> None: """Apply functor filters to the environment components based on configuration. @@ -397,6 +397,7 @@ def _initialize_episode( ] if successful_env_ids: + # Convert back to tensor if needed successful_env_ids_tensor = torch.tensor( successful_env_ids, device=self.device @@ -405,8 +406,6 @@ def _initialize_episode( mode="save", env_ids=successful_env_ids_tensor, ) - else: - logger.log_warning("No successful episodes to save.") # Clear episode buffers and reset success status for environments being reset for env_id in env_ids_to_process: diff --git a/embodichain/lab/gym/envs/managers/datasets.py b/embodichain/lab/gym/envs/managers/datasets.py index 70b62aa7..b3326236 100644 --- a/embodichain/lab/gym/envs/managers/datasets.py +++ b/embodichain/lab/gym/envs/managers/datasets.py @@ -22,7 +22,9 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Union import numpy as np +import gymnasium as gym import torch +import tqdm from embodichain.utils import logger from embodichain.data.constants import EMBODICHAIN_DEFAULT_DATASET_ROOT @@ -87,6 +89,10 @@ def __init__(self, cfg: DatasetFunctorCfg, env: EmbodiedEnv): # Optional parameters self.instruction = params.get("instruction", None) self.extra = params.get("extra", {}) + + # Experimental parameters for extra episode info saving. + self.extra_episode_info = self.extra.get("episode_info", {}) + self.extra_episode_info_buffer = {} self.use_videos = params.get("use_videos", False) # LeRobot dataset instance @@ -165,10 +171,14 @@ def _save_episodes( episode_extra_info = extra_info.copy() self.total_time += current_episode_time episode_extra_info["total_time"] = self.total_time - self._update_dataset_info({"extra": episode_extra_info}) + self._save_extra_episode_meta_info(env_id) try: - for obs, action in zip(obs_list, action_list): + for obs, action in tqdm.tqdm( + zip(obs_list, action_list), + total=len(obs_list), + desc=f"Converting env {env_id} episode to LeRobot format", + ): frame = self._convert_frame_to_lerobot(obs, action, task) self.dataset.add_frame(frame) @@ -183,6 +193,28 @@ def _save_episodes( except Exception as e: logger.log_error(f"Failed to save episode {env_id}: {e}") + def _save_extra_episode_meta_info(self, env_id: int) -> None: + """Save extra episode meta info for a specific environment ID.""" + + curr_extra_episode_info = {} + if self.extra_episode_info: + for key, attr_list in self.extra_episode_info.items(): + if key == "rigid_object_physics_attributes": + rigid_obj_list = self._env.sim.get_rigid_object_uid_list() + for obj_uid in rigid_obj_list: + curr_extra_episode_info[obj_uid] = {} + obj = self._env.sim.get_rigid_object(obj_uid) + for attr in attr_list: + if attr == "mass": + curr_extra_episode_info[obj_uid]["mass"] = round( + obj.get_mass(env_ids=[env_id]).squeeze_().item(), 5 + ) + + self.extra_episode_info_buffer[self.curr_episode] = curr_extra_episode_info + self._update_dataset_info( + {"extra_episode_info": self.extra_episode_info_buffer} + ) + def finalize(self) -> Optional[str]: """Finalize the dataset.""" # Save any remaining episodes @@ -260,44 +292,99 @@ def _initialize_dataset(self) -> None: def _build_features(self) -> Dict: """Build LeRobot features dict.""" features = {} - extra_vision_config = self.robot_meta.get("observation", {}).get("vision", {}) - for camera_name in extra_vision_config.keys(): - sensor = self._env.get_sensor(camera_name) - is_stereo = is_stereocam(sensor) - img_shape = (sensor.cfg.height, sensor.cfg.width, 3) + # Setup robot joint state features based on control_parts or all joints if not specified. + control_parts = self.robot_meta.get("control_parts", None) + if control_parts is not None: + self._joint_ids = [] + for part in control_parts: + part_joint_ids = self._env.robot.get_joint_ids(part, remove_mimic=True) + self._joint_ids.extend(part_joint_ids) + else: + self._joint_ids = self._env.robot.get_joint_ids(remove_mimic=True) - features[camera_name] = { - "dtype": "video" if self.use_videos else "image", - "shape": img_shape, - "names": ["height", "width", "channel"], - } + state_dim = len(self._joint_ids) + # Create joint names. + joint_names = [self._env.robot.joint_names[i] for i in self._joint_ids] - if is_stereo: - features[get_right_name(camera_name)] = { - "dtype": "video" if self.use_videos else "image", - "shape": img_shape, - "names": ["height", "width", "channel"], - } - - qpos = self._env.robot.get_qpos() - state_dim = qpos.shape[1] - - if state_dim > 0: - features["observation.state"] = { - "dtype": "float32", - "shape": (state_dim,), - "names": ["state"], - } + features["observation.qpos"] = { + "dtype": "float32", + "shape": (state_dim,), + "names": joint_names, + } + features["observation.qvel"] = { + "dtype": "float32", + "shape": (state_dim,), + "names": joint_names, + } + features["observation.qf"] = { + "dtype": "float32", + "shape": (state_dim,), + "names": joint_names, + } # Use full qpos dimension for action (includes gripper) - action_dim = state_dim + action_dim = len(self._joint_ids) features["action"] = { "dtype": "float32", "shape": (action_dim,), - "names": ["action"], + "names": joint_names, } + # Setup sensor observation features based env.observation.sensor + if self._env.has_sensors: + sensor_obs_space: dict = self._env.single_observation_space["sensor"] + + for sensor_name, value in sensor_obs_space.items(): + sensor = self._env.get_sensor(sensor_name) + is_stereo = is_stereocam(sensor) + + for frame_name, space in value.items(): + # TODO: Support depth (uint16) and mask (also uint16 or uint8) + if frame_name not in ["color", "color_right"]: + logger.log_error( + f"Only support 'color' frame for vision sensors, but got '{frame_name}' in sensor '{sensor_name}'" + ) + + features[f"{sensor_name}.{frame_name}"] = { + "dtype": "video" if self.use_videos else "image", + "shape": (sensor.cfg.height, sensor.cfg.width, 3), + "names": ["height", "width", "channel"], + } + + if is_stereo: + features[f"{sensor_name}.{frame_name}_right"] = { + "dtype": "video" if self.use_videos else "image", + "shape": (sensor.cfg.height, sensor.cfg.width, 3), + "names": ["height", "width", "channel"], + } + + # TODO: The extra observation features are supposed to be defined in a flattened way in the observation space. + # Lerobot requires a flat feature dict, so we may need to support nested dicts to flatten dict conversion in the future. + # Add any extra features specified in observation space excluding 'robot' and 'sensor' + for key, space in self._env.single_observation_space.items(): + if key in ["robot", "sensor"]: + continue + + if isinstance(space, gym.spaces.Dict): + logger.log_warning( + f"Nested Dict observation space for key '{key}' is not directly supported. " + f"Please flatten it or specify features manually. Skipping '{key}'." + ) + continue + + names = key + if "vel" in key: + names = ["lin_x", "lin_y", "lin_z", "ang_x", "ang_y", "ang_z"] + elif "pose" in key: + names = ["x", "y", "z", "qw", "qx", "qy", "qz"] + + features[f"observation.{key}"] = { + "dtype": str(space.dtype), + "shape": space.shape, + "names": names, + } + return features def _convert_frame_to_lerobot( @@ -314,51 +401,40 @@ def _convert_frame_to_lerobot( Frame dict in LeRobot format with numpy arrays """ frame = {"task": task} - extra_vision_config = self.robot_meta.get("observation", {}).get("vision", {}) - - # Add images - for camera_name in extra_vision_config.keys(): - if camera_name in obs.get("sensor", {}): - sensor = self._env.get_sensor(camera_name) - is_stereo = is_stereocam(sensor) - color_data = obs["sensor"][camera_name]["color"] - if isinstance(color_data, torch.Tensor): - color_img = color_data[:, :, :3].cpu().numpy() - else: - color_img = np.array(color_data)[:, :, :3] + if self._env.has_sensors: + sensor_obs_space: dict = self._env.single_observation_space["sensor"] - if color_img.dtype in [np.float32, np.float64]: - color_img = (color_img * 255).astype(np.uint8) + # Add images + for sensor_name, value in sensor_obs_space.items(): + sensor = self._env.get_sensor(sensor_name) + is_stereo = is_stereocam(sensor) - frame[camera_name] = color_img + color_data = obs["sensor"][sensor_name]["color"] + color_img = color_data[:, :, :3].cpu() + frame[f"{sensor_name}.color"] = color_img if is_stereo: - color_right_data = obs["sensor"][camera_name]["color_right"] - if isinstance(color_right_data, torch.Tensor): - color_right_img = color_right_data[:, :, :3].cpu().numpy() - else: - color_right_img = np.array(color_right_data)[:, :, :3] - - if color_right_img.dtype in [np.float32, np.float64]: - color_right_img = (color_right_img * 255).astype(np.uint8) - - frame[get_right_name(camera_name)] = color_right_img + color_right_data = obs["sensor"][sensor_name]["color_right"] + color_right_img = color_right_data[:, :, :3].cpu() + frame[f"{sensor_name}.color_right"] = color_right_img # Add state - qpos = obs["robot"][JointType.QPOS.value] - if isinstance(qpos, torch.Tensor): - state_data = qpos.cpu().numpy().astype(np.float32) - else: - state_data = np.array(qpos).astype(np.float32) + frame["observation.qpos"] = obs["robot"]["qpos"][self._joint_ids].cpu() + frame["observation.qvel"] = obs["robot"]["qvel"][self._joint_ids].cpu() + frame["observation.qf"] = obs["robot"]["qf"][self._joint_ids].cpu() + + # Add extra observation features if they exist + for key in obs: + if key in ["robot", "sensor"]: + continue - frame["observation.state"] = state_data + frame[f"observation.{key}"] = obs[key].cpu() - # Add action (save complete qpos including gripper) + # Add action. + action = action[self._joint_ids] if isinstance(action, torch.Tensor): - action_data = action.cpu().numpy() - elif isinstance(action, np.ndarray): - action_data = action + action_data = action.cpu() elif isinstance(action, dict): # Extract qpos from action dict action_tensor = action.get( @@ -372,13 +448,7 @@ def _convert_frame_to_lerobot( break if isinstance(action_tensor, torch.Tensor): - action_data = action_tensor.cpu().numpy() - elif isinstance(action_tensor, np.ndarray): - action_data = action_tensor - else: - action_data = np.array(action_tensor) - else: - action_data = np.array(action) + action_data = action_tensor.cpu() frame["action"] = action_data diff --git a/embodichain/lab/gym/envs/managers/observations.py b/embodichain/lab/gym/envs/managers/observations.py index 1eb83fa4..537485af 100644 --- a/embodichain/lab/gym/envs/managers/observations.py +++ b/embodichain/lab/gym/envs/managers/observations.py @@ -38,6 +38,7 @@ def get_rigid_object_pose( env: EmbodiedEnv, obs: EnvObs, entity_cfg: SceneEntityCfg, + to_matrix: bool = True, ) -> torch.Tensor: """Get the world poses of the rigid objects in the environment. @@ -48,17 +49,52 @@ def get_rigid_object_pose( env: The environment instance. obs: The observation dictionary. entity_cfg: The configuration of the scene entity. + to_matrix: Whether to return the pose as a 4x4 transformation matrix. If False, returns as (position, quaternion). Returns: - A tensor of shape (num_envs, 4, 4) representing the world poses of the rigid objects. + A tensor of shape (num_envs, 7) or (num_envs, 4, 4) representing the world poses of the rigid objects. """ if entity_cfg.uid not in env.sim.get_rigid_object_uid_list(): - return torch.zeros((env.num_envs, 4, 4), dtype=torch.float32) + if to_matrix: + return torch.zeros( + (env.num_envs, 4, 4), dtype=torch.float32, device=env.device + ) + else: + return torch.zeros( + (env.num_envs, 7), dtype=torch.float32, device=env.device + ) + + obj = env.sim.get_rigid_object(entity_cfg.uid) + + return obj.get_local_pose(to_matrix=to_matrix) + + +def get_rigid_object_velocity( + env: EmbodiedEnv, + obs: EnvObs, + entity_cfg: SceneEntityCfg, +) -> torch.Tensor: + """Get the world velocities of the rigid objects in the environment. + + If the rigid object with the specified UID does not exist in the environment, + a zero tensor will be returned. + + Args: + env: The environment instance. + obs: The observation dictionary. + entity_cfg: The configuration of the scene entity. + + Returns: + A tensor of shape (num_envs, 6) representing the linear and angular velocities of the rigid objects. + """ + + if entity_cfg.uid not in env.sim.get_rigid_object_uid_list(): + return torch.zeros((env.num_envs, 6), dtype=torch.float32, device=env.device) obj = env.sim.get_rigid_object(entity_cfg.uid) - return obj.get_local_pose(to_matrix=True) + return obj.body_data.vel def normalize_robot_joint_data( diff --git a/embodichain/lab/gym/utils/gym_utils.py b/embodichain/lab/gym/utils/gym_utils.py index 637afcb9..d8b4427f 100644 --- a/embodichain/lab/gym/utils/gym_utils.py +++ b/embodichain/lab/gym/utils/gym_utils.py @@ -91,7 +91,9 @@ def convert_observation_to_space( torch.float32: np.float32, torch.float64: np.float64, torch.int8: np.int8, + torch.uint8: np.uint8, torch.int16: np.int16, + torch.uint16: np.uint16, torch.int32: np.int32, torch.int64: np.int64, torch.bool: np.bool_, diff --git a/tests/agents/test_rl.py b/tests/agents/test_rl.py index 09fdeeff..e0a5beaf 100644 --- a/tests/agents/test_rl.py +++ b/tests/agents/test_rl.py @@ -48,8 +48,6 @@ def setup_method(self): "robot_meta": { "robot_type": "UR10_DH_Gripper", "control_freq": 25, - "arm_dofs": 6, - "observation": {"vision": {}, "states": ["qpos"]}, }, "instruction": {"lang": "push_cube_to_target"}, "extra": { @@ -93,6 +91,11 @@ def teardown_method(self): os.remove(self.temp_gym_config_path) self.temp_gym_config_path = None + from embodichain.lab.sim import SimulationManager + + sim = SimulationManager.get_instance() + sim.destroy() + def test_training_pipeline(self): """Test RL training pipeline with multiple parallel environments.""" from embodichain.agents.rl.train import train_from_config