diff --git a/agent_context/topics/motion-planning/motion-planning.md b/agent_context/topics/motion-planning/motion-planning.md index d349043a..4398013b 100644 --- a/agent_context/topics/motion-planning/motion-planning.md +++ b/agent_context/topics/motion-planning/motion-planning.md @@ -7,6 +7,8 @@ | Planner registry | `embodichain/lab/sim/planners/__init__.py` | | Base planner class & config | `embodichain/lab/sim/planners/base_planner.py` → `BasePlanner`, `BasePlannerCfg`, `PlanOptions` | | TOPPRA planner | `embodichain/lab/sim/planners/toppra_planner.py` → `ToppraPlanner`, `ToppraPlannerCfg`, `ToppraPlanOptions` | +| Neural planner | `embodichain/lab/sim/planners/neural_planner.py` → `NeuralPlanner`, `NeuralPlannerCfg`, `NeuralPlanOptions` | +| Planner assets | `embodichain/data/assets/planner_assets.py` → `download_neural_planner_checkpoint()` | | Motion generator | `embodichain/lab/sim/planners/motion_generator.py` → `MotionGenerator`, `MotionGenCfg`, `MotionGenOptions` | | Planner utilities & data types | `embodichain/lab/sim/planners/utils.py` → `PlanState`, `PlanResult`, `MoveType`, `MovePart`, `TrajectorySampleMethod` | @@ -22,7 +24,8 @@ All planners resolve their robot at init via `SimulationManager.get_instance().g ``` BasePlanner (ABC) - └─ ToppraPlanner Time-optimal path parameterization + ├─ ToppraPlanner Time-optimal path parameterization + └─ NeuralPlanner (experimental) APG waypoint rollout MotionGenerator Wraps any BasePlanner; adds interpolation and multi-part support ``` @@ -30,12 +33,14 @@ MotionGenerator Wraps any BasePlanner; adds interpolation and multi-pa Config hierarchy: ``` BasePlannerCfg robot_uid (MISSING), planner_type - └─ ToppraPlannerCfg planner_type = "toppra" + ├─ ToppraPlannerCfg planner_type = "toppra" + └─ NeuralPlannerCfg planner_type = "neural", checkpoint_path (MISSING) MotionGenCfg planner_cfg (MISSING — must be a BasePlannerCfg subclass) PlanOptions (empty base) - └─ ToppraPlanOptions constraints, sample_method, sample_interval + ├─ ToppraPlanOptions constraints, sample_method, sample_interval + └─ NeuralPlanOptions control_part, start_qpos, max_steps MotionGenOptions start_qpos, control_part, plan_opts, is_interpolate, interpolate_nums, is_linear, interpolate_position_step, @@ -60,12 +65,21 @@ Time-optimal path parameterization using the [toppra](https://github.com/hungpha | `sample_method` | `TrajectorySampleMethod` | `QUANTITY` | `TIME`, `QUANTITY`, or `DISTANCE` | | `sample_interval` | `float \| int` | `0.01` | Time interval (seconds) or sample count depending on method | +### NeuralPlanner (experimental) + +Learning-based EEF waypoint planner. Franka Panda only. + +- Checkpoint: `download_neural_planner_checkpoint()` from HuggingFace (gated, needs `HF_TOKEN`) +- Use via `MotionGenerator` with `planner_type="neural"` and `plan_opts=NeuralPlanOptions(...)` +- Input: `EEF_MOVE` `PlanState` list with 4×4 `xpos` +- Key cfg: `checkpoint_path` (from download), `control_part` + ### MotionGenerator Unified interface for trajectory planning with optional pre-interpolation. - Wraps a `BasePlanner` instance (resolved from `planner_cfg.planner_type`). -- Supported planner types: `{"toppra": (ToppraPlanner, ToppraPlannerCfg)}`. +- Supported planner types: `{"toppra": (ToppraPlanner, ToppraPlannerCfg), "neural": (NeuralPlanner, NeuralPlannerCfg)}`. - `MotionGenCfg.planner_cfg` is **MISSING** — must be provided. `MotionGenOptions` fields: @@ -140,7 +154,7 @@ Describes one waypoint or action: ```python _support_planner_dict = { "toppra": (ToppraPlanner, ToppraPlannerCfg), - "my_planner": (MyPlanner, MyPlannerCfg), + "neural": (NeuralPlanner, NeuralPlannerCfg), } ``` 5. Export from `embodichain/lab/sim/planners/__init__.py`. diff --git a/docs/source/overview/sim/planners/index.rst b/docs/source/overview/sim/planners/index.rst index 5bdb9e8f..d1320be1 100644 --- a/docs/source/overview/sim/planners/index.rst +++ b/docs/source/overview/sim/planners/index.rst @@ -21,10 +21,14 @@ The `embodichain` project provides a unified interface for robot trajectory plan - **MotionGenerator**: A unified trajectory planning interface that supports joint/Cartesian interpolation, automatic constraint handling, flexible planner selection, and is easily extensible for collision checking and additional planners. - **ToppraPlanner**: A time-optimal trajectory planner based on the TOPPRA library, supporting joint trajectory generation under velocity and acceleration constraints. +- **NeuralPlanner** (experimental): A learning-based EEF waypoint planner for Franka Panda. - **TrajectorySampleMethod**: An enumeration for trajectory sampling strategies, supporting sampling by time, quantity, or distance. These tools can be used to generate smooth and dynamically feasible robot trajectories, and are extensible for future collision checking and various sampling requirements. +Use NeuralPlanner (experimental) when you have a trained APG checkpoint and need +learned EEF waypoint rollout on Franka Panda. + See also -------- @@ -32,5 +36,6 @@ See also :maxdepth: 1 toppra_planner.md + neural_planner.md trajectory_sample_method.md motion_generator.md diff --git a/docs/source/overview/sim/planners/motion_generator.md b/docs/source/overview/sim/planners/motion_generator.md index bee23cb6..4d8b53db 100644 --- a/docs/source/overview/sim/planners/motion_generator.md +++ b/docs/source/overview/sim/planners/motion_generator.md @@ -5,7 +5,7 @@ ## Features * **Unified planning interface**: Supports trajectory planning with or without collision checking (collision checking is reserved for future implementation). -* **Flexible planner selection**: Allows selection of different planners (currently supports TOPPRA for time-optimal planning). +* **Flexible planner selection**: Supports TOPPRA and NeuralPlanner (experimental). * **Automatic constraint handling**: Retrieves velocity and acceleration limits from the robot or uses user-specified/default values. * **Supports both joint and Cartesian interpolation**: Generates discrete trajectories using either joint space or Cartesian space interpolation. * **Convenient sampling**: Supports various sampling strategies via `TrajectorySampleMethod`. diff --git a/docs/source/overview/sim/planners/neural_planner.md b/docs/source/overview/sim/planners/neural_planner.md new file mode 100644 index 00000000..632e2c48 --- /dev/null +++ b/docs/source/overview/sim/planners/neural_planner.md @@ -0,0 +1,63 @@ +# NeuralPlanner + +````{admonition} Experimental +:class: warning + +`NeuralPlanner` is an **experimental** feature. The API, checkpoint format, +and default parameters may change without a deprecation cycle. It is currently +only validated on the **Franka Panda** robot. +```` + +`NeuralPlanner` is a learning-based EEF waypoint planner. It rolls out a +trained APG checkpoint through `MotionGenerator` to reach Cartesian targets. + +## Configuration + +Pre-trained checkpoints are hosted on HuggingFace and can be downloaded with +`download_neural_planner_checkpoint()` (requires `HF_TOKEN` environment variable). + +```python +from embodichain.data.assets.planner_assets import download_neural_planner_checkpoint +from embodichain.lab.sim.planners import ( + MotionGenCfg, + MotionGenOptions, + MotionGenerator, + MoveType, + NeuralPlannerCfg, + PlanState, +) +from embodichain.lab.sim.planners.neural_planner import NeuralPlanOptions + +checkpoint_path = download_neural_planner_checkpoint() + +motion_generator = MotionGenerator( + cfg=MotionGenCfg( + planner_cfg=NeuralPlannerCfg( + robot_uid=robot.uid, + checkpoint_path=checkpoint_path, + control_part="main_arm", + ) + ) +) + +result = motion_generator.generate( + target_states=[ + PlanState(move_type=MoveType.EEF_MOVE, xpos=waypoint) + for waypoint in waypoints + ], + options=MotionGenOptions( + plan_opts=NeuralPlanOptions( + control_part="main_arm", + start_qpos=start_qpos, + ), + ), +) +``` + +## Example + +```bash +python examples/sim/planners/neural_planner.py --headless --device cuda +``` + +The example downloads the checkpoint automatically on first run. diff --git a/embodichain/data/assets/__init__.py b/embodichain/data/assets/__init__.py index 6603f43d..58e384d4 100644 --- a/embodichain/data/assets/__init__.py +++ b/embodichain/data/assets/__init__.py @@ -21,3 +21,5 @@ from .eef_assets import * from .robot_assets import * from .scene_assets import * +from .solver_assets import * +from .planner_assets import * diff --git a/embodichain/data/assets/planner_assets.py b/embodichain/data/assets/planner_assets.py new file mode 100644 index 00000000..b362d809 --- /dev/null +++ b/embodichain/data/assets/planner_assets.py @@ -0,0 +1,91 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +import os + +from huggingface_hub import hf_hub_download + +# HuggingFace endpoint. Mirrors (e.g. hf-mirror.com) often redirect to the +# real hub without forwarding the required commit-hash response headers, so we +# default to the canonical endpoint and rely on the system proxy when needed. +_HF_ENDPOINT = "https://huggingface.co" + +__all__ = ["download_neural_planner_checkpoint"] + + +def download_neural_planner_checkpoint( + repo_id: str = "dexforce/neural_motion_generator", + filename: str = "franka/franka.pt", + token: str | None = None, + endpoint: str = _HF_ENDPOINT, +) -> str: + """Download a neural planner checkpoint from HuggingFace. + + The repository is gated. Either set the ``HF_TOKEN`` environment variable or + run ``huggingface-cli login`` before calling this function. + + If your network requires an HTTP proxy, set ``HTTPS_PROXY`` or + ``https_proxy`` in the environment before launching Python. + + Args: + repo_id: HuggingFace repository ID. + filename: Checkpoint path in the repo, e.g. ``franka/franka.pt``. + token: HuggingFace API token. Falls back to the ``HF_TOKEN`` + environment variable or the cached token from + ``huggingface-cli login``. + endpoint: HuggingFace-compatible endpoint URL. Defaults to + ``https://huggingface.co``. Mirrors that merely redirect to the + real hub are not supported. + + Returns: + str: Local path to the downloaded checkpoint file. + + Raises: + RuntimeError: If the download fails, with authentication instructions. + """ + # Normalize proxy env vars: the ``requests`` library on Linux requires the + # lowercase form (``https_proxy``), but users typically export the uppercase + # form (``HTTPS_PROXY``). + https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") + if https_proxy: + os.environ.setdefault("https_proxy", https_proxy) + os.environ.setdefault("HTTPS_PROXY", https_proxy) + + # Allow callers to pass the token explicitly; otherwise fall back to + # HF_TOKEN (huggingface_hub also reads this automatically, but being + # explicit makes the fallback order transparent). + if token is None: + token = os.environ.get("HF_TOKEN") or None + + try: + return hf_hub_download( + repo_id=repo_id, + filename=filename, + token=token, + endpoint=endpoint, + ) + except Exception as exc: + raise RuntimeError( + f"Failed to download '{filename}' from '{repo_id}'.\n" + "The repository is gated and requires an authenticated HuggingFace account.\n" + "To fix this:\n" + " 1. Accept the model license at https://huggingface.co/dexforce/neural_motion_generator\n" + " 2. Create an access token at https://huggingface.co/settings/tokens\n" + " 3. Export the token: export HF_TOKEN=\n" + " or run: huggingface-cli login\n" + f"Original error: {exc}" + ) from exc diff --git a/embodichain/lab/sim/planners/__init__.py b/embodichain/lab/sim/planners/__init__.py index 52b217ac..bff37bf7 100644 --- a/embodichain/lab/sim/planners/__init__.py +++ b/embodichain/lab/sim/planners/__init__.py @@ -17,4 +17,5 @@ from .utils import * from .base_planner import * from .toppra_planner import * +from .neural_planner import * from .motion_generator import * diff --git a/embodichain/lab/sim/planners/motion_generator.py b/embodichain/lab/sim/planners/motion_generator.py index 220deeca..ba09efc2 100644 --- a/embodichain/lab/sim/planners/motion_generator.py +++ b/embodichain/lab/sim/planners/motion_generator.py @@ -27,6 +27,8 @@ BasePlanner, ToppraPlanner, ToppraPlannerCfg, + NeuralPlanner, + NeuralPlannerCfg, ) from embodichain.lab.sim.utility.action_utils import interpolate_with_nums from embodichain.utils import logger, configclass @@ -93,6 +95,7 @@ class MotionGenerator: _support_planner_dict = { "toppra": (ToppraPlanner, ToppraPlannerCfg), + "neural": (NeuralPlanner, NeuralPlannerCfg), } def __init__(self, cfg: MotionGenCfg) -> None: @@ -217,7 +220,13 @@ def generate( else: target_plan_states = target_states - options.plan_opts.control_part = options.control_part + if options.plan_opts is None: + if hasattr(self.planner, "default_plan_options"): + options.plan_opts = self.planner.default_plan_options() + else: + options.plan_opts = PlanOptions() + # Planner-specific options (e.g. NeuralPlanOptions) must be set on + # plan_opts explicitly, same as ToppraPlanOptions. result = self.planner.plan( target_states=target_plan_states, options=options.plan_opts ) diff --git a/embodichain/lab/sim/planners/neural_planner.py b/embodichain/lab/sim/planners/neural_planner.py new file mode 100644 index 00000000..62adb0e6 --- /dev/null +++ b/embodichain/lab/sim/planners/neural_planner.py @@ -0,0 +1,500 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +from dataclasses import MISSING +from pathlib import Path + +import numpy as np +import torch +import torch.nn as nn + +from embodichain.lab.sim.planners.base_planner import ( + BasePlanner, + BasePlannerCfg, + PlanOptions, + validate_plan_options, +) +from embodichain.lab.sim.planners.utils import MoveType, PlanResult, PlanState +from embodichain.utils import configclass, logger +from embodichain.utils.math import convert_quat, quat_error_magnitude, quat_from_matrix + +__all__ = [ + "NeuralPlanner", + "NeuralPlannerCfg", + "NeuralPlanOptions", +] + + +def _layer_init(layer: nn.Linear, std: float = np.sqrt(2), bias_const: float = 0.0): + torch.nn.init.orthogonal_(layer.weight, std) + torch.nn.init.constant_(layer.bias, bias_const) + return layer + + +class _RunningObsNormalizer: + def __init__(self, mean: torch.Tensor, var: torch.Tensor): + self.mean = mean + self.var = var + + def normalize(self, obs: torch.Tensor) -> torch.Tensor: + return (obs - self.mean) / (self.var.sqrt() + 1e-8) + + +class _WaypointTransformerActor(nn.Module): + """APG waypoint actor runtime copied in lightweight form for inference.""" + + def __init__( + self, + obs_dim: int, + action_dim: int, + num_waypoints: int, + use_relative_obs: bool = True, + hidden_dim: int = 256, + transformer_nhead: int = 4, + transformer_num_layers: int = 2, + transformer_ff_dim: int | None = None, + ): + super().__init__() + self.num_waypoints = int(num_waypoints) + self.use_relative_obs = bool(use_relative_obs) + if int(action_dim) != 7: + raise ValueError( + "Waypoint transformer checkpoints currently assume a 7-DoF arm. " + f"Got action_dim={action_dim}." + ) + self.state_dim = 7 + 7 + 7 + (7 if self.use_relative_obs else 0) + self.waypoint_token_dim = 3 + 4 + 1 + 1 + + expected_obs_dim = 7 + 7 + self.num_waypoints * 3 + expected_obs_dim += self.num_waypoints * 4 + self.num_waypoints * 2 + 7 + if self.use_relative_obs: + expected_obs_dim += 7 + if int(obs_dim) != expected_obs_dim: + raise ValueError( + "Waypoint transformer expected obs_dim " + f"{expected_obs_dim}, got {obs_dim}." + ) + + if hidden_dim % transformer_nhead != 0: + raise ValueError("hidden_dim must be divisible by transformer_nhead") + + ff_dim = transformer_ff_dim or hidden_dim * 4 + self.state_proj = _layer_init(nn.Linear(self.state_dim, hidden_dim)) + self.waypoint_proj = _layer_init(nn.Linear(self.waypoint_token_dim, hidden_dim)) + self.state_type_embedding = nn.Parameter(torch.zeros(1, 1, hidden_dim)) + self.waypoint_type_embedding = nn.Parameter(torch.zeros(1, 1, hidden_dim)) + self.waypoint_index_embedding = nn.Parameter( + torch.zeros(1, self.num_waypoints, hidden_dim) + ) + + encoder_layer = nn.TransformerEncoderLayer( + d_model=hidden_dim, + nhead=transformer_nhead, + dim_feedforward=ff_dim, + dropout=0.0, + activation="gelu", + batch_first=True, + norm_first=False, + ) + self.encoder = nn.TransformerEncoder( + encoder_layer, num_layers=transformer_num_layers + ) + self.actor_head = nn.Sequential( + nn.LayerNorm(hidden_dim), + _layer_init(nn.Linear(hidden_dim, hidden_dim)), + nn.Tanh(), + _layer_init(nn.Linear(hidden_dim, action_dim), std=0.01), + ) + + def _parse_obs(self, x: torch.Tensor): + n = self.num_waypoints + cursor = 0 + joint = x[:, cursor : cursor + 7] + cursor += 7 + eef_pose = x[:, cursor : cursor + 7] + cursor += 7 + waypoint_pos = x[:, cursor : cursor + 3 * n].reshape(-1, n, 3) + cursor += 3 * n + waypoint_quat = x[:, cursor : cursor + 4 * n].reshape(-1, n, 4) + cursor += 4 * n + active_onehot = x[:, cursor : cursor + n].reshape(-1, n, 1) + cursor += n + valid_mask = x[:, cursor : cursor + n].reshape(-1, n, 1) + cursor += n + last_action = x[:, cursor : cursor + 7] + cursor += 7 + + state_parts = [joint, eef_pose, last_action] + if self.use_relative_obs: + state_parts.append(x[:, cursor : cursor + 7]) + + state = torch.cat(state_parts, dim=-1) + waypoint_tokens = torch.cat( + [waypoint_pos, waypoint_quat, active_onehot, valid_mask], dim=-1 + ) + return state, waypoint_tokens, valid_mask.squeeze(-1) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + state, waypoint_tokens, valid_mask = self._parse_obs(x) + state_token = self.state_proj(state).unsqueeze(1) + self.state_type_embedding + waypoint_tokens = ( + self.waypoint_proj(waypoint_tokens) + + self.waypoint_type_embedding + + self.waypoint_index_embedding + ) + tokens = torch.cat([state_token, waypoint_tokens], dim=1) + state_padding = torch.zeros( + valid_mask.shape[0], 1, dtype=torch.bool, device=valid_mask.device + ) + waypoint_padding = valid_mask < 0.5 + padding_mask = torch.cat([state_padding, waypoint_padding], dim=1) + encoded = self.encoder(tokens, src_key_padding_mask=padding_mask) + return self.actor_head(encoded[:, 0]) + + +@configclass +class NeuralPlannerCfg(BasePlannerCfg): + planner_type: str = "neural" + + checkpoint_path: str = MISSING + """Path to an APG waypoint checkpoint (.pt), e.g. from ``download_neural_planner_checkpoint()``.""" + + control_part: str | None = None + """Robot control part used for FK and qpos, e.g. 'left_arm'.""" + + max_steps: int | None = None + """Maximum rollout steps. If None, uses checkpoint max_episode_steps.""" + + action_scale: float = 0.2 + """Delta joint scaling factor in radians.""" + + num_arm_joints: int = 7 + """Number of arm joints controlled by the APG policy.""" + + pos_eps: float | None = None + """Waypoint position threshold. If None, uses checkpoint waypoint_pos_threshold.""" + + rot_eps: float | None = None + """Waypoint rotation threshold. If None, uses checkpoint waypoint_rot_threshold.""" + + dt: float = 0.01 + """Nominal timestep reported in PlanResult.""" + + +@configclass +class NeuralPlanOptions(PlanOptions): + control_part: str | None = None + start_qpos: torch.Tensor | None = None + max_steps: int | None = None + + +class NeuralPlanner(BasePlanner): + def __init__(self, cfg: NeuralPlannerCfg): + super().__init__(cfg) + + if self.robot.num_instances > 1: + logger.log_error( + "NeuralPlanner currently supports one robot instance", + NotImplementedError, + ) + + self.cfg: NeuralPlannerCfg = cfg + if cfg.checkpoint_path is MISSING or not str(cfg.checkpoint_path): + logger.log_error("checkpoint_path is required", ValueError) + self._load_checkpoint(Path(cfg.checkpoint_path)) + + def default_plan_options(self) -> NeuralPlanOptions: + return NeuralPlanOptions() + + def _load_checkpoint(self, checkpoint_path: Path) -> None: + if not checkpoint_path.exists(): + logger.log_error( + f"Checkpoint not found: {checkpoint_path}", FileNotFoundError + ) + + ckpt = torch.load(checkpoint_path, map_location=self.device, weights_only=False) + if "agent" not in ckpt: + raise KeyError( + f"Checkpoint at '{checkpoint_path}' is missing 'agent'. " + f"Available keys: {list(ckpt.keys())}." + ) + if "obs_normalizer" not in ckpt: + raise KeyError( + f"Checkpoint at '{checkpoint_path}' is missing 'obs_normalizer'. " + f"Available keys: {list(ckpt.keys())}." + ) + for subkey in ("mean", "var"): + if subkey not in ckpt["obs_normalizer"]: + raise KeyError( + f"Checkpoint obs_normalizer is missing '{subkey}'. " + f"Available: {list(ckpt['obs_normalizer'].keys())}." + ) + + self._ckpt_args = ckpt.get("args", {}) + self._num_waypoints = int(self._ckpt_args.get("waypoint_max", 1)) + self._use_relative_obs = bool( + self._ckpt_args.get("waypoint_use_relative_obs", True) + ) + self._policy_arch = self._ckpt_args.get("policy_arch") + if self._policy_arch != "transformer": + raise ValueError( + "NeuralPlanner only supports transformer waypoint checkpoints. " + f"Got policy_arch={self._policy_arch!r}." + ) + self._hidden_dim = int(self._ckpt_args.get("hidden_dim", 256)) + self._action_dim = int(self.cfg.num_arm_joints) + self._obs_dim = int(ckpt["obs_normalizer"]["mean"].numel()) + self._max_steps = int( + self.cfg.max_steps or self._ckpt_args.get("max_episode_steps", 30) + ) + self._pos_eps = float( + self.cfg.pos_eps + if self.cfg.pos_eps is not None + else self._ckpt_args.get("waypoint_pos_threshold", 0.05) + ) + self._rot_eps = float( + self.cfg.rot_eps + if self.cfg.rot_eps is not None + else self._ckpt_args.get("waypoint_rot_threshold", 0.3) + ) + self._intermediate_orientation = bool( + self._ckpt_args.get("waypoint_intermediate_orientation", True) + ) + + self._normalizer = _RunningObsNormalizer( + ckpt["obs_normalizer"]["mean"].to(self.device), + ckpt["obs_normalizer"]["var"].to(self.device), + ) + self._actor = self._build_actor().to(self.device) + + state_dict = { + k.replace("actor_mean.", ""): v.to(self.device) + for k, v in ckpt["agent"].items() + if k.startswith("actor_mean.") + } + if not state_dict: + raise KeyError("Checkpoint agent has no actor_mean.* weights.") + self._actor.load_state_dict(state_dict) + self._actor.eval() + + def _build_actor(self) -> nn.Module: + return _WaypointTransformerActor( + obs_dim=self._obs_dim, + action_dim=self._action_dim, + num_waypoints=self._num_waypoints, + use_relative_obs=self._use_relative_obs, + hidden_dim=self._hidden_dim, + transformer_nhead=int(self._ckpt_args.get("transformer_nhead", 4)), + transformer_num_layers=int( + self._ckpt_args.get("transformer_num_layers", 2) + ), + transformer_ff_dim=( + int(self._ckpt_args.get("transformer_ff_dim", 0)) or None + ), + ) + + @validate_plan_options(options_cls=NeuralPlanOptions) + def plan( + self, + target_states: list[PlanState], + options: NeuralPlanOptions = NeuralPlanOptions(), + ) -> PlanResult: + if not target_states: + return PlanResult(success=False, positions=None) + + control_part = options.control_part or self.cfg.control_part + if control_part is None: + logger.log_error( + "control_part is required for NeuralPlanner", + ValueError, + ) + + waypoints_pos, waypoints_quat, valid_mask, episode_k = self._parse_waypoints( + target_states + ) + qpos = self._initial_qpos(control_part, options.start_qpos) + limits = self.robot.get_qpos_limits(name=control_part)[0].to(self.device) + lower = limits[: self._action_dim, 0] + upper = limits[: self._action_dim, 1] + + last_action = torch.zeros(1, self._action_dim, device=self.device) + active_idx = 0 + positions = [qpos.squeeze(0).clone()] + xpos_list = [self._fk_matrix(qpos, control_part).squeeze(0)] + max_steps = int(options.max_steps or self._max_steps) + + with torch.no_grad(): + for _ in range(max_steps): + ee_pose = self._fk_pose_xyzw(qpos, control_part) + obs = self._build_obs( + qpos[:, : self._action_dim], + ee_pose, + waypoints_pos, + waypoints_quat, + valid_mask, + active_idx, + last_action, + ) + action = self._actor(self._normalizer.normalize(obs)).clamp(-1.0, 1.0) + qpos[:, : self._action_dim] += action * float(self.cfg.action_scale) + qpos[:, : self._action_dim] = torch.clamp( + qpos[:, : self._action_dim], lower, upper + ) + last_action = action + + positions.append(qpos.squeeze(0).clone()) + xpos_list.append(self._fk_matrix(qpos, control_part).squeeze(0)) + + ee_pose = self._fk_pose_xyzw(qpos, control_part) + if self._is_active_reached( + ee_pose, waypoints_pos, waypoints_quat, active_idx, episode_k + ): + active_idx += 1 + if active_idx >= episode_k: + break + + positions_t = torch.stack(positions) + xpos_t = torch.stack(xpos_list) + success = active_idx >= episode_k + dt = torch.full( + (positions_t.shape[0],), + float(self.cfg.dt), + dtype=torch.float32, + device=self.device, + ) + return PlanResult( + success=success, + positions=positions_t, + xpos_list=xpos_t, + dt=dt, + duration=float(max(positions_t.shape[0] - 1, 0) * self.cfg.dt), + ) + + def _parse_waypoints( + self, target_states: list[PlanState] + ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, int]: + if len(target_states) > self._num_waypoints: + logger.log_error( + f"Received {len(target_states)} waypoints, but checkpoint supports " + f"at most {self._num_waypoints}.", + ValueError, + ) + + waypoint_pos = torch.zeros(1, self._num_waypoints, 3, device=self.device) + waypoint_quat = torch.zeros(1, self._num_waypoints, 4, device=self.device) + valid_mask = torch.zeros(1, self._num_waypoints, device=self.device) + + for idx, target in enumerate(target_states): + if target.move_type != MoveType.EEF_MOVE or target.xpos is None: + logger.log_error( + "NeuralPlanner expects EEF_MOVE PlanState entries with xpos.", + ValueError, + ) + xpos = torch.as_tensor(target.xpos, dtype=torch.float32, device=self.device) + waypoint_pos[0, idx] = xpos[:3, 3] + waypoint_quat[0, idx] = convert_quat( + quat_from_matrix(xpos[:3, :3].unsqueeze(0)), to="xyzw" + )[0] + valid_mask[0, idx] = 1.0 + + return waypoint_pos, waypoint_quat, valid_mask, len(target_states) + + def _initial_qpos( + self, control_part: str, start_qpos: torch.Tensor | None + ) -> torch.Tensor: + if start_qpos is None: + qpos = self.robot.get_qpos(name=control_part)[0] + else: + qpos = torch.as_tensor(start_qpos, dtype=torch.float32, device=self.device) + if qpos.dim() == 1: + qpos = qpos.unsqueeze(0) + if qpos.shape[-1] < self._action_dim: + logger.log_error( + f"start_qpos has {qpos.shape[-1]} joints, but policy expects " + f"{self._action_dim}.", + ValueError, + ) + return qpos.to(self.device).clone() + + def _fk_matrix(self, qpos: torch.Tensor, control_part: str) -> torch.Tensor: + return self.robot.compute_fk(qpos=qpos, name=control_part, to_matrix=True) + + def _fk_pose_xyzw(self, qpos: torch.Tensor, control_part: str) -> torch.Tensor: + fk = self.robot.compute_fk(qpos=qpos, name=control_part, to_matrix=False) + pos = fk[:, :3] + quat_xyzw = convert_quat(fk[:, 3:7], to="xyzw") + return torch.cat([pos, quat_xyzw], dim=-1) + + def _build_obs( + self, + joint_pos: torch.Tensor, + ee_pose: torch.Tensor, + waypoint_pos: torch.Tensor, + waypoint_quat: torch.Tensor, + valid_mask: torch.Tensor, + active_idx: int, + last_action: torch.Tensor, + ) -> torch.Tensor: + active_idx_clamped = min(active_idx, self._num_waypoints - 1) + active_onehot = torch.zeros(1, self._num_waypoints, device=self.device) + active_onehot[0, active_idx_clamped] = 1.0 + obs_parts = [ + joint_pos, + ee_pose, + waypoint_pos.reshape(1, self._num_waypoints * 3), + waypoint_quat.reshape(1, self._num_waypoints * 4), + active_onehot, + valid_mask, + last_action, + ] + if self._use_relative_obs: + active_pos = waypoint_pos[:, active_idx_clamped] + active_quat = waypoint_quat[:, active_idx_clamped] + obs_parts.append( + torch.cat([active_pos - ee_pose[:, :3], active_quat], dim=-1) + ) + obs = torch.cat(obs_parts, dim=-1) + if obs.shape[-1] != self._obs_dim: + raise ValueError( + f"Built obs dim {obs.shape[-1]}, expected {self._obs_dim}." + ) + return obs + + def _is_active_reached( + self, + ee_pose: torch.Tensor, + waypoint_pos: torch.Tensor, + waypoint_quat: torch.Tensor, + active_idx: int, + episode_k: int, + ) -> bool: + active_pos = waypoint_pos[:, active_idx] + active_quat_xyzw = waypoint_quat[:, active_idx] + pos_dist = (ee_pose[:, :3] - active_pos).norm(dim=-1) + ee_quat_wxyz = convert_quat(ee_pose[:, 3:7], to="wxyz") + active_quat_wxyz = convert_quat(active_quat_xyzw, to="wxyz") + rot_dist = quat_error_magnitude(ee_quat_wxyz, active_quat_wxyz) + orientation_required = ( + self._intermediate_orientation or active_idx >= episode_k - 1 + ) + rot_ok = ( + (rot_dist < self._rot_eps) + if orientation_required + else torch.ones_like(rot_dist, dtype=torch.bool) + ) + reached = (pos_dist < self._pos_eps) & rot_ok + return bool(reached.item()) diff --git a/examples/sim/planners/neural_planner.py b/examples/sim/planners/neural_planner.py new file mode 100644 index 00000000..2cf4b0bc --- /dev/null +++ b/examples/sim/planners/neural_planner.py @@ -0,0 +1,269 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- + +from __future__ import annotations + +import argparse +import math +import os +import time + +import numpy as np +import torch + +from embodichain.data import get_data_path +from embodichain.data.assets.planner_assets import download_neural_planner_checkpoint +from embodichain.lab.sim import SimulationManager, SimulationManagerCfg +from embodichain.lab.sim.cfg import MarkerCfg, RobotCfg +from embodichain.lab.sim.objects import Robot +from embodichain.lab.sim.planners import ( + MotionGenCfg, + MotionGenOptions, + MotionGenerator, + MoveType, + NeuralPlannerCfg, + PlanState, +) +from embodichain.lab.sim.planners.neural_planner import NeuralPlanOptions + + +def parse_args() -> argparse.Namespace: + default_device = "cuda" if torch.cuda.is_available() else "cpu" + parser = argparse.ArgumentParser(description="NeuralPlanner waypoint example") + parser.add_argument( + "--device", + type=str, + default=default_device, + choices=["cpu", "cuda"], + help="Simulation and planner device.", + ) + parser.add_argument( + "--num-waypoints", + type=int, + default=5, + help="Number of EEF waypoints to send to the neural planner.", + ) + parser.add_argument( + "--headless", + action="store_true", + help="Run without opening the viewer window.", + ) + parser.add_argument( + "--step-repeat", + type=int, + default=10, + help="Simulation updates per planned waypoint during playback.", + ) + parser.add_argument( + "--hold-steps", + type=int, + default=60, + help="Simulation updates to hold before and after playback.", + ) + parser.add_argument( + "--interactive", + action="store_true", + help="Drop into IPython after playback.", + ) + return parser.parse_args() + + +def _resolve_device(device: str) -> str: + if device == "cuda" and not torch.cuda.is_available(): + raise RuntimeError( + "CUDA was requested but is not available. Use --device cpu instead." + ) + return device + + +def _franka_tcp() -> list[list[float]]: + c = math.cos(-math.pi / 4) + s = math.sin(-math.pi / 4) + return [ + [c, -s, 0.0, 0.0], + [s, c, 0.0, 0.0], + [0.0, 0.0, 1.0, 0.1034], + [0.0, 0.0, 0.0, 1.0], + ] + + +def create_franka(sim: SimulationManager) -> Robot: + urdf = get_data_path("Franka/Panda/PandaWithHand.urdf") + assert os.path.isfile(urdf) + + cfg_dict = { + "fpath": urdf, + "control_parts": { + "main_arm": [ + "Joint1", + "Joint2", + "Joint3", + "Joint4", + "Joint5", + "Joint6", + "Joint7", + ], + }, + "solver_cfg": { + "main_arm": { + "class_type": "PytorchSolver", + "end_link_name": "ee_link", + "root_link_name": "base_link", + "tcp": _franka_tcp(), + }, + }, + } + return sim.add_robot(cfg=RobotCfg.from_dict(cfg_dict)) + + +def make_waypoints(start_pose: torch.Tensor, num_waypoints: int) -> torch.Tensor: + """Create a compact pose path around the start pose.""" + offsets = torch.tensor( + [ + [0.10, 0.00, 0.00], + [0.10, 0.10, 0.00], + [0.00, 0.10, -0.08], + [-0.10, 0.10, -0.08], + [-0.10, 0.00, 0.00], + [0.00, -0.10, 0.00], + [0.10, -0.10, -0.06], + [0.00, 0.00, -0.12], + ], + dtype=start_pose.dtype, + device=start_pose.device, + ) + num_waypoints = max(1, min(int(num_waypoints), offsets.shape[0])) + waypoints = start_pose.unsqueeze(0).repeat(num_waypoints, 1, 1) + waypoints[:, :3, 3] += offsets[:num_waypoints] + return waypoints + + +def draw_waypoint_markers( + sim: SimulationManager, + waypoints: torch.Tensor, + arena_offset: torch.Tensor, +) -> None: + marker_poses = waypoints.detach().cpu().numpy().copy() + marker_poses[:, :3, 3] += arena_offset.detach().cpu().numpy().reshape(1, 3) + sim.draw_marker( + cfg=MarkerCfg( + name="neural_planner_waypoints", + marker_type="axis", + axis_xpos=list(marker_poses), + axis_size=0.003, + axis_len=0.03, + arena_index=-1, + ) + ) + + +def play_trajectory( + sim: SimulationManager, + robot: Robot, + arm_name: str, + positions: torch.Tensor, + step_repeat: int = 4, + delay: float = 0.0, +) -> None: + joint_ids = robot.get_joint_ids(arm_name) + for qpos in positions: + robot.set_qpos(qpos=qpos.unsqueeze(0), joint_ids=joint_ids) + sim.update(step=step_repeat) + if delay > 0.0: + time.sleep(delay) + + +def main() -> None: + args = parse_args() + checkpoint_path = download_neural_planner_checkpoint() + + sim_device = _resolve_device(args.device) + sim = SimulationManager( + SimulationManagerCfg( + headless=args.headless, + sim_device=sim_device, + num_envs=1, + arena_space=2.0, + ) + ) + + robot = create_franka(sim) + arm_name = "main_arm" + device = robot.device + + if not args.headless: + sim.open_window() + + start_qpos = torch.tensor( + [0.0, -np.pi / 4, 0.0, -3 * np.pi / 4, 0.0, np.pi / 2, np.pi / 4], + dtype=torch.float32, + device=device, + ) + robot.set_qpos( + qpos=start_qpos.unsqueeze(0), joint_ids=robot.get_joint_ids(arm_name) + ) + sim.update(step=args.hold_steps) + + start_pose = robot.compute_fk( + qpos=start_qpos.unsqueeze(0), name=arm_name, to_matrix=True + )[0] + waypoints = make_waypoints(start_pose, args.num_waypoints) + draw_waypoint_markers(sim, waypoints, sim.arena_offsets[0]) + + motion_generator = MotionGenerator( + cfg=MotionGenCfg( + planner_cfg=NeuralPlannerCfg( + robot_uid=robot.uid, + checkpoint_path=checkpoint_path, + control_part=arm_name, + ) + ) + ) + target_states = [ + PlanState(move_type=MoveType.EEF_MOVE, xpos=waypoint) for waypoint in waypoints + ] + result = motion_generator.generate( + target_states=target_states, + options=MotionGenOptions( + plan_opts=NeuralPlanOptions( + control_part=arm_name, + start_qpos=start_qpos, + ), + ), + ) + + print(f"NeuralPlanner success: {result.success}") + print(f"positions shape: {tuple(result.positions.shape)}") + print(f"xpos_list shape: {tuple(result.xpos_list.shape)}") + print(f"duration: {result.duration:.3f}s") + + play_trajectory( + sim, + robot, + arm_name, + result.positions, + step_repeat=max(args.step_repeat, 1), + ) + sim.update(step=args.hold_steps) + + if args.interactive: + from IPython import embed + + embed(header="NeuralPlanner example. Press Ctrl+D to exit.") + + +if __name__ == "__main__": + main() diff --git a/tests/sim/planners/test_neural_planner.py b/tests/sim/planners/test_neural_planner.py new file mode 100644 index 00000000..878c54ff --- /dev/null +++ b/tests/sim/planners/test_neural_planner.py @@ -0,0 +1,211 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2021-2026 DexForce Technology Co., Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ---------------------------------------------------------------------------- +from __future__ import annotations + +import torch +import pytest + +from embodichain.lab.sim.planners import ( + MotionGenCfg, + MotionGenOptions, + MotionGenerator, + MoveType, + NeuralPlanner, + NeuralPlannerCfg, + PlanState, +) +from embodichain.lab.sim.planners.neural_planner import ( + NeuralPlanOptions, + _WaypointTransformerActor, +) +from embodichain.lab.sim.sim_manager import SimulationManager + +NUM_ARM_JOINTS = 7 +NUM_WAYPOINTS = 3 +OBS_DIM = 28 + 9 * NUM_WAYPOINTS +HIDDEN_DIM = 32 + + +def _create_fake_checkpoint(tmp_path) -> str: + actor = _WaypointTransformerActor( + obs_dim=OBS_DIM, + action_dim=NUM_ARM_JOINTS, + num_waypoints=NUM_WAYPOINTS, + use_relative_obs=True, + hidden_dim=HIDDEN_DIM, + transformer_nhead=4, + transformer_num_layers=1, + ) + checkpoint = { + "agent": {f"actor_mean.{k}": v for k, v in actor.state_dict().items()}, + "obs_normalizer": { + "mean": torch.zeros(OBS_DIM), + "var": torch.ones(OBS_DIM), + "count": 1.0, + }, + "args": { + "policy_arch": "transformer", + "hidden_dim": HIDDEN_DIM, + "transformer_nhead": 4, + "transformer_num_layers": 1, + "transformer_ff_dim": 0, + "waypoint_max": NUM_WAYPOINTS, + "waypoint_use_relative_obs": True, + "waypoint_intermediate_orientation": True, + "max_episode_steps": 3, + "waypoint_pos_threshold": 0.05, + "waypoint_rot_threshold": 0.3, + }, + } + checkpoint_path = tmp_path / "fake_neural_planner.pt" + torch.save(checkpoint, checkpoint_path) + return str(checkpoint_path) + + +class FakeRobot: + uid = "fake_robot" + device = torch.device("cpu") + num_instances = 1 + + def get_qpos(self, name: str | None = None, target: bool = False) -> torch.Tensor: + return torch.zeros(1, NUM_ARM_JOINTS) + + def get_qpos_limits(self, name: str | None = None) -> torch.Tensor: + limits = torch.zeros(1, NUM_ARM_JOINTS, 2) + limits[..., 0] = -2.0 + limits[..., 1] = 2.0 + return limits + + def compute_fk( + self, + qpos: torch.Tensor, + name: str | None = None, + to_matrix: bool = False, + **kwargs, + ) -> torch.Tensor: + batch = qpos.shape[0] if qpos.dim() > 1 else 1 + if to_matrix: + return torch.eye(4).repeat(batch, 1, 1) + return torch.tensor([[0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]]).repeat(batch, 1) + + +class FakeSimulationManager: + def __init__(self): + self.robot = FakeRobot() + + def get_robot(self, uid: str) -> FakeRobot: + return self.robot + + +def test_neural_planner_is_registered(): + assert MotionGenerator._support_planner_dict["neural"][0] is NeuralPlanner + assert MotionGenerator._support_planner_dict["neural"][1] is NeuralPlannerCfg + + +def test_neural_planner_generate_with_fake_checkpoint(tmp_path, monkeypatch): + checkpoint_path = _create_fake_checkpoint(tmp_path) + fake_sim = FakeSimulationManager() + monkeypatch.setattr( + SimulationManager, "get_instance", classmethod(lambda cls: fake_sim) + ) + + motion_generator = MotionGenerator( + cfg=MotionGenCfg( + planner_cfg=NeuralPlannerCfg( + robot_uid="fake_robot", + checkpoint_path=checkpoint_path, + control_part="main_arm", + ) + ) + ) + + target_state = PlanState(move_type=MoveType.EEF_MOVE, xpos=torch.eye(4)) + result = motion_generator.generate( + target_states=[target_state], + options=MotionGenOptions( + plan_opts=NeuralPlanOptions( + control_part="main_arm", + start_qpos=torch.zeros(NUM_ARM_JOINTS), + ), + ), + ) + + assert result.success is True + assert result.positions is not None + assert result.positions.shape[-1] == NUM_ARM_JOINTS + assert torch.isfinite(result.positions).all() + assert result.xpos_list is not None + assert result.xpos_list.shape[-2:] == (4, 4) + + +def test_neural_planner_uses_plan_opts_start_qpos(tmp_path, monkeypatch): + checkpoint_path = _create_fake_checkpoint(tmp_path) + fake_sim = FakeSimulationManager() + monkeypatch.setattr( + SimulationManager, "get_instance", classmethod(lambda cls: fake_sim) + ) + + motion_generator = MotionGenerator( + cfg=MotionGenCfg( + planner_cfg=NeuralPlannerCfg( + robot_uid="fake_robot", + checkpoint_path=checkpoint_path, + control_part="main_arm", + ) + ) + ) + custom_qpos = torch.ones(NUM_ARM_JOINTS) + result = motion_generator.generate( + target_states=[PlanState(move_type=MoveType.EEF_MOVE, xpos=torch.eye(4))], + options=MotionGenOptions( + plan_opts=NeuralPlanOptions( + control_part="main_arm", + start_qpos=custom_qpos, + ), + ), + ) + + assert result.success is True + assert torch.allclose(result.positions[0], custom_qpos) + + +def test_neural_planner_rejects_short_start_qpos(tmp_path, monkeypatch): + checkpoint_path = _create_fake_checkpoint(tmp_path) + fake_sim = FakeSimulationManager() + monkeypatch.setattr( + SimulationManager, "get_instance", classmethod(lambda cls: fake_sim) + ) + + motion_generator = MotionGenerator( + cfg=MotionGenCfg( + planner_cfg=NeuralPlannerCfg( + robot_uid="fake_robot", + checkpoint_path=checkpoint_path, + control_part="main_arm", + ) + ) + ) + + with pytest.raises(ValueError, match="policy expects"): + motion_generator.generate( + target_states=[PlanState(move_type=MoveType.EEF_MOVE, xpos=torch.eye(4))], + options=MotionGenOptions( + plan_opts=NeuralPlanOptions( + control_part="main_arm", + start_qpos=torch.zeros(NUM_ARM_JOINTS - 1), + ), + ), + )