@dataclass
class GroundPlaneConfig:
    """Tunable parameters for depth-based ground plane refinement."""

    enabled: bool = True
    target_y: float = 0.0           # world-space Y the floor should sit at after correction
    stride: int = 8                 # depth-map subsampling step when unprojecting
    depth_min: float = 0.2          # metres; readings outside [depth_min, depth_max] are rejected
    depth_max: float = 5.0
    ransac_dist_thresh: float = 0.02
    ransac_n: int = 3
    ransac_iters: int = 1000
    max_rotation_deg: float = 5.0   # reject corrections rotating the world more than this
    max_translation_m: float = 0.1  # reject corrections translating the world more than this
    min_inliers: int = 500          # per-camera: minimum plane inliers to trust a detection
    min_valid_cameras: int = 2      # minimum cameras with a valid plane before correcting


@dataclass
class GroundPlaneMetrics:
    """Diagnostics produced by refine_ground_from_depth."""

    success: bool = False
    correction_applied: bool = False
    num_cameras_total: int = 0
    num_cameras_valid: int = 0
    correction_transform: Mat44 = field(default_factory=lambda: np.eye(4))
    rotation_deg: float = 0.0       # magnitude of the applied world rotation
    translation_m: float = 0.0      # magnitude of the applied world translation
    camera_planes: Dict[str, FloorPlane] = field(default_factory=dict)
    consensus_plane: Optional[FloorPlane] = None
    message: str = ""


def refine_ground_from_depth(
    camera_data: Dict[str, Dict[str, Any]],
    extrinsics: Dict[str, Mat44],
    config: Optional[GroundPlaneConfig] = None,
) -> Tuple[Dict[str, Mat44], GroundPlaneMetrics]:
    """
    Orchestrate ground plane refinement across multiple cameras.

    Detects a floor plane in each camera's depth map, fuses the per-camera
    detections into a consensus plane, computes a bounded rigid world-frame
    correction that moves the consensus floor to ``config.target_y``, and
    applies that correction to every camera extrinsic.

    Args:
        camera_data: Dict mapping serial -> {'depth': np.ndarray, 'K': np.ndarray}.
        extrinsics: Dict mapping serial -> world_from_cam matrix (4x4).
        config: Configuration parameters; ``None`` means defaults. (A fresh
            instance is built per call — a ``GroundPlaneConfig()`` default
            argument would be a single shared mutable object.)

    Returns:
        Tuple of (new_extrinsics, metrics). On any failure the ORIGINAL
        extrinsics dict is returned unchanged and ``metrics.message``
        explains why.
    """
    if config is None:
        config = GroundPlaneConfig()

    metrics = GroundPlaneMetrics()
    metrics.num_cameras_total = len(camera_data)

    if not config.enabled:
        metrics.message = "Ground plane refinement disabled in config"
        return extrinsics, metrics

    valid_planes: List[FloorPlane] = []

    # 1. Detect a candidate floor plane independently in each camera.
    for serial, data in camera_data.items():
        if serial not in extrinsics:
            continue

        depth_map = data.get("depth")
        K = data.get("K")
        if depth_map is None or K is None:
            continue

        # Unproject the (subsampled) depth map into the camera frame.
        points_cam = unproject_depth_to_points(
            depth_map,
            K,
            stride=config.stride,
            depth_min=config.depth_min,
            depth_max=config.depth_max,
        )
        if len(points_cam) < config.min_inliers:
            # Too few valid samples to possibly satisfy min_inliers.
            continue

        # Bring points into the world frame: p_w = R @ p_c + t,
        # vectorised over the (N, 3) array as points_cam @ R.T + t.
        T_world_cam = extrinsics[serial]
        R = T_world_cam[:3, :3]
        t = T_world_cam[:3, 3]
        points_world = points_cam @ R.T + t

        plane = detect_floor_plane(
            points_world,
            distance_threshold=config.ransac_dist_thresh,
            ransac_n=config.ransac_n,
            num_iterations=config.ransac_iters,
        )
        if plane is not None and plane.num_inliers >= config.min_inliers:
            metrics.camera_planes[serial] = plane
            valid_planes.append(plane)

    metrics.num_cameras_valid = len(valid_planes)

    # 2. Require agreement from enough cameras before touching extrinsics.
    if len(valid_planes) < config.min_valid_cameras:
        metrics.message = (
            f"Found {len(valid_planes)} valid planes, "
            f"required {config.min_valid_cameras}"
        )
        return extrinsics, metrics

    # 3. Fuse the per-camera planes into a single consensus plane.
    try:
        consensus = compute_consensus_plane(valid_planes)
        metrics.consensus_plane = consensus
    except ValueError as e:
        metrics.message = f"Consensus computation failed: {e}"
        return extrinsics, metrics

    # 4. Rigid correction that moves the consensus floor to target_y,
    #    bounded by the configured rotation/translation limits.
    correction = compute_floor_correction(
        consensus,
        target_floor_y=config.target_y,
        max_rotation_deg=config.max_rotation_deg,
        max_translation_m=config.max_translation_m,
    )
    metrics.correction_transform = correction.transform

    if not correction.valid:
        metrics.message = f"Correction invalid: {correction.reason}"
        return extrinsics, metrics

    # 5. Apply the correction. T_corr moves the world frame:
    #    P' = T_corr @ P  and  P = T_world_cam @ P_cam
    #    => T'_world_cam = T_corr @ T_world_cam.
    T_corr = correction.transform
    new_extrinsics = {serial: T_corr @ T_old for serial, T_old in extrinsics.items()}

    # Summarise the correction magnitude: rotation angle via the trace
    # identity trace(R) = 1 + 2*cos(theta), translation as the offset norm.
    cos_angle = np.clip((np.trace(T_corr[:3, :3]) - 1.0) / 2.0, -1.0, 1.0)
    metrics.rotation_deg = float(np.rad2deg(np.arccos(cos_angle)))
    metrics.translation_m = float(np.linalg.norm(T_corr[:3, 3]))

    metrics.success = True
    metrics.correction_applied = True
    metrics.message = "Success"

    return new_extrinsics, metrics
