import numpy as np import pytest import open3d as o3d import aruco.icp_registration as icp_reg from aruco.icp_registration import ( ICPConfig, estimate_depth_biases, refine_with_icp, ) from aruco.ground_plane import FloorPlane # Helper to create simple camera data def create_camera_data(depth_val=2.0, shape=(20, 20)): K = np.eye(3) K[0, 2] = shape[1] / 2 K[1, 2] = shape[0] / 2 K[0, 0] = 500.0 # fx K[1, 1] = 500.0 # fy depth = np.full(shape, depth_val, dtype=np.float32) return {"depth": depth, "K": K} @pytest.fixture def mock_preprocessing(monkeypatch): """Disable point cloud preprocessing (downsample/SOR) to keep synthetic points exact.""" def mock_preprocess(pcd, voxel_size): return pcd monkeypatch.setattr(icp_reg, "preprocess_point_cloud", mock_preprocess) @pytest.fixture def mock_scene_extraction(monkeypatch): """Pass through all points regardless of region config.""" def mock_extract(points, *args, **kwargs): return points monkeypatch.setattr(icp_reg, "extract_scene_points", mock_extract) def test_estimate_depth_biases_two_cameras_known_offset( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Setup: Cam1 (Ref) at origin. Cam2 at origin (co-located for simplicity). # Cam1 sees wall at 2.0m. Cam2 sees wall at 2.1m (0.1m bias). # Rays are identical. import aruco.ground_plane # Use enough points to pass the >100 check # Using random points introduces some noise in KDTree lookup if density is low # but we relax tolerance to handle it. rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 # Wall at Z=2 def mock_unproject(depth, K, stride=1, **kwargs): d = depth[0, 0] if abs(d - 2.0) < 1e-3: return base_points elif abs(d - 2.1) < 1e-3: # Shift along ray. Ray is just the point vector (since cam at 0,0,0) norms = np.linalg.norm(base_points, axis=1, keepdims=True) rays = base_points / norms return base_points + rays * 0.1 return base_points monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.1), } extrinsics = { "cam1": np.eye(4), "cam2": np.eye(4), } floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } # voxel_size=0.1 -> max_corr_dist = 0.25m. # Bias is 0.1m, so it should be found. config = ICPConfig(voxel_size=0.1, min_overlap_area=0.0) # Mock overlap to always pass monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) monkeypatch.setattr(icp_reg, "compute_overlap_3d", lambda *a, **k: 10.0) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert biases["cam1"] == 0.0 # Relaxed tolerance to 0.02m (2cm) to account for synthetic noise/sampling assert abs(biases["cam2"] - 0.1) < 0.02 def test_estimate_depth_biases_sign_correctness( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Cam1 (Ref) at 2.0. Cam2 at 1.9 (-0.1 bias). import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject(depth, K, stride=1, **kwargs): d = depth[0, 0] if abs(d - 2.0) < 1e-3: return base_points elif abs(d - 1.9) < 1e-3: norms = np.linalg.norm(base_points, axis=1, keepdims=True) rays = base_points / norms return base_points - rays * 0.1 return base_points monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(1.9), } extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } config = ICPConfig(voxel_size=0.1) monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) monkeypatch.setattr(icp_reg, "compute_overlap_3d", lambda *a, **k: 10.0) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert biases["cam1"] == 0.0 assert abs(biases["cam2"] - (-0.1)) < 0.02 def test_estimate_depth_biases_four_cameras_chain( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Chain: C1 -> C2 -> C3 -> C4 # C1=0. C2=0.1. C3=0.2. C4=0.3. import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject(depth, K, stride=1, **kwargs): d = depth[0, 0] bias = d - 2.0 norms = np.linalg.norm(base_points, axis=1, keepdims=True) rays = base_points / norms return base_points + rays * bias monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.1), "cam3": create_camera_data(2.2), "cam4": create_camera_data(2.3), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } # Max bias is 0.3m. voxel_size=0.1 -> max_corr=0.25m. # C1-C2 diff is 0.1. C2-C3 diff is 0.1. C3-C4 diff is 0.1. # Pairwise they are within 0.25m. # But C1-C4 is 0.3m. # The solver uses pairwise constraints, so it should work as long as neighbors connect. config = ICPConfig(voxel_size=0.1) monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) monkeypatch.setattr(icp_reg, "compute_overlap_3d", lambda *a, **k: 10.0) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert abs(biases["cam1"] - 0.0) < 1e-3 assert abs(biases["cam2"] - 0.1) < 0.02 assert abs(biases["cam3"] - 0.2) < 0.03 # Accumulating error tolerance assert abs(biases["cam4"] - 0.3) < 0.04 def test_estimate_depth_biases_disconnected_fallback( mock_preprocessing, mock_scene_extraction, monkeypatch ): # C1, C2 connected. C3 isolated. # C1=0, C2=0.1. C3=0.5 (but isolated). # C3 should be 0.0. import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject_disconnected(depth, K, stride=1, **kwargs): d = depth[0, 0] if abs(d - 2.5) < 1e-3: # C3 return base_points + np.array([100, 0, 0]) # Far away bias = d - 2.0 norms = np.linalg.norm(base_points, axis=1, keepdims=True) rays = base_points / norms return base_points + rays * bias monkeypatch.setattr( aruco.ground_plane, "unproject_depth_to_points", mock_unproject_disconnected ) # Restore real overlap logic to detect disconnection monkeypatch.setattr(icp_reg, "compute_overlap_xz", icp_reg.compute_overlap_xz) monkeypatch.setattr( icp_reg, "compute_overlap_3d", lambda pa, pb, m: icp_reg.compute_overlap_xz(pa, pb, m), ) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.1), "cam3": create_camera_data(2.5), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } config = ICPConfig(voxel_size=0.1) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert abs(biases["cam1"] - 0.0) < 1e-3 assert abs(biases["cam2"] - 0.1) < 0.02 assert biases["cam3"] == 0.0 # Fallback def test_estimate_depth_biases_min_correspondence_gate( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Two cameras, overlapping, but very few correspondences found. # Should return 0 bias. import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject(depth, K, stride=1, **kwargs): d = depth[0, 0] if abs(d - 2.1) < 1e-3: # Return only 10 points return base_points[:10] + 0.1 return base_points monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.1), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } config = ICPConfig(voxel_size=0.1) monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert biases["cam2"] == 0.0 def test_refine_with_icp_integrates_bias( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Verify refine_with_icp calls estimate_depth_biases and puts result in metrics. def mock_estimate(*args, **kwargs): return {"cam1": 0.0, "cam2": 0.5} monkeypatch.setattr(icp_reg, "estimate_depth_biases", mock_estimate) # Mock pairwise_icp to avoid actual work monkeypatch.setattr( icp_reg, "pairwise_icp", lambda *a, **k: icp_reg.ICPResult(np.eye(4), 1.0, 0.0, np.eye(6), True), ) monkeypatch.setattr(icp_reg, "optimize_pose_graph", lambda *a: None) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.0), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } config = ICPConfig(depth_bias=True) _, metrics = refine_with_icp(camera_data, extrinsics, floor_planes, config) assert metrics.depth_biases["cam2"] == 0.5 def test_refine_with_icp_bias_toggle_off( mock_preprocessing, mock_scene_extraction, monkeypatch ): called = False def mock_estimate(*args, **kwargs): nonlocal called called = True return {} monkeypatch.setattr(icp_reg, "estimate_depth_biases", mock_estimate) monkeypatch.setattr( icp_reg, "pairwise_icp", lambda *a, **k: icp_reg.ICPResult(np.eye(4), 1.0, 0.0, np.eye(6), True), ) monkeypatch.setattr(icp_reg, "optimize_pose_graph", lambda *a: None) camera_data = {"cam1": create_camera_data(2.0)} extrinsics = {"cam1": np.eye(4)} floor_planes = {"cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0)} config = ICPConfig(depth_bias=False) refine_with_icp(camera_data, extrinsics, floor_planes, config) assert not called def test_non_positive_depth_clamp( mock_preprocessing, mock_scene_extraction, monkeypatch ): # If bias makes depth negative, it should be filtered. import aruco.ground_plane received_depths = [] def mock_unproject(depth, K, stride=1, **kwargs): received_depths.append(depth.copy()) return np.zeros((10, 3)) monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) # Mock estimate to return large negative bias monkeypatch.setattr( icp_reg, "estimate_depth_biases", lambda *a, **k: {"cam1": -5.0} ) monkeypatch.setattr( icp_reg, "pairwise_icp", lambda *a, **k: icp_reg.ICPResult(np.eye(4), 1.0, 0.0, np.eye(6), True), ) monkeypatch.setattr(icp_reg, "optimize_pose_graph", lambda *a: None) camera_data = {"cam1": create_camera_data(2.0)} # Depth 2.0 extrinsics = {"cam1": np.eye(4)} floor_planes = {"cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0)} config = ICPConfig(depth_bias=True) refine_with_icp(camera_data, extrinsics, floor_planes, config) assert len(received_depths) > 0 # The depth map passed to unproject should have NaNs where it was negative assert np.isnan(received_depths[-1]).all() def test_estimate_depth_biases_clipping( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Test that biases are clipped to max_abs_bias. # Cam1 (Ref) at 2.0. Cam2 at 2.5 (0.5m bias). # max_abs_bias = 0.3. Should be clipped to 0.3. import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject(depth, K, stride=1, **kwargs): d = depth[0, 0] if abs(d - 2.0) < 1e-3: return base_points elif abs(d - 2.5) < 1e-3: norms = np.linalg.norm(base_points, axis=1, keepdims=True) rays = base_points / norms return base_points + rays * 0.5 return base_points monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.5), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } config = ICPConfig(voxel_size=0.1, max_abs_bias=0.3) monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) monkeypatch.setattr(icp_reg, "compute_overlap_3d", lambda *a, **k: 10.0) biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="cam1" ) assert biases["cam1"] == 0.0 assert abs(biases["cam2"] - 0.3) < 1e-3 # Should be exactly clipped def test_estimate_depth_biases_reference_fallback( mock_preprocessing, mock_scene_extraction, monkeypatch ): # Test fallback when reference_serial is invalid. # Should pick first sorted camera (cam1) as reference (bias 0.0). import aruco.ground_plane rng = np.random.default_rng(42) base_points = rng.uniform(-1, 1, (400, 3)) base_points[:, 2] = 2.0 def mock_unproject(depth, K, stride=1, **kwargs): # Both cameras see same thing, so 0 bias relative to each other return base_points monkeypatch.setattr(aruco.ground_plane, "unproject_depth_to_points", mock_unproject) camera_data = { "cam1": create_camera_data(2.0), "cam2": create_camera_data(2.0), } extrinsics = {k: np.eye(4) for k in camera_data} floor_planes = { k: FloorPlane(normal=np.array([0, 1, 0]), d=0.0) for k in camera_data } config = ICPConfig(voxel_size=0.1) monkeypatch.setattr(icp_reg, "compute_overlap_xz", lambda *a, **k: 10.0) monkeypatch.setattr(icp_reg, "compute_overlap_3d", lambda *a, **k: 10.0) # Pass invalid reference biases = estimate_depth_biases( camera_data, extrinsics, floor_planes, config, reference_serial="invalid_cam" ) # cam1 should be reference (0.0) because it's first alphabetically assert biases["cam1"] == 0.0 assert abs(biases["cam2"]) < 0.02