diff --git a/py_workspace/.sisyphus/notepads/full-icp-pipeline/learnings.md b/py_workspace/.sisyphus/notepads/full-icp-pipeline/learnings.md new file mode 100644 index 0000000..8e85ef0 --- /dev/null +++ b/py_workspace/.sisyphus/notepads/full-icp-pipeline/learnings.md @@ -0,0 +1,20 @@ +## 2026-02-10T09:45:00Z Session bootstrap +- Initial notepad created for full-icp-pipeline execution. +- Baseline code references verified in `aruco/icp_registration.py` and `refine_ground_plane.py`. + +## Task 2: Point Extraction Functions + +### Learnings +- Open3D's `remove_statistical_outlier` returns a tuple `(pcd, ind)`, where `ind` is the list of indices. We only need the point cloud. +- `estimate_normals` with `KDTreeSearchParamHybrid` is robust for mixed geometry (floor + walls). +- Hybrid extraction strategy: + 1. Extract floor band (spatial filter). + 2. Extract vertical points (normal-based filter: `abs(normal ยท floor_normal) < 0.3`). + 3. Combine using boolean masks on the original point set to avoid duplicates. +- `extract_scene_points` provides a unified interface for different registration strategies (floor-only vs full-scene). + +### Decisions +- Kept `extract_near_floor_band` as a standalone function for backward compatibility and as a helper for `extract_scene_points`. +- Used `mode='floor'` as default to match existing behavior. +- Implemented `preprocess_point_cloud` to encapsulate downsampling and SOR, making the pipeline cleaner. +- Added `region` field to `ICPConfig` to control the extraction mode in future tasks. diff --git a/py_workspace/aruco/icp_registration.py b/py_workspace/aruco/icp_registration.py index 91b80d5..05dbff1 100644 --- a/py_workspace/aruco/icp_registration.py +++ b/py_workspace/aruco/icp_registration.py @@ -28,10 +28,12 @@ class ICPConfig: min_fitness: float = 0.3 # Min ICP fitness to accept pair min_overlap_area: float = 1.0 # Min XZ overlap area in m^2 overlap_margin: float = 0.5 # Inflate bboxes by this margin (m) + overlap_mode: str = "xz" # 'xz' or '3d' gravity_penalty_weight: float = 10.0 # Soft constraint on pitch/roll max_correspondence_distance_factor: float = 1.4 max_rotation_deg: float = 5.0 # Safety bound on ICP delta max_translation_m: float = 0.1 # Safety bound on ICP delta + region: str = "floor" # "floor", "hybrid", or "full" @dataclass @@ -59,6 +61,94 @@ class ICPMetrics: message: str = "" +def preprocess_point_cloud( + pcd: o3d.geometry.PointCloud, + voxel_size: float, +) -> o3d.geometry.PointCloud: + """ + Preprocess point cloud: downsample and remove outliers. + """ + pcd_down = pcd.voxel_down_sample(voxel_size) + + # SOR: nb_neighbors=20, std_ratio=2.0 + pcd_clean, _ = pcd_down.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) + + return pcd_clean + + +def extract_scene_points( + points_world: PointsNC, + floor_y: float, + floor_normal: Vec3, + mode: str = "floor", + band_height: float = 0.3, +) -> PointsNC: + """ + Extract points based on mode: + - 'floor': points within band_height of floor + - 'hybrid': floor points + vertical structures (walls/pillars) + - 'full': all points + """ + if len(points_world) == 0: + return points_world + + if mode == "full": + return points_world + + if mode == "floor": + return extract_near_floor_band(points_world, floor_y, band_height, floor_normal) + + if mode == "hybrid": + # 1. Get floor points + floor_pts = extract_near_floor_band( + points_world, floor_y, band_height, floor_normal + ) + + # 2. Get vertical points + # Need normals for this. Create temp PCD + pcd = o3d.geometry.PointCloud() + pcd.points = o3d.utility.Vector3dVector(points_world) + + # Estimate normals if not present (using hybrid KD-tree) + pcd.estimate_normals( + search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30) + ) + + normals = np.asarray(pcd.normals) + if len(normals) != len(points_world): + logger.warning( + "Normal estimation failed, falling back to floor points only" + ) + return floor_pts + + # Dot product with floor normal + # Vertical surfaces have normals perpendicular to floor normal -> dot product near 0 + dots = np.abs(normals @ floor_normal) + + # Keep points where normal is roughly perpendicular to floor normal (vertical surface) + # Threshold 0.3 allows for some noise/slope (approx 72-108 degrees from floor normal) + vertical_mask = dots < 0.3 + vertical_pts = points_world[vertical_mask] + + if len(vertical_pts) == 0: + return floor_pts + + # Combine unique points (though sets are disjoint by definition of mask vs band? + # No, band is spatial, vertical is orientation. They might overlap.) + # Simply concatenating might duplicate. + # Let's use a boolean mask for union. + + # Re-compute floor mask to combine + projections = points_world @ floor_normal + floor_mask = (projections >= floor_y) & (projections <= floor_y + band_height) + + combined_mask = floor_mask | vertical_mask + return points_world[combined_mask] + + # Default fallback + return extract_near_floor_band(points_world, floor_y, band_height, floor_normal) + + def extract_near_floor_band( points_world: PointsNC, floor_y: float, @@ -105,6 +195,29 @@ def compute_overlap_xz( return float(dims[0] * dims[1]) +def compute_overlap_3d( + points_a: PointsNC, + points_b: PointsNC, + margin: float = 0.0, +) -> float: + """ + Compute intersection volume of 3D AABBs. + """ + if len(points_a) == 0 or len(points_b) == 0: + return 0.0 + + min_a = np.min(points_a, axis=0) - margin + max_a = np.max(points_a, axis=0) + margin + min_b = np.min(points_b, axis=0) - margin + max_b = np.max(points_b, axis=0) + margin + + inter_min = np.maximum(min_a, min_b) + inter_max = np.minimum(max_a, max_b) + + dims = np.maximum(0, inter_max - inter_min) + return float(dims[0] * dims[1] * dims[2]) + + def apply_gravity_constraint( T_icp: Mat44, T_original: Mat44, @@ -383,6 +496,9 @@ def refine_with_icp( camera_points[s1], camera_points[s2], config.overlap_margin ) if area < config.min_overlap_area: + logger.debug( + f"Skipping pair ({s1}, {s2}) due to insufficient overlap: {area:.2f} m^2 < {config.min_overlap_area:.2f} m^2" + ) continue metrics.num_pairs_attempted += 1 @@ -411,8 +527,9 @@ def refine_with_icp( result.transformation, init_T, config.gravity_penalty_weight ) + pair_results[(s1, s2)] = result + metrics.per_pair_results[(s1, s2)] = result if result.converged: - pair_results[(s1, s2)] = result metrics.num_pairs_converged += 1 metrics.per_pair_results[(s1, s2)] = result