def _fronto_parallel_setup(width=20, height=20, depth=2.0):
    """Shared fixture: constant-depth map plus a matching pinhole K.

    With stride=1 the 20x20 map yields 400 unprojected points (the default
    stride of 8 would give only 2x2 = 4, below min_inliers=10).
    """
    K = np.eye(3)
    K[0, 0] = K[1, 1] = 20.0  # focal lengths
    K[0, 2] = K[1, 2] = 10.0  # principal point at the image centre
    depth_map = np.full((height, width), depth, dtype=np.float32)
    return depth_map, K


def test_refine_ground_from_depth_disabled():
    """A disabled config must short-circuit and leave extrinsics untouched."""
    config = GroundPlaneConfig(enabled=False)
    extrinsics = {"cam1": np.eye(4)}
    camera_data = {"cam1": {"depth": np.zeros((10, 10)), "K": np.eye(3)}}

    new_extrinsics, metrics = refine_ground_from_depth(camera_data, extrinsics, config)

    assert not metrics.success
    assert "disabled" in metrics.message
    # Compare per-camera matrices explicitly: dict == with ndarray values is
    # ambiguous in general and only passes when the same objects come back.
    assert set(new_extrinsics) == set(extrinsics)
    for serial in extrinsics:
        np.testing.assert_array_equal(new_extrinsics[serial], extrinsics[serial])


def test_refine_ground_from_depth_insufficient_cameras():
    """One valid camera when two are required -> refinement refuses."""
    config = GroundPlaneConfig(min_valid_cameras=2, min_inliers=10, stride=1)

    # A fronto-parallel plane (z=2 in the camera frame) is enough here: any
    # dominant plane counts as a detection for this test's purposes.
    depth_map, K = _fronto_parallel_setup()

    camera_data = {"cam1": {"depth": depth_map, "K": K}}
    extrinsics = {"cam1": np.eye(4)}

    new_extrinsics, metrics = refine_ground_from_depth(camera_data, extrinsics, config)

    assert not metrics.success
    assert "Found 1 valid planes" in metrics.message
    assert metrics.num_cameras_valid == 1


def test_refine_ground_from_depth_success():
    """Two cameras see a floor at world y=-1; expect a +1.0 Y correction.

    Geometry: the depth map is a fronto-parallel plane at z=2 in the camera
    frame. With R = Rx(-90 deg) and t = (0, -3, 0):
        R @ (x, y, 2) + t = (x, 2, -y) + (0, -3, 0)  ->  world y = -1.
    Refinement should lift the world by +1.0 so the floor lands on
    target_y = 0.0.
    """
    config = GroundPlaneConfig(
        min_valid_cameras=2,
        min_inliers=10,
        target_y=0.0,
        max_translation_m=2.0,
        ransac_dist_thresh=0.05,
        stride=1,
    )
    depth_map, K = _fronto_parallel_setup()

    # Rotation of -90 degrees about X, camera origin at world y=-3.
    Rx_neg90 = np.array([[1, 0, 0], [0, 0, 1], [0, -1, 0]])
    T_world_cam = np.eye(4)
    T_world_cam[:3, :3] = Rx_neg90
    T_world_cam[:3, 3] = np.array([0, -3, 0])

    camera_data = {
        "cam1": {"depth": depth_map, "K": K},
        "cam2": {"depth": depth_map, "K": K},
    }
    extrinsics = {"cam1": T_world_cam, "cam2": T_world_cam}

    new_extrinsics, metrics = refine_ground_from_depth(camera_data, extrinsics, config)

    assert metrics.success
    assert metrics.num_cameras_valid == 2
    assert metrics.correction_applied

    # Floor at y=-1 with target 0 => correction translates ~ +1.0 in Y.
    T_corr = metrics.correction_transform
    assert abs(T_corr[1, 3] - 1.0) < 0.1  # slack for RANSAC noise

    # New extrinsics are T_corr @ T_old: camera origin y goes -3 -> -2.
    T_new = new_extrinsics["cam1"]
    assert abs(T_new[1, 3] - (-2.0)) < 0.1