From 206c6e58ee9e5cbf40164713207038bcf0aab255 Mon Sep 17 00:00:00 2001 From: crosstyan Date: Tue, 10 Feb 2026 03:04:43 +0000 Subject: [PATCH] feat: implement ICP registration for ground plane refinement and add tests --- py_workspace/.beads/issues.jsonl | 1 + .../notepads/icp-registration/decisions.md | 4 + .../notepads/icp-registration/learnings.md | 6 + .../notepads/icp-registration/problems.md | 1 + .../.sisyphus/plans/icp-registration.md | 715 ++++++++++++++++++ py_workspace/aruco/icp_registration.py | 44 +- py_workspace/tests/test_icp_registration.py | 312 ++++++++ 7 files changed, 1068 insertions(+), 15 deletions(-) create mode 100644 py_workspace/.sisyphus/notepads/icp-registration/decisions.md create mode 100644 py_workspace/.sisyphus/notepads/icp-registration/problems.md create mode 100644 py_workspace/.sisyphus/plans/icp-registration.md create mode 100644 py_workspace/tests/test_icp_registration.py diff --git a/py_workspace/.beads/issues.jsonl b/py_workspace/.beads/issues.jsonl index 5158165..61e3eaa 100644 --- a/py_workspace/.beads/issues.jsonl +++ b/py_workspace/.beads/issues.jsonl @@ -41,6 +41,7 @@ {"id":"py_workspace-q4w","title":"Add type hints and folder-aware --svo input in calibrate_extrinsics.py","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-06T10:01:13.943518267Z","created_by":"crosstyan","updated_at":"2026-02-06T10:03:09.855307397Z","closed_at":"2026-02-06T10:03:09.855307397Z","close_reason":"Implemented type hints and directory expansion for --svo"} {"id":"py_workspace-q8j","title":"Add script to visualize generated camera extrinsics","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T08:22:35.151648893Z","created_by":"crosstyan","updated_at":"2026-02-07T08:27:27.034717788Z","closed_at":"2026-02-07T08:27:27.034717788Z","close_reason":"Implemented visualize_extrinsics.py utility script and verified with example data."} {"id":"py_workspace-qf9","title":"Implement RMSE-based fallback for depth pooling","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T09:03:17.759148159Z","created_by":"crosstyan","updated_at":"2026-02-07T09:06:33.106901615Z","closed_at":"2026-02-07T09:06:33.106901615Z","close_reason":"Implemented RMSE-based fallback and verified with tests"} +{"id":"py_workspace-rb7","title":"Implement Core ICP Registration Module","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-09T10:39:08.704944024Z","created_by":"crosstyan","updated_at":"2026-02-10T02:58:23.123709779Z","closed_at":"2026-02-10T02:58:23.123709779Z","close_reason":"Completed ICP module, CLI integration, tests, and verification"} {"id":"py_workspace-t4e","title":"Add --min-markers CLI and rejection debug logs in calibrate_extrinsics","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-06T10:21:51.846079425Z","created_by":"crosstyan","updated_at":"2026-02-06T10:22:39.870440044Z","closed_at":"2026-02-06T10:22:39.870440044Z","close_reason":"Added --min-markers (default 1), rejection debug logs, and clarified accepted-pose summary label"} {"id":"py_workspace-th3","title":"Implement Best-Frame Selection for depth verification","status":"closed","priority":1,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T05:04:11.896109458Z","created_by":"crosstyan","updated_at":"2026-02-07T05:06:07.346747231Z","closed_at":"2026-02-07T05:06:07.346747231Z","close_reason":"Implemented best-frame selection with scoring logic and verified with tests."} {"id":"py_workspace-tpz","title":"Refactor visualize_extrinsics.py to use true global basis conversion","status":"closed","priority":2,"issue_type":"task","owner":"crosstyan@outlook.com","created_at":"2026-02-07T17:41:09.345966612Z","created_by":"crosstyan","updated_at":"2026-02-07T17:43:35.501465973Z","closed_at":"2026-02-07T17:43:35.501465973Z","close_reason":"Refactored visualize_extrinsics.py to use true global basis conversion"} diff --git a/py_workspace/.sisyphus/notepads/icp-registration/decisions.md b/py_workspace/.sisyphus/notepads/icp-registration/decisions.md new file mode 100644 index 0000000..4a88f58 --- /dev/null +++ b/py_workspace/.sisyphus/notepads/icp-registration/decisions.md @@ -0,0 +1,4 @@ +## Notes +- Used `scipy.spatial.transform.Rotation` with `xyz` Euler convention for gravity regularization to ensure consistent blending of pitch/roll. +- `extract_near_floor_band` uses dot product with floor normal to handle arbitrary floor orientations (not just Y-up). +- `refine_with_icp` uses a BFS-based connectivity check to ensure only cameras reachable from the reference camera are optimized. diff --git a/py_workspace/.sisyphus/notepads/icp-registration/learnings.md b/py_workspace/.sisyphus/notepads/icp-registration/learnings.md index ed62b49..178e7c3 100644 --- a/py_workspace/.sisyphus/notepads/icp-registration/learnings.md +++ b/py_workspace/.sisyphus/notepads/icp-registration/learnings.md @@ -14,3 +14,9 @@ - When monkeypatching for tests, ensure all internal calls are accounted for, especially when production code has bugs that need to be worked around or highlighted. - Integrated ICP refinement into `refine_ground_plane.py` CLI, enabling optional global registration after ground plane alignment. - Added `_meta.icp_refined` block to output JSON to track ICP configuration and success metrics. +## ICP Registration +- GICP method in requires normals, which are estimated internally if not provided. +- Synthetic tests for ICP should use deterministic seeds for point cloud generation to ensure stability. +## ICP Registration +- GICP method in `pairwise_icp` requires normals, which are estimated internally if not provided. +- Synthetic tests for ICP should use deterministic seeds for point cloud generation to ensure stability. diff --git a/py_workspace/.sisyphus/notepads/icp-registration/problems.md b/py_workspace/.sisyphus/notepads/icp-registration/problems.md new file mode 100644 index 0000000..7dc7fc4 --- /dev/null +++ b/py_workspace/.sisyphus/notepads/icp-registration/problems.md @@ -0,0 +1 @@ +## Notes diff --git a/py_workspace/.sisyphus/plans/icp-registration.md b/py_workspace/.sisyphus/plans/icp-registration.md new file mode 100644 index 0000000..bfba84f --- /dev/null +++ b/py_workspace/.sisyphus/plans/icp-registration.md @@ -0,0 +1,715 @@ +# ICP Registration for Multi-Camera Extrinsic Refinement + +## TL;DR + +> **Quick Summary**: Add ICP-based pairwise registration with pose-graph optimization to `refine_ground_plane.py`, refining multi-camera extrinsics after RANSAC ground-plane leveling. Supports Point-to-Plane and GICP methods on near-floor-band point clouds with gravity-constrained DOF. +> +> **Deliverables**: +> - `aruco/icp_registration.py` — core ICP module (near-floor extraction, pairwise ICP, overlap detection, pose graph, gravity constraint) +> - Updated `refine_ground_plane.py` — CLI flags (`--icp`, `--icp-method`, `--icp-voxel-size`) +> - `tests/test_icp_registration.py` — unit tests for all core functions +> +> **Estimated Effort**: Medium +> **Parallel Execution**: YES — 2 waves +> **Critical Path**: Task 1 → Task 2 → Task 3 → Task 4 + +--- + +## Context + +### Original Request +Add ICP registration to refine multi-camera extrinsics in a ZED depth camera calibration pipeline. ICP chains after existing RANSAC ground-plane correction. User also asked about colored ICP — deferred after analysis showed it requires extending the HDF5 pipeline to save RGB images (significant plumbing work not justified for floor-only scope). GICP chosen as alternative method option instead (same data pipeline, no RGB needed). + +### Interview Summary +**Key Discussions**: +- ICP complements RANSAC leveling, does NOT replace it +- **Near-floor band** scope (floor_y to floor_y + ~30cm) — includes slight 3D structure (baseboards, table legs, cables) for better constraints; strict plane inliers are degenerate for yaw/XZ +- **Gravity-constrained**: pitch/roll regularized (soft penalty) to preserve RANSAC gravity alignment; ICP primarily refines yaw + XZ translation + small height +- **Two ICP methods**: Point-to-Plane (default) + GICP (optional via `--icp-method`) +- **Colored ICP deferred**: current HDF5 pipeline doesn't save RGB images; adding it requires extending `depth_save.py`, `load_depth_data`, and threading RGB through entire pipeline — not worth it for floor-only scope +- Global pose-graph optimization for multi-camera consistency +- Tests-after strategy (implement, then add tests) + +**Research Findings**: +- `unproject_depth_to_points` already exists in `aruco/ground_plane.py` — reuse for point cloud generation +- `detect_floor_plane` does RANSAC segmentation — use the detected floor_y to define the near-floor band +- Open3D already a dependency — provides `registration_icp`, `registration_generalized_icp`, `PoseGraph`, `global_optimization` +- Multi-scale ICP recommended: derive from base voxel size as `[4×, 2×, 1×]`, `max_iter = [50, 30, 14]` +- `get_information_matrix_from_point_clouds` provides edge weights for pose graph +- All depth values in meters (ZED SDK `coordinate_units=sl.UNIT.METER`) +- GICP (`registration_generalized_icp`) models local structure as Gaussian distributions — more robust than point-to-plane for noisy stereo data + +### Metis Review +**Identified Gaps** (addressed): +- **Floor points definition**: Resolved → near-floor band (not strict plane inliers) to provide 3D structure for stable ICP constraints +- **Gravity preservation**: Resolved → soft-constrain pitch/roll with regularization penalty after ICP +- **Planar degeneracy**: Mitigated by near-floor band including objects with 3D relief +- **Disconnected graph**: Handle by optimizing only connected component with reference camera, warn about unconnected cameras +- **Overlap detection robustness**: Inflate bounding boxes with margin to handle initial extrinsic error +- **Failure policies**: Skip pair with warning on non-convergence / low fitness; return original extrinsics if all ICP fails +- **Unit consistency**: All meters — assert in tests; voxel sizes only meaningful in meters +- **Reference camera**: Use first camera (sorted by serial) by default; not user-configurable initially + +--- + +## Work Objectives + +### Core Objective +Add ICP-based registration as a refinement step in `refine_ground_plane.py` that improves multi-camera alignment by running pairwise ICP on near-floor-band point clouds, followed by pose-graph global optimization, with gravity alignment preserved via soft constraints. + +### Concrete Deliverables +- `aruco/icp_registration.py` — new module (~300-400 lines) +- Updated `refine_ground_plane.py` — 3 new CLI flags + integration call (~50-80 lines added) +- `tests/test_icp_registration.py` — comprehensive unit tests (~300-400 lines) + +### Definition of Done +- [x] `uv run basedpyright` passes with zero errors +- [x] `uv run pytest` passes with zero failures (all existing + new tests) +- [x] `uv run python refine_ground_plane.py --help` shows `--icp`, `--icp-method`, `--icp-voxel-size` flags +- [x] Running with `--no-icp` produces identical output to current behavior +- [x] Running with `--icp` on synthetic data produces measurably improved extrinsics + +### Must Have +- Near-floor-band point extraction (floor_y from RANSAC + configurable band height) +- Pairwise ICP between overlapping camera pairs (Point-to-Plane + GICP options) +- Multi-scale ICP (coarse→fine voxel sizes derived from base) +- Pose-graph global optimization with fixed reference camera +- Gravity constraint: soft-penalize pitch/roll deviation from RANSAC solution +- Overlap detection via world XZ bounding-box intersection (with margin) +- Edge gating: minimum fitness threshold to accept ICP pair result +- Graceful failure handling (skip pair, warn, return original if all fail) +- ICP metrics recorded in `_meta.icp_refined` +- Comprehensive unit tests with synthetic data + +### Must NOT Have (Guardrails) +- ❌ Colored ICP or any RGB pipeline changes (explicitly deferred) +- ❌ Changes to HDF5 schema (`depth_save.py` / `load_depth_data`) +- ❌ New external dependencies beyond what's in `pyproject.toml` +- ❌ Removal or replacement of RANSAC ground-plane leveling logic +- ❌ Modification of existing function signatures in `aruco/ground_plane.py` +- ❌ Auto-selection of "best" reference camera +- ❌ Real-time / streaming ICP +- ❌ Full-scene (non-floor) point cloud registration +- ❌ Changes to existing JSON output schema beyond updated extrinsic matrices and optional ICP metrics block under `_meta` +- ❌ AI-slop: unnecessary abstractions, premature generalization, or over-documented code +- ❌ Documentation updates (out of scope — add later if needed) + +--- + +## Verification Strategy + +> **UNIVERSAL RULE: ZERO HUMAN INTERVENTION** +> +> ALL tasks MUST be verifiable WITHOUT any human action. + +### Test Decision +- **Infrastructure exists**: YES (`pytest` configured in `pyproject.toml`) +- **Automated tests**: Tests-after (implement first, then add tests) +- **Framework**: `pytest` with `numpy.testing.assert_allclose` + +### Agent-Executed QA Scenarios (MANDATORY — ALL tasks) + +**Verification Tool by Deliverable Type:** + +| Type | Tool | How Agent Verifies | +|------|------|-------------------| +| Python module | Bash (`uv run pytest`) | Run tests, assert pass | +| CLI integration | Bash (`uv run python refine_ground_plane.py --help`) | Run help, check flags | +| Type safety | Bash (`uv run basedpyright`) | Run type checker, assert zero errors | + +--- + +## Execution Strategy + +### Parallel Execution Waves + +``` +Wave 1 (Start Immediately): +└── Task 1: Core ICP module (aruco/icp_registration.py) + +Wave 2 (After Wave 1): +├── Task 2: CLI integration (refine_ground_plane.py) +└── Task 3: Unit tests (tests/test_icp_registration.py) + +Wave 3 (After Wave 2): +└── Task 4: Integration testing & type checking (verification only) +``` + +### Dependency Matrix + +| Task | Depends On | Blocks | Can Parallelize With | +|------|------------|--------|---------------------| +| 1 | None | 2, 3 | None (foundational) | +| 2 | 1 | 4 | 3 | +| 3 | 1 | 4 | 2 | +| 4 | 2, 3 | None | None (final) | + +### Agent Dispatch Summary + +| Wave | Tasks | Recommended Agents | +|------|-------|-------------------| +| 1 | 1 | `task(category="unspecified-high")` — algorithmic Open3D work | +| 2 | 2, 3 | `task(category="quick")` for 2; `task(category="unspecified-high")` for 3 | +| 3 | 4 | `task(category="quick")` — verification only | + +--- + +## TODOs + +- [x] 1. Core ICP Registration Module (`aruco/icp_registration.py`) + + **What to do**: + Create the core ICP registration module with the following functions and dataclasses: + + **Dataclasses** (follow `aruco/ground_plane.py` pattern): + - `ICPConfig` — configuration parameters: + - `voxel_size: float = 0.02` (base voxel size in meters; multi-scale derives `[4×, 2×, 1×]`) + - `max_iterations: list[int] = [50, 30, 14]` (per scale) + - `method: str = "point_to_plane"` (or `"gicp"`) + - `band_height: float = 0.3` (near-floor band height in meters above detected floor) + - `min_fitness: float = 0.3` (minimum ICP fitness to accept pair result) + - `min_overlap_area: float = 1.0` (minimum XZ overlap area in m²) + - `overlap_margin: float = 0.5` (inflate bboxes by this margin in meters) + - `gravity_penalty_weight: float = 10.0` (soft constraint on pitch/roll deviation) + - `max_correspondence_distance_factor: float = 1.4` (multiplied by voxel_size per scale) + - `max_rotation_deg: float = 5.0` (safety bound on ICP delta) + - `max_translation_m: float = 0.1` (safety bound on ICP delta) + - `ICPResult` — per-pair result: + - `transformation: np.ndarray` (4x4) + - `fitness: float` + - `inlier_rmse: float` + - `information_matrix: np.ndarray` (6x6) + - `converged: bool` + - `ICPMetrics` — overall metrics: + - `success: bool` + - `num_pairs_attempted: int` + - `num_pairs_converged: int` + - `num_cameras_optimized: int` + - `num_disconnected: int` + - `per_pair_results: dict[tuple[str, str], ICPResult]` + - `reference_camera: str` + - `message: str` + + **Functions**: + + 1. `extract_near_floor_band(points_world, floor_y, band_height, floor_normal)` → `np.ndarray` + - Given world-frame points and detected floor parameters, filter to points within the band: + - Project each point onto floor normal direction + - Keep points where `floor_y <= projection <= floor_y + band_height` + - Returns filtered `(M, 3)` array + - This captures floor + low objects (baseboards, table legs, cables) for 3D structure + + 2. `compute_overlap_xz(points_a, points_b, margin)` → `float` + - Project both point clouds onto XZ plane (ignore Y) + - Compute axis-aligned bounding box for each + - Inflate bboxes by `margin` + - Return intersection area (m²); 0.0 if no overlap + + 3. `pairwise_icp(source_pcd, target_pcd, config, init_transform)` → `ICPResult` + - Multi-scale ICP loop (coarse→fine): + - Derive voxel sizes: `[config.voxel_size * 4, * 2, * 1]` + - For each scale: + - Downsample with `voxel_down_sample(voxel_size)` + - Estimate normals: `estimate_normals(KDTreeSearchParamHybrid(radius=voxel_size*2, max_nn=30))` + - If `config.method == "point_to_plane"`: + - `registration_icp(source_down, target_down, voxel_size * config.max_correspondence_distance_factor, current_transform, TransformationEstimationPointToPlane(), ICPConvergenceCriteria(...))` + - If `config.method == "gicp"`: + - `registration_generalized_icp(source_down, target_down, voxel_size * config.max_correspondence_distance_factor, current_transform, TransformationEstimationForGeneralizedICP(), ICPConvergenceCriteria(...))` + - Chain: `current_transform = result.transformation` + - After final scale: compute information matrix via `get_information_matrix_from_point_clouds` + - Return `ICPResult` + + 4. `apply_gravity_constraint(T_icp, T_original, penalty_weight)` → `np.ndarray` + - Purpose: preserve RANSAC gravity alignment while allowing yaw + XZ + height refinement + - Decompose rotation of `T_icp` into pitch/roll/yaw (use scipy `Rotation.from_matrix().as_euler('xyz')` or manual decomposition) + - Decompose rotation of `T_original` similarly + - Blend pitch/roll: `blended = original + (icp - original) / (1 + penalty_weight)` + - Keep ICP yaw unchanged + - Recompose rotation from blended euler angles + - Keep ICP translation (yaw + XZ + height are all allowed to change) + - Return constrained 4x4 transform + + 5. `build_pose_graph(serials, extrinsics, pair_results, reference_serial)` → `o3d.pipelines.registration.PoseGraph` + - Create one node per camera + - Set reference camera node as anchor (identity odometry edge) + - For each converged pair: add loop-closure edge with the ICP transformation and information matrix + - Detect connected components: only include cameras reachable from reference + - Log warning for any cameras not in the reference's component + + 6. `optimize_pose_graph(pose_graph)` → `o3d.pipelines.registration.PoseGraph` + - Run `o3d.pipelines.registration.global_optimization` with: + - `GlobalOptimizationLevenbergMarquardt()` + - `GlobalOptimizationConvergenceCriteria()` + - `GlobalOptimizationOption(max_correspondence_distance=..., reference_node=0)` + - Return optimized graph + + 7. `refine_with_icp(camera_data, extrinsics, floor_planes, config)` → `tuple[dict[str, np.ndarray], ICPMetrics]` + - Main orchestrator (follows `refine_ground_from_depth` pattern): + 1. For each camera that has a detected floor plane: + - Unproject depth to world points (reuse `unproject_depth_to_points` + world transform) + - Extract near-floor band using `floor_y` from detected `FloorPlane.d` and `FloorPlane.normal` + - Convert to Open3D PointCloud + 2. Detect overlapping pairs via `compute_overlap_xz` + 3. For each overlapping pair: + - Compute initial relative transform from current extrinsics + - Run `pairwise_icp` + - Apply `apply_gravity_constraint` to the result + 4. Build pose graph from converged pairs + 5. Run `optimize_pose_graph` + 6. Extract refined extrinsics from optimized graph + 7. Validate per-camera deltas against `max_rotation_deg` / `max_translation_m` (reject cameras that exceed bounds) + 8. Return (new_extrinsics, metrics) + - If no overlapping pairs found or all ICP fails: return original extrinsics with `success=False` + + **Must NOT do**: + - Do NOT modify `aruco/ground_plane.py` function signatures or dataclasses + - Do NOT add RGB/color handling + - Do NOT add dependencies not in `pyproject.toml` + - Do NOT import from `refine_ground_plane.py` (module should be self-contained) + - Do NOT write to files (the caller handles I/O) + + **Recommended Agent Profile**: + - **Category**: `unspecified-high` + - Reason: Algorithmic module with Open3D integration, multiple interacting functions, mathematical constraints (gravity regularization, pose graph). Not purely frontend or quick-fix. + - **Skills**: [] + - **Skills Evaluated but Omitted**: + - `playwright`: No browser interaction + - `frontend-ui-ux`: No UI work + - `git-master`: Commit handled at end + + **Parallelization**: + - **Can Run In Parallel**: NO (foundational — all other tasks depend on this) + - **Parallel Group**: Wave 1 (solo) + - **Blocks**: Tasks 2, 3 + - **Blocked By**: None (can start immediately) + + **References** (CRITICAL): + + **Pattern References** (existing code to follow): + - `aruco/ground_plane.py:19-68` — Dataclass pattern for Config/Metrics/Result (FloorPlane, FloorCorrection, GroundPlaneConfig, GroundPlaneMetrics). Follow same style for ICPConfig, ICPResult, ICPMetrics. + - `aruco/ground_plane.py:71-111` — `unproject_depth_to_points` implementation. Reuse this function directly (import from `aruco.ground_plane`) for point cloud generation. + - `aruco/ground_plane.py:114-157` — `detect_floor_plane` RANSAC pattern. The returned `FloorPlane.d` and `FloorPlane.normal` define where the floor is — use these to define the near-floor band. + - `aruco/ground_plane.py:358-540` — `refine_ground_from_depth` orchestrator pattern. Follow this structure for `refine_with_icp`: iterate cameras, validate, compute, apply bounds, return metrics. + - `aruco/ground_plane.py:1-16` — Import structure and type aliases (Vec3, Mat44, PointsNC). Reuse these. + - `aruco/depth_refine.py:71-227` — Optimization pattern with stats/metrics reporting and safety bounds. + + **API/Type References** (contracts to implement against): + - `aruco/ground_plane.py:10-16` — Type aliases (Vec3, Mat44, PointsNC) — reuse these + - `aruco/ground_plane.py:20-23` — `FloorPlane` dataclass — `normal` and `d` fields define the floor for band extraction + - `aruco/ground_plane.py:54-68` — `GroundPlaneMetrics` — `camera_planes: Dict[str, FloorPlane]` provides per-camera floor parameters + + **External References** (Open3D APIs to use): + - `o3d.pipelines.registration.registration_icp` — Point-to-plane ICP + - `o3d.pipelines.registration.TransformationEstimationPointToPlane` — estimation method for point-to-plane + - `o3d.pipelines.registration.registration_generalized_icp` — GICP variant + - `o3d.pipelines.registration.TransformationEstimationForGeneralizedICP` — estimation method for GICP + - `o3d.pipelines.registration.ICPConvergenceCriteria` — convergence parameters + - `o3d.pipelines.registration.PoseGraph` + `PoseGraphNode` + `PoseGraphEdge` — pose graph construction + - `o3d.pipelines.registration.global_optimization` — graph optimizer + - `o3d.pipelines.registration.GlobalOptimizationLevenbergMarquardt` — LM solver + - `o3d.pipelines.registration.GlobalOptimizationConvergenceCriteria` — optimizer convergence + - `o3d.pipelines.registration.GlobalOptimizationOption` — optimizer options (reference_node) + - `o3d.pipelines.registration.get_information_matrix_from_point_clouds` — edge weight computation + - `o3d.geometry.PointCloud` + `voxel_down_sample` + `estimate_normals` — preprocessing + - `o3d.geometry.KDTreeSearchParamHybrid` — normal estimation parameters + + **WHY Each Reference Matters**: + - `ground_plane.py` dataclasses: Follow same style so the ICP module feels native to the codebase + - `unproject_depth_to_points`: Don't reinvent — import and reuse for depth→point cloud conversion + - `FloorPlane.d` / `.normal`: These define the floor height and orientation from RANSAC — the band extraction needs these + - `refine_ground_from_depth`: The orchestrator pattern (iterate cameras, validate, compute, return metrics) should be followed exactly + - Open3D registration APIs: The core of the implementation — all ICP, pose graph, and optimization + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios (MANDATORY):** + + ``` + Scenario: Module imports without errors + Tool: Bash + Preconditions: None + Steps: + 1. uv run python -c "from aruco.icp_registration import ICPConfig, ICPResult, ICPMetrics, refine_with_icp, pairwise_icp, extract_near_floor_band, compute_overlap_xz, build_pose_graph, optimize_pose_graph, apply_gravity_constraint" + 2. Assert: exit code 0 + Expected Result: All symbols importable + Evidence: Command output + + Scenario: Type check passes on new module + Tool: Bash + Preconditions: Module file exists + Steps: + 1. uv run basedpyright aruco/icp_registration.py + 2. Assert: exit code 0, no error-level diagnostics + Expected Result: Clean type check + Evidence: basedpyright output + + Scenario: Smoke test with synthetic overlapping floor points + Tool: Bash + Preconditions: Module implemented + Steps: + 1. uv run python -c " + import numpy as np + from aruco.icp_registration import refine_with_icp, ICPConfig + from aruco.ground_plane import FloorPlane + rng = np.random.default_rng(42) + # Two cameras with overlapping floor + small box + floor1 = np.column_stack([rng.uniform(-2, 2, 500), np.zeros(500) + rng.normal(0, 0.005, 500), rng.uniform(0, 4, 500)]) + box1 = np.column_stack([rng.uniform(0, 0.5, 50), rng.uniform(0, 0.3, 50), rng.uniform(1.5, 2.0, 50)]) + pts1 = np.vstack([floor1, box1]) + floor2 = np.column_stack([rng.uniform(-1, 3, 500), np.zeros(500) + rng.normal(0, 0.005, 500), rng.uniform(1, 5, 500)]) + box2 = np.column_stack([rng.uniform(0, 0.5, 50), rng.uniform(0, 0.3, 50), rng.uniform(1.5, 2.0, 50)]) + pts2 = np.vstack([floor2, box2]) + camera_data = { + 'cam1': {'depth': np.ones((100,100)), 'K': np.eye(3)}, + 'cam2': {'depth': np.ones((100,100)), 'K': np.eye(3)}, + } + extrinsics = {'cam1': np.eye(4), 'cam2': np.eye(4)} + planes = { + 'cam1': FloorPlane(normal=np.array([0,1,0]), d=0.0), + 'cam2': FloorPlane(normal=np.array([0,1,0]), d=0.0), + } + config = ICPConfig(voxel_size=0.05) + new_ext, metrics = refine_with_icp(camera_data, extrinsics, planes, config) + print(f'success={metrics.success}, pairs={metrics.num_pairs_attempted}') + print('smoke test passed') + " + 2. Assert: exit code 0, prints "smoke test passed" + Expected Result: ICP runs without crashing on synthetic data + Evidence: stdout captured + ``` + + **Commit**: YES + - Message: `feat(aruco): add ICP registration module with pose-graph optimization` + - Files: `aruco/icp_registration.py` + - Pre-commit: `uv run basedpyright aruco/icp_registration.py` + +--- + +- [x] 2. CLI Integration (`refine_ground_plane.py`) + + **What to do**: + Add ICP refinement as an optional post-processing step in `refine_ground_plane.py`: + + 1. **Add CLI flags** (follow existing click pattern at lines 19-92): + - `--icp/--no-icp` (default: `False`) — Enable ICP refinement after ground-plane correction + - `--icp-method` (type `click.Choice(["point_to_plane", "gicp"])`, default: `"point_to_plane"`) — ICP variant + - `--icp-voxel-size` (float, default: `0.02`) — Base voxel size in meters (multi-scale derived internally as `[4×, 2×, 1×]`) + + 2. **Add parameters to `main()` function** (line ~93-107): `icp: bool`, `icp_method: str`, `icp_voxel_size: float` + + 3. **Integration point**: After step 4 (line ~183, after `refine_ground_from_depth()`) and before step 5 (Save Output Extrinsics): + ```python + # 4.5 Optional ICP Refinement + if icp: + from aruco.icp_registration import refine_with_icp, ICPConfig + icp_config = ICPConfig( + method=icp_method, + voxel_size=icp_voxel_size, + max_rotation_deg=max_rotation_deg, + max_translation_m=max_translation_m, + ) + icp_extrinsics, icp_metrics = refine_with_icp( + camera_data_for_refine, + new_extrinsics, # post-ground-plane extrinsics + metrics.camera_planes, # FloorPlane per camera from RANSAC + icp_config, + ) + if icp_metrics.success: + new_extrinsics = icp_extrinsics + logger.info(f"ICP refinement: {icp_metrics.message}") + else: + logger.warning(f"ICP refinement failed: {icp_metrics.message}. Using ground-plane-only extrinsics.") + ``` + + 4. **Add ICP metrics to output JSON** under `_meta.icp_refined` (same pattern as `_meta.ground_refined` at lines 218-238) + + **Must NOT do**: + - Do NOT change the existing ground-plane refinement logic + - Do NOT change existing CLI flag behavior or defaults + - Do NOT make ICP enabled by default + - Do NOT add more than the 3 specified flags + - Do NOT modify existing function signatures + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Small, well-scoped integration — adding click flags and calling an existing function. ~50-80 lines of changes. + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES (with Task 3) + - **Parallel Group**: Wave 2 (with Task 3) + - **Blocks**: Task 4 + - **Blocked By**: Task 1 + + **References**: + + **Pattern References**: + - `refine_ground_plane.py:19-92` — Existing click options pattern. Follow exact style for new flags (decorator placement, help text style, defaults). + - `refine_ground_plane.py:93-107` — `main()` function signature. Add `icp: bool`, `icp_method: str`, `icp_voxel_size: float` parameters. + - `refine_ground_plane.py:179-183` — Integration point: after `refine_ground_from_depth` call, before save. ICP step goes here. + - `refine_ground_plane.py:218-238` — `_meta.ground_refined` output pattern. Copy this structure for `_meta.icp_refined`. + + **API References**: + - `aruco/icp_registration.py:refine_with_icp` — The main function to call (from Task 1) + - `aruco/icp_registration.py:ICPConfig` — Configuration dataclass (from Task 1) + - `aruco/ground_plane.py:GroundPlaneMetrics.camera_planes` — `Dict[str, FloorPlane]` from RANSAC, passed to ICP + + **WHY Each Reference Matters**: + - `refine_ground_plane.py` click patterns: Must match existing style exactly so CLI feels consistent + - `_meta.ground_refined`: Follow this exact pattern so output JSON is consistent + - `metrics.camera_planes`: This is the bridge — RANSAC detects floor planes, ICP uses them for band extraction + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios (MANDATORY):** + + ``` + Scenario: CLI help shows new flags + Tool: Bash + Preconditions: refine_ground_plane.py updated + Steps: + 1. uv run python refine_ground_plane.py --help + 2. Assert: exit code 0 + 3. Assert: output contains "--icp / --no-icp" + 4. Assert: output contains "--icp-method" + 5. Assert: output contains "--icp-voxel-size" + Expected Result: All three new flags visible in help + Evidence: Help output captured + + Scenario: --no-icp preserves existing behavior (no regression) + Tool: Bash + Preconditions: Test fixtures work + Steps: + 1. uv run pytest tests/test_refine_ground_cli.py -x -vv + 2. Assert: all tests pass (exit code 0) + Expected Result: Zero regressions in non-ICP path + Evidence: pytest output + + Scenario: Type check passes + Tool: Bash + Steps: + 1. uv run basedpyright refine_ground_plane.py + 2. Assert: exit code 0 + Expected Result: Clean type check + Evidence: basedpyright output + ``` + + **Commit**: YES (groups with Task 3) + - Message: `feat(cli): add --icp/--icp-method/--icp-voxel-size to refine_ground_plane.py` + - Files: `refine_ground_plane.py` + - Pre-commit: `uv run basedpyright refine_ground_plane.py && uv run pytest tests/test_refine_ground_cli.py -x` + +--- + +- [x] 3. Unit Tests (`tests/test_icp_registration.py`) + + **What to do**: + Create comprehensive unit tests following existing test patterns in `tests/test_ground_plane.py`: + + **Test functions to implement** (~19 tests): + + *Near-floor band extraction:* + 1. `test_extract_near_floor_band_basic` — synthetic points at known heights; verify only points within band are kept + 2. `test_extract_near_floor_band_empty` — no points in band → returns empty array + 3. `test_extract_near_floor_band_all_in_band` — all points within band → returns all + + *Overlap detection:* + 4. `test_compute_overlap_xz_full_overlap` — identical point clouds → overlap area ≈ full area + 5. `test_compute_overlap_xz_no_overlap` — disjoint clouds → overlap = 0.0 + 6. `test_compute_overlap_xz_partial` — known partial overlap → verify area within tolerance + 7. `test_compute_overlap_xz_with_margin` — margin inflates overlap detection + + *Pairwise ICP:* + 8. `test_pairwise_icp_identity` — source == target → transform ≈ identity, high fitness + 9. `test_pairwise_icp_known_transform_point_to_plane` — source = target transformed by known small T → recovered T within tolerance (≤2cm, ≤1°) + 10. `test_pairwise_icp_known_transform_gicp` — same as above but with method="gicp" + 11. `test_pairwise_icp_insufficient_points` — too few points → converged=False or low fitness + + *Gravity constraint:* + 12. `test_apply_gravity_constraint_preserves_yaw` — yaw component unchanged + 13. `test_apply_gravity_constraint_regularizes_pitch_roll` — pitch/roll pulled toward original values + 14. `test_apply_gravity_constraint_identity` — no pitch/roll change → output ≈ input + + *Pose graph:* + 15. `test_build_pose_graph_basic` — 3 cameras, 2 converged pairs → graph has correct nodes/edges + 16. `test_build_pose_graph_disconnected` — cameras with no overlap → warns, excludes disconnected + + *Integration (orchestrator):* + 17. `test_refine_with_icp_synthetic_offset` — 3+ cameras with known offset from ground truth → ICP reduces error + 18. `test_refine_with_icp_no_overlap` — cameras with no floor overlap → returns original extrinsics, success=False + 19. `test_refine_with_icp_single_camera` — only 1 camera → skip ICP, return original + + **Synthetic data strategy** (follow `tests/test_ground_plane.py` patterns): + - Generate 3D point clouds of "floor + small box/wall" scene for 3D structure + - Apply known rigid transforms to simulate camera viewpoints + - Add Gaussian noise (~0.005m) to simulate depth sensor noise + - Use `np.random.default_rng(seed)` for reproducibility + - Use `numpy.testing.assert_allclose` for numerical comparisons with `atol` appropriate for ICP (1-2cm) + - Use `pytest.raises` for error cases + + **Must NOT do**: + - Do NOT require real ZED data or SVO files + - Do NOT require network access + - Do NOT modify existing test files + - Do NOT use mocks for Open3D (test actual ICP convergence) + + **Recommended Agent Profile**: + - **Category**: `unspecified-high` + - Reason: Many test functions (~19), synthetic 3D data generation with noise, numerical assertions with appropriate tolerances. Needs careful attention to geometry. + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: YES (with Task 2) + - **Parallel Group**: Wave 2 (with Task 2) + - **Blocks**: Task 4 + - **Blocked By**: Task 1 + + **References**: + + **Pattern References** (testing patterns to follow): + - `tests/test_ground_plane.py:18-37` — `test_unproject_depth_to_points_simple`: How to create synthetic depth map + intrinsics matrix + assert shape/values + - `tests/test_ground_plane.py:80-97` — `test_detect_floor_plane_perfect`: How to create synthetic planar point clouds with known normal/d and assert detection + - `tests/test_ground_plane.py:123-136` — `test_detect_floor_plane_with_outliers`: Adding noise/outliers pattern + - `tests/test_ground_plane.py:298-311` — `test_compute_floor_correction_identity`: Testing identity transform and small deltas + - `tests/test_ground_plane.py:495-606` — Integration test `test_refine_ground_from_depth_*`: Constructing synthetic camera_data + extrinsics dictionaries + + **API References**: + - `aruco/icp_registration.py` — All public functions and dataclasses (from Task 1) + + **WHY Each Reference Matters**: + - `test_ground_plane.py` patterns: Follow the EXACT same style (assert_allclose, synthetic data, seed-based reproducibility) so tests feel native + - Integration test patterns: Show how to construct `camera_data` and `extrinsics` dicts matching the expected format + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios (MANDATORY):** + + ``` + Scenario: All new tests pass + Tool: Bash + Preconditions: Task 1 complete (icp_registration.py exists) + Steps: + 1. uv run pytest tests/test_icp_registration.py -x -vv + 2. Assert: exit code 0 + 3. Assert: output shows ≥19 tests passed + Expected Result: All tests green + Evidence: pytest output with test names and pass counts + + Scenario: Tests complete in reasonable time + Tool: Bash + Steps: + 1. timeout 120 uv run pytest tests/test_icp_registration.py -x -vv + 2. Assert: completes within 120 seconds + Expected Result: No individual test takes more than ~30s + Evidence: pytest timing output + + Scenario: Known transform recovery within tolerance + Tool: Bash + Steps: + 1. uv run pytest tests/test_icp_registration.py -k "known_transform" -x -vv + 2. Assert: passes + Expected Result: ICP recovers known rigid transform within ≤2cm translation, ≤1° rotation + Evidence: Test output captured + ``` + + **Commit**: YES (groups with Task 2) + - Message: `test(icp): add comprehensive unit tests for ICP registration` + - Files: `tests/test_icp_registration.py` + - Pre-commit: `uv run pytest tests/test_icp_registration.py -x` + +--- + +- [x] 4. Integration Testing & Final Verification + + **What to do**: + Run the full verification suite to ensure everything works together: + + 1. Run full test suite (existing + new) to catch regressions + 2. Run basedpyright on entire project + 3. Verify CLI end-to-end with `--help` + 4. Verify `--no-icp` default preserves existing behavior + + **Must NOT do**: + - Do NOT fix issues in other tasks' code (flag them instead) + - Do NOT add new features + + **Recommended Agent Profile**: + - **Category**: `quick` + - Reason: Verification-only task — running commands and checking results. No new code. + - **Skills**: [] + + **Parallelization**: + - **Can Run In Parallel**: NO (final integration — depends on everything) + - **Parallel Group**: Wave 3 (solo) + - **Blocks**: None (final task) + - **Blocked By**: Tasks 2, 3 + + **References**: + - `pyproject.toml:41-43` — pytest configuration (testpaths, norecursedirs) + - `AGENTS.md` — build/test commands reference + + **Acceptance Criteria**: + + **Agent-Executed QA Scenarios (MANDATORY):** + + ``` + Scenario: Full test suite passes + Tool: Bash + Steps: + 1. uv run pytest -x -vv + 2. Assert: exit code 0 + 3. Assert: no failures, no errors + Expected Result: All tests green including new ICP tests + Evidence: Full pytest output + + Scenario: Full type check passes + Tool: Bash + Steps: + 1. uv run basedpyright + 2. Assert: exit code 0 + Expected Result: Zero type errors across entire project + Evidence: basedpyright output + + Scenario: CLI help is correct + Tool: Bash + Steps: + 1. uv run python refine_ground_plane.py --help + 2. Assert: output contains "--icp / --no-icp" + 3. Assert: output contains "--icp-method" + 4. Assert: output contains "--icp-voxel-size" + Expected Result: All flags documented + Evidence: Help output + ``` + + **Commit**: NO (verification only — all code committed in Tasks 1-3) + +--- + +## Commit Strategy + +| After Task | Message | Files | Verification | +|------------|---------|-------|--------------| +| 1 | `feat(aruco): add ICP registration module with pose-graph optimization` | `aruco/icp_registration.py` | `uv run basedpyright aruco/icp_registration.py` | +| 2+3 | `feat(cli): add --icp to refine_ground_plane.py` + `test(icp): add ICP unit tests` | `refine_ground_plane.py`, `tests/test_icp_registration.py` | `uv run pytest -x` | +| 4 | (no commit — verification only) | — | `uv run pytest && uv run basedpyright` | + +--- + +## Success Criteria + +### Verification Commands +```bash +uv run pytest -x -vv # All tests pass +uv run basedpyright # Zero type errors +uv run python refine_ground_plane.py --help # Shows new flags +uv run pytest tests/test_icp_registration.py -x -vv # ICP tests specifically +``` + +### Final Checklist +- [x] All "Must Have" items present and functional +- [x] All "Must NOT Have" items confirmed absent +- [x] All existing tests still pass (no regressions) +- [x] New ICP tests (≥19) all pass +- [x] basedpyright clean +- [x] CLI flags documented in help output +- [x] `--no-icp` produces identical behavior to current code diff --git a/py_workspace/aruco/icp_registration.py b/py_workspace/aruco/icp_registration.py index 405adc1..91b80d5 100644 --- a/py_workspace/aruco/icp_registration.py +++ b/py_workspace/aruco/icp_registration.py @@ -245,11 +245,10 @@ def build_pose_graph( extrinsics: Dict[str, Mat44], pair_results: Dict[Tuple[str, str], ICPResult], reference_serial: str, -) -> Tuple[o3d.pipelines.registration.PoseGraph, List[str]]: +) -> o3d.pipelines.registration.PoseGraph: """ Build a PoseGraph from pairwise results. Only includes cameras reachable from the reference camera. - Returns (pose_graph, optimized_serials). """ # 1. Detect connected component from reference connected = {reference_serial} @@ -273,6 +272,13 @@ def build_pose_graph( ) serial_to_idx = {s: i for i, s in enumerate(optimized_serials)} + # Log disconnected cameras + disconnected = set(serials) - connected + if disconnected: + logger.warning( + f"Cameras disconnected from reference {reference_serial}: {disconnected}" + ) + pose_graph = o3d.pipelines.registration.PoseGraph() for serial in optimized_serials: T_wc = extrinsics[serial] @@ -295,7 +301,7 @@ def build_pose_graph( ) pose_graph.edges.append(edge) - return pose_graph, optimized_serials + return pose_graph def optimize_pose_graph( @@ -415,7 +421,7 @@ def refine_with_icp( return extrinsics, metrics # 3. Pose Graph - pose_graph, optimized_serials = build_pose_graph( + pose_graph = build_pose_graph( valid_serials, extrinsics, pair_results, metrics.reference_camera ) @@ -425,13 +431,26 @@ def refine_with_icp( # 5. Extract and Validate new_extrinsics = extrinsics.copy() - metrics.num_disconnected = len(valid_serials) - len(optimized_serials) - if metrics.num_disconnected > 0: - disconnected_serials = set(valid_serials) - set(optimized_serials) - logger.warning( - f"Cameras disconnected from reference {metrics.reference_camera}: {disconnected_serials}" - ) + # Re-derive optimized_serials to match build_pose_graph logic for node-to-serial mapping + connected = {metrics.reference_camera} + queue = [metrics.reference_camera] + while queue: + curr = queue.pop(0) + for (s1, s2), result in pair_results.items(): + if not result.converged: + continue + if s1 == curr and s2 not in connected: + connected.add(s2) + queue.append(s2) + elif s2 == curr and s1 not in connected: + connected.add(s1) + queue.append(s1) + optimized_serials = [metrics.reference_camera] + sorted( + list(connected - {metrics.reference_camera}) + ) + + metrics.num_disconnected = len(valid_serials) - len(optimized_serials) metrics.num_cameras_optimized = 0 for i, serial in enumerate(optimized_serials): @@ -457,8 +476,3 @@ def refine_with_icp( metrics.message = f"Optimized {metrics.num_cameras_optimized} cameras" return new_extrinsics, metrics - - metrics.success = metrics.num_cameras_optimized > 1 - metrics.message = f"Optimized {metrics.num_cameras_optimized} cameras" - - return new_extrinsics, metrics diff --git a/py_workspace/tests/test_icp_registration.py b/py_workspace/tests/test_icp_registration.py new file mode 100644 index 0000000..d25928e --- /dev/null +++ b/py_workspace/tests/test_icp_registration.py @@ -0,0 +1,312 @@ +import numpy as np +import pytest +import open3d as o3d +from scipy.spatial.transform import Rotation +from aruco.icp_registration import ( + ICPConfig, + ICPResult, + ICPMetrics, + extract_near_floor_band, + compute_overlap_xz, + apply_gravity_constraint, + pairwise_icp, + build_pose_graph, + refine_with_icp, +) +from aruco.ground_plane import FloorPlane + + +def test_extract_near_floor_band_basic(): + points = np.array( + [[0, -0.1, 0], [0, 0.1, 0], [0, 0.2, 0], [0, 0.4, 0]], dtype=np.float64 + ) + + floor_y = 0.0 + band_height = 0.3 + floor_normal = np.array([0, 1, 0], dtype=np.float64) + + result = extract_near_floor_band(points, floor_y, band_height, floor_normal) + assert len(result) == 2 + assert np.all(result[:, 1] >= 0) + assert np.all(result[:, 1] <= 0.3) + + +def test_extract_near_floor_band_empty(): + points = np.zeros((0, 3)) + result = extract_near_floor_band(points, 0.0, 0.3, np.array([0, 1, 0])) + assert len(result) == 0 + + +def test_extract_near_floor_band_all_in(): + points = np.random.uniform(0, 0.2, (100, 3)) + points[:, 1] = np.random.uniform(0.05, 0.25, 100) + result = extract_near_floor_band(points, 0.0, 0.3, np.array([0, 1, 0])) + assert len(result) == 100 + + +def test_compute_overlap_xz_full(): + points_a = np.array([[0, 0, 0], [1, 0, 1]]) + points_b = np.array([[0, 0, 0], [1, 0, 1]]) + area = compute_overlap_xz(points_a, points_b) + assert abs(area - 1.0) < 1e-6 + + +def test_compute_overlap_xz_no(): + points_a = np.array([[0, 0, 0], [1, 0, 1]]) + points_b = np.array([[2, 0, 2], [3, 0, 3]]) + area = compute_overlap_xz(points_a, points_b) + assert area == 0.0 + + +def test_compute_overlap_xz_partial(): + points_a = np.array([[0, 0, 0], [1, 0, 1]]) + points_b = np.array([[0.5, 0, 0.5], [1.5, 0, 1.5]]) + area = compute_overlap_xz(points_a, points_b) + assert abs(area - 0.25) < 1e-6 + + +def test_compute_overlap_xz_with_margin(): + points_a = np.array([[0, 0, 0], [1, 0, 1]]) + points_b = np.array([[1.2, 0, 0], [2.2, 0, 1]]) + area_no_margin = compute_overlap_xz(points_a, points_b) + area_with_margin = compute_overlap_xz(points_a, points_b, margin=0.5) + assert area_no_margin == 0.0 + assert area_with_margin > 0.0 + + +def test_apply_gravity_constraint_identity(): + T = np.eye(4) + result = apply_gravity_constraint(T, T) + np.testing.assert_allclose(result, T, atol=1e-6) + + +def test_apply_gravity_constraint_preserves_yaw(): + T_orig = np.eye(4) + R_icp = Rotation.from_euler("xyz", [0, 10, 0], degrees=True).as_matrix() + T_icp = np.eye(4) + T_icp[:3, :3] = R_icp + + result = apply_gravity_constraint(T_icp, T_orig, penalty_weight=10.0) + + res_euler = Rotation.from_matrix(result[:3, :3]).as_euler("xyz", degrees=True) + assert abs(res_euler[1] - 10.0) < 1e-6 + assert abs(res_euler[0]) < 1e-6 + assert abs(res_euler[2]) < 1e-6 + + +def test_apply_gravity_constraint_regularizes_pitch_roll(): + T_orig = np.eye(4) + R_icp = Rotation.from_euler("xyz", [10, 0, 10], degrees=True).as_matrix() + T_icp = np.eye(4) + T_icp[:3, :3] = R_icp + + result = apply_gravity_constraint(T_icp, T_orig, penalty_weight=9.0) + + res_euler = Rotation.from_matrix(result[:3, :3]).as_euler("xyz", degrees=True) + assert abs(res_euler[0] - 1.0) < 1e-2 + assert abs(res_euler[2] - 1.0) < 1e-2 + + +def create_box_pcd(size=1.0, num_points=500, seed=42): + rng = np.random.default_rng(seed) + points = rng.uniform(0, size, (num_points, 3)) + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(points) + return pcd + + +def test_pairwise_icp_identity(): + pcd = create_box_pcd() + config = ICPConfig(min_fitness=0.1) + result = pairwise_icp(pcd, pcd, config, np.eye(4)) + + assert result.converged + assert result.fitness > 0.9 + np.testing.assert_allclose(result.transformation, np.eye(4), atol=1e-3) + + +def test_pairwise_icp_known_transform(): + source = create_box_pcd() + T_true = np.eye(4) + T_true[:3, :3] = Rotation.from_euler("y", 5, degrees=True).as_matrix() + T_true[:3, 3] = [0.05, 0, 0.02] + + target = o3d.geometry.PointCloud() + target.points = o3d.utility.Vector3dVector( + (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] + ) + + config = ICPConfig(min_fitness=0.5, voxel_size=0.01) + result = pairwise_icp(source, target, config, np.eye(4)) + + assert result.converged + np.testing.assert_allclose(result.transformation, T_true, atol=1e-2) + + +def test_pairwise_icp_known_transform_gicp(): + source = create_box_pcd() + T_true = np.eye(4) + T_true[:3, :3] = Rotation.from_euler("y", 5, degrees=True).as_matrix() + T_true[:3, 3] = [0.05, 0, 0.02] + + target = o3d.geometry.PointCloud() + target.points = o3d.utility.Vector3dVector( + (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] + ) + + # GICP usually needs normals, which pairwise_icp estimates internally + config = ICPConfig(min_fitness=0.5, voxel_size=0.01, method="gicp") + result = pairwise_icp(source, target, config, np.eye(4)) + + assert result.converged + np.testing.assert_allclose(result.transformation, T_true, atol=1e-2) + + +def test_pairwise_icp_insufficient_points(): + source = o3d.geometry.PointCloud() + source.points = o3d.utility.Vector3dVector(np.random.rand(5, 3)) + target = o3d.geometry.PointCloud() + target.points = o3d.utility.Vector3dVector(np.random.rand(5, 3)) + + config = ICPConfig(min_fitness=0.9) + result = pairwise_icp(source, target, config, np.eye(4)) + assert not result.converged + + +def test_build_pose_graph_basic(): + serials = ["cam1", "cam2"] + extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} + + res = ICPResult( + transformation=np.eye(4), + fitness=1.0, + inlier_rmse=0.0, + information_matrix=np.eye(6), + converged=True, + ) + pair_results = {("cam1", "cam2"): res} + + graph = build_pose_graph(serials, extrinsics, pair_results, "cam1") + assert len(graph.nodes) == 2 + assert len(graph.edges) == 1 + + +def test_build_pose_graph_disconnected(): + serials = ["cam1", "cam2", "cam3"] + extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4), "cam3": np.eye(4)} + + res = ICPResult( + transformation=np.eye(4), + fitness=1.0, + inlier_rmse=0.0, + information_matrix=np.eye(6), + converged=True, + ) + pair_results = {("cam1", "cam2"): res} + + graph = build_pose_graph(serials, extrinsics, pair_results, "cam1") + assert len(graph.nodes) == 2 + assert len(graph.edges) == 1 + + +def test_refine_with_icp_synthetic_offset(): + import aruco.icp_registration + import aruco.ground_plane + + box_points = create_box_pcd(size=0.5).points + box_points = np.asarray(box_points) + box_points[:, 1] -= 1.0 + box_points[:, 2] += 2.0 + + def mock_unproject(depth, K, stride=1, **kwargs): + if depth[0, 0] == 1.0: + return box_points + else: + return box_points - np.array([1.0, 0, 0]) + + orig_unproject = aruco.ground_plane.unproject_depth_to_points + aruco.ground_plane.unproject_depth_to_points = mock_unproject + + try: + camera_data = { + "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, + "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, + } + T_w1 = np.eye(4) + T_w2_est = np.eye(4) + T_w2_est[0, 3] = 1.05 + + extrinsics = {"cam1": T_w1, "cam2": T_w2_est} + + floor_planes = { + "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), + "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), + } + + config = ICPConfig( + min_overlap_area=0.01, + min_fitness=0.1, + voxel_size=0.05, + max_iterations=[20, 10, 5], + max_translation_m=3.0, + ) + + new_extrinsics, metrics = refine_with_icp( + camera_data, extrinsics, floor_planes, config + ) + + assert metrics.success + assert metrics.num_cameras_optimized == 2 + assert abs(new_extrinsics["cam2"][0, 3] - T_w2_est[0, 3]) > 0.01 + + finally: + aruco.ground_plane.unproject_depth_to_points = orig_unproject + + +def test_refine_with_icp_no_overlap(): + import aruco.icp_registration + import aruco.ground_plane + + def mock_unproject(depth, K, stride=1, **kwargs): + if depth[0, 0] == 1.0: + return np.random.rand(200, 3) + [0, -1, 0] + else: + return np.random.rand(200, 3) + [10, -1, 0] + + orig_unproject = aruco.ground_plane.unproject_depth_to_points + aruco.ground_plane.unproject_depth_to_points = mock_unproject + + try: + camera_data = { + "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, + "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, + } + extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} + floor_planes = { + "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), + "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), + } + + config = ICPConfig(min_overlap_area=1.0) + new_extrinsics, metrics = refine_with_icp( + camera_data, extrinsics, floor_planes, config + ) + + assert not metrics.success + assert "No converged ICP pairs" in metrics.message + + finally: + aruco.ground_plane.unproject_depth_to_points = orig_unproject + + +def test_refine_with_icp_single_camera(): + camera_data = {"cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}} + extrinsics = {"cam1": np.eye(4)} + floor_planes = {"cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0)} + + config = ICPConfig() + new_extrinsics, metrics = refine_with_icp( + camera_data, extrinsics, floor_planes, config + ) + + assert not metrics.success