import numpy as np import pytest import open3d as o3d from scipy.spatial.transform import Rotation from aruco.icp_registration import ( ICPConfig, ICPResult, ICPMetrics, extract_near_floor_band, extract_scene_points, preprocess_point_cloud, compute_overlap_xz, compute_overlap_3d, apply_gravity_constraint, pairwise_icp, build_pose_graph, refine_with_icp, compute_fpfh_features, global_registration, ) from aruco.ground_plane import FloorPlane def test_extract_near_floor_band_basic(): points = np.array( [[0, -0.1, 0], [0, 0.1, 0], [0, 0.2, 0], [0, 0.4, 0]], dtype=np.float64 ) floor_y = 0.0 band_height = 0.3 floor_normal = np.array([0, 1, 0], dtype=np.float64) result = extract_near_floor_band(points, floor_y, band_height, floor_normal) assert len(result) == 2 assert np.all(result[:, 1] >= 0) assert np.all(result[:, 1] <= 0.3) def test_extract_near_floor_band_empty(): points = np.zeros((0, 3)) result = extract_near_floor_band(points, 0.0, 0.3, np.array([0, 1, 0])) assert len(result) == 0 def test_extract_near_floor_band_all_in(): points = np.random.uniform(0, 0.2, (100, 3)) points[:, 1] = np.random.uniform(0.05, 0.25, 100) result = extract_near_floor_band(points, 0.0, 0.3, np.array([0, 1, 0])) assert len(result) == 100 def test_extract_scene_points_floor_mode_legacy(): points = np.array( [[0, -0.1, 0], [0, 0.1, 0], [0, 0.2, 0], [0, 0.4, 0]], dtype=np.float64 ) floor_y = 0.0 band_height = 0.3 floor_normal = np.array([0, 1, 0], dtype=np.float64) # Should match extract_near_floor_band exactly expected = extract_near_floor_band(points, floor_y, band_height, floor_normal) result = extract_scene_points( points, floor_y, floor_normal, mode="floor", band_height=band_height ) np.testing.assert_array_equal(result, expected) def test_extract_scene_points_full_mode(): points = np.random.rand(100, 3) floor_y = 0.0 floor_normal = np.array([0, 1, 0], dtype=np.float64) result = extract_scene_points(points, floor_y, floor_normal, mode="full") np.testing.assert_array_equal(result, points) def test_extract_scene_points_hybrid_vertical(): # Create floor points + vertical wall points floor_pts = np.random.uniform(-1, 1, (100, 3)) floor_pts[:, 1] = 0.1 # Within band wall_pts = np.random.uniform(-1, 1, (100, 3)) wall_pts[:, 0] = 2.0 # Wall at x=2 # Wall points are tall, outside floor band wall_pts[:, 1] = np.random.uniform(0.5, 2.0, 100) points = np.vstack([floor_pts, wall_pts]) floor_y = 0.0 floor_normal = np.array([0, 1, 0], dtype=np.float64) # Hybrid should capture both result = extract_scene_points( points, floor_y, floor_normal, mode="hybrid", band_height=0.3 ) # Should have more points than just floor band floor_only = extract_near_floor_band(points, floor_y, 0.3, floor_normal) assert len(result) > len(floor_only) # Should include high points (walls) assert np.any(result[:, 1] > 0.3) def test_extract_scene_points_hybrid_fallback(): # Only floor points, no vertical structure points = np.random.uniform(-1, 1, (100, 3)) points[:, 1] = 0.1 floor_y = 0.0 floor_normal = np.array([0, 1, 0], dtype=np.float64) result = extract_scene_points( points, floor_y, floor_normal, mode="hybrid", band_height=0.3 ) # Should fall back to floor points floor_only = extract_near_floor_band(points, floor_y, 0.3, floor_normal) np.testing.assert_array_equal(result, floor_only) def test_preprocess_point_cloud_sor(): # Create a dense cluster + sparse outliers cluster = np.random.normal(0, 0.1, (100, 3)) outliers = np.random.uniform(2, 3, (5, 3)) points = np.vstack([cluster, outliers]) pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points) cleaned = preprocess_point_cloud(pcd, voxel_size=0.02) # Should remove outliers assert len(cleaned.points) < len(points) assert len(cleaned.points) >= 90 # Keep most inliers def test_compute_overlap_xz_full(): points_a = np.array([[0, 0, 0], [1, 0, 1]]) points_b = np.array([[0, 0, 0], [1, 0, 1]]) area = compute_overlap_xz(points_a, points_b) assert abs(area - 1.0) < 1e-6 def test_compute_overlap_xz_no(): points_a = np.array([[0, 0, 0], [1, 0, 1]]) points_b = np.array([[2, 0, 2], [3, 0, 3]]) area = compute_overlap_xz(points_a, points_b) assert area == 0.0 def test_compute_overlap_xz_partial(): points_a = np.array([[0, 0, 0], [1, 0, 1]]) points_b = np.array([[0.5, 0, 0.5], [1.5, 0, 1.5]]) area = compute_overlap_xz(points_a, points_b) assert abs(area - 0.25) < 1e-6 def test_compute_overlap_xz_with_margin(): points_a = np.array([[0, 0, 0], [1, 0, 1]]) points_b = np.array([[1.2, 0, 0], [2.2, 0, 1]]) area_no_margin = compute_overlap_xz(points_a, points_b) area_with_margin = compute_overlap_xz(points_a, points_b, margin=0.5) assert area_no_margin == 0.0 assert area_with_margin > 0.0 def test_compute_overlap_3d_full(): points_a = np.array([[0, 0, 0], [1, 1, 1]]) points_b = np.array([[0, 0, 0], [1, 1, 1]]) volume = compute_overlap_3d(points_a, points_b) assert abs(volume - 1.0) < 1e-6 def test_compute_overlap_3d_no(): points_a = np.array([[0, 0, 0], [1, 1, 1]]) points_b = np.array([[2, 2, 2], [3, 3, 3]]) volume = compute_overlap_3d(points_a, points_b) assert volume == 0.0 def test_compute_overlap_3d_partial(): # Overlap in [0.5, 1.0] for all axes -> 0.5^3 = 0.125 points_a = np.array([[0, 0, 0], [1, 1, 1]]) points_b = np.array([[0.5, 0.5, 0.5], [1.5, 1.5, 1.5]]) volume = compute_overlap_3d(points_a, points_b) assert abs(volume - 0.125) < 1e-6 def test_compute_overlap_3d_empty(): points_a = np.zeros((0, 3)) points_b = np.array([[0, 0, 0], [1, 1, 1]]) assert compute_overlap_3d(points_a, points_b) == 0.0 assert compute_overlap_3d(points_b, points_a) == 0.0 def test_apply_gravity_constraint_identity(): T = np.eye(4) result = apply_gravity_constraint(T, T) np.testing.assert_allclose(result, T, atol=1e-6) def test_apply_gravity_constraint_preserves_yaw(): T_orig = np.eye(4) R_icp = Rotation.from_euler("xyz", [0, 10, 0], degrees=True).as_matrix() T_icp = np.eye(4) T_icp[:3, :3] = R_icp result = apply_gravity_constraint(T_icp, T_orig, penalty_weight=10.0) res_euler = Rotation.from_matrix(result[:3, :3]).as_euler("xyz", degrees=True) assert abs(res_euler[1] - 10.0) < 1e-6 assert abs(res_euler[0]) < 1e-6 assert abs(res_euler[2]) < 1e-6 def test_apply_gravity_constraint_regularizes_pitch_roll(): T_orig = np.eye(4) R_icp = Rotation.from_euler("xyz", [10, 0, 10], degrees=True).as_matrix() T_icp = np.eye(4) T_icp[:3, :3] = R_icp result = apply_gravity_constraint(T_icp, T_orig, penalty_weight=9.0) res_euler = Rotation.from_matrix(result[:3, :3]).as_euler("xyz", degrees=True) assert abs(res_euler[0] - 1.0) < 1e-2 assert abs(res_euler[2] - 1.0) < 1e-2 def create_box_pcd(size=1.0, num_points=500, seed=42): rng = np.random.default_rng(seed) points = rng.uniform(0, size, (num_points, 3)) pcd = o3d.geometry.PointCloud() pcd.points = o3d.utility.Vector3dVector(points) return pcd def test_pairwise_icp_identity(): pcd = create_box_pcd() config = ICPConfig(min_fitness=0.1) result = pairwise_icp(pcd, pcd, config, np.eye(4)) assert result.converged assert result.fitness > 0.9 np.testing.assert_allclose(result.transformation, np.eye(4), atol=1e-3) def test_pairwise_icp_known_transform(): source = create_box_pcd() T_true = np.eye(4) T_true[:3, :3] = Rotation.from_euler("y", 5, degrees=True).as_matrix() T_true[:3, 3] = [0.05, 0, 0.02] target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector( (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] ) config = ICPConfig(min_fitness=0.5, voxel_size=0.01) result = pairwise_icp(source, target, config, np.eye(4)) assert result.converged np.testing.assert_allclose(result.transformation, T_true, atol=1e-2) def test_pairwise_icp_known_transform_gicp(): source = create_box_pcd() T_true = np.eye(4) T_true[:3, :3] = Rotation.from_euler("y", 5, degrees=True).as_matrix() T_true[:3, 3] = [0.05, 0, 0.02] target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector( (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] ) # GICP usually needs normals, which pairwise_icp estimates internally config = ICPConfig(min_fitness=0.5, voxel_size=0.01, method="gicp") result = pairwise_icp(source, target, config, np.eye(4)) assert result.converged np.testing.assert_allclose(result.transformation, T_true, atol=1e-2) def test_pairwise_icp_insufficient_points(): source = o3d.geometry.PointCloud() source.points = o3d.utility.Vector3dVector(np.random.rand(5, 3)) target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector(np.random.rand(5, 3)) config = ICPConfig(min_fitness=0.9) result = pairwise_icp(source, target, config, np.eye(4)) assert not result.converged def test_robust_kernel_tukey(): source = create_box_pcd() T_true = np.eye(4) T_true[:3, 3] = [0.05, 0, 0.02] target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector( (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] ) # Add some outliers to test robustness outliers = np.random.uniform(2, 3, (10, 3)) target_points = np.vstack([np.asarray(target.points), outliers]) target.points = o3d.utility.Vector3dVector(target_points) config = ICPConfig( min_fitness=0.5, voxel_size=0.01, robust_kernel="tukey", robust_kernel_k=0.1, ) result = pairwise_icp(source, target, config, np.eye(4)) assert result.converged # Should still converge close to T_true despite outliers np.testing.assert_allclose(result.transformation, T_true, atol=2e-2) def test_robust_kernel_tukey_gicp(): source = create_box_pcd() T_true = np.eye(4) T_true[:3, 3] = [0.05, 0, 0.02] target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector( (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] ) # Add some outliers outliers = np.random.uniform(2, 3, (10, 3)) target_points = np.vstack([np.asarray(target.points), outliers]) target.points = o3d.utility.Vector3dVector(target_points) config = ICPConfig( min_fitness=0.5, voxel_size=0.01, method="gicp", robust_kernel="tukey", robust_kernel_k=0.1, ) result = pairwise_icp(source, target, config, np.eye(4)) assert result.converged np.testing.assert_allclose(result.transformation, T_true, atol=2e-2) def test_build_pose_graph_basic(): serials = ["cam1", "cam2"] extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} res = ICPResult( transformation=np.eye(4), fitness=1.0, inlier_rmse=0.0, information_matrix=np.eye(6), converged=True, ) pair_results = {("cam1", "cam2"): res} graph = build_pose_graph(serials, extrinsics, pair_results, "cam1") assert len(graph.nodes) == 2 assert len(graph.edges) == 1 def test_build_pose_graph_disconnected(): serials = ["cam1", "cam2", "cam3"] extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4), "cam3": np.eye(4)} res = ICPResult( transformation=np.eye(4), fitness=1.0, inlier_rmse=0.0, information_matrix=np.eye(6), converged=True, ) pair_results = {("cam1", "cam2"): res} graph = build_pose_graph(serials, extrinsics, pair_results, "cam1") assert len(graph.nodes) == 2 assert len(graph.edges) == 1 def test_refine_with_icp_synthetic_offset(): import aruco.icp_registration import aruco.ground_plane box_points = create_box_pcd(size=0.5).points box_points = np.asarray(box_points) box_points[:, 1] -= 1.0 box_points[:, 2] += 2.0 def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: return box_points else: return box_points - np.array([1.0, 0, 0]) orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } T_w1 = np.eye(4) T_w2_est = np.eye(4) T_w2_est[0, 3] = 1.05 extrinsics = {"cam1": T_w1, "cam2": T_w2_est} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), } config = ICPConfig( min_overlap_area=0.01, min_fitness=0.1, voxel_size=0.05, max_iterations=[20, 10, 5], max_translation_m=3.0, ) new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) assert metrics.success assert metrics.num_cameras_optimized == 2 assert abs(new_extrinsics["cam2"][0, 3] - T_w2_est[0, 3]) > 0.01 finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject def test_refine_with_icp_no_overlap(): import aruco.icp_registration import aruco.ground_plane def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: return np.random.rand(200, 3) + [0, -1, 0] else: return np.random.rand(200, 3) + [10, -1, 0] orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=1.0), } config = ICPConfig(min_overlap_area=1.0) new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) assert metrics.num_cameras_optimized == 1 assert metrics.success finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject def test_refine_with_icp_single_camera(): camera_data = {"cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}} extrinsics = {"cam1": np.eye(4)} floor_planes = {"cam1": FloorPlane(normal=np.array([0, 1, 0]), d=1.0)} config = ICPConfig() new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) assert metrics.num_cameras_optimized == 1 assert metrics.success def test_compute_fpfh_features(): pcd = create_box_pcd() voxel_size = 0.05 pcd_down = pcd.voxel_down_sample(voxel_size) fpfh = compute_fpfh_features(pcd_down, voxel_size) assert fpfh.dimension() == 33 assert fpfh.num() == len(pcd_down.points) def test_global_registration_known_transform(): source = create_box_pcd(size=1.0, num_points=1000) # Create target with significant transform (rotation + translation) T_true = np.eye(4) # 30 degree rotation around Y T_true[:3, :3] = Rotation.from_euler("y", 30, degrees=True).as_matrix() T_true[:3, 3] = [0.5, 0.0, 0.2] target = o3d.geometry.PointCloud() target.points = o3d.utility.Vector3dVector( (np.asarray(source.points) @ T_true[:3, :3].T) + T_true[:3, 3] ) voxel_size = 0.05 source_down = source.voxel_down_sample(voxel_size) target_down = target.voxel_down_sample(voxel_size) source_fpfh = compute_fpfh_features(source_down, voxel_size) target_fpfh = compute_fpfh_features(target_down, voxel_size) result = global_registration( source_down, target_down, source_fpfh, target_fpfh, voxel_size ) assert result.fitness > 0.1 # RANSAC is stochastic, but with clean data it should be reasonably close # We check if it found a transform close to truth T_est = result.transformation # Check rotation difference R_diff = T_est[:3, :3] @ T_true[:3, :3].T rot_diff = Rotation.from_matrix(R_diff).as_euler("xyz", degrees=True) assert np.linalg.norm(rot_diff) < 5.0 # Within 5 degrees # Check translation difference trans_diff = np.linalg.norm(T_est[:3, 3] - T_true[:3, 3]) assert trans_diff < 0.1 # Within 10cm def test_refine_with_icp_global_init_success(): import aruco.icp_registration import aruco.ground_plane # Create two overlapping point clouds with large offset box_points = create_box_pcd(size=1.0, num_points=2000).points box_points = np.asarray(box_points) # Mock unproject to return these points def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: # cam1 return box_points else: # cam2 # Apply large transform to simulate misaligned camera # Rotate 90 deg and translate R = Rotation.from_euler("y", 90, degrees=True).as_matrix() return (box_points @ R.T) + [2.0, 0, 0] orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } # Initial extrinsics are identity (very wrong for cam2) extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } # Enable global init config = ICPConfig( global_init=True, min_overlap_area=0.0, # Disable overlap check for this test since initial overlap is zero min_fitness=0.1, voxel_size=0.05, max_rotation_deg=180.0, # Allow large rotation for this test max_translation_m=5.0, # Allow large translation ) new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) assert metrics.success assert metrics.num_cameras_optimized == 2 # Check if cam2 moved significantly from identity T_cam2 = new_extrinsics["cam2"] assert np.linalg.norm(T_cam2[:3, 3]) > 1.0 finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject def test_refine_with_icp_global_init_disabled(): """Verify that global registration is skipped when disabled.""" import aruco.icp_registration import aruco.ground_plane # Mock global_registration to raise error if called orig_global_reg = aruco.icp_registration.global_registration def mock_global_reg(*args, **kwargs): raise RuntimeError("Global registration should not be called") aruco.icp_registration.global_registration = mock_global_reg # Mock unproject box_points = create_box_pcd(size=0.5).points box_points = np.asarray(box_points) def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: return box_points else: return box_points # Perfect overlap orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } # Default config (global_init=False) config = ICPConfig(min_overlap_area=0.01) # Should not raise RuntimeError refine_with_icp(camera_data, extrinsics, floor_planes, config) finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject aruco.icp_registration.global_registration = orig_global_reg def test_refine_with_icp_global_init_fallback_low_fitness(): """Verify fallback to original init when global registration has low fitness.""" import aruco.icp_registration import aruco.ground_plane # Mock global_registration to return low fitness orig_global_reg = aruco.icp_registration.global_registration class MockResult: def __init__(self): self.fitness = 0.05 # Below 0.1 threshold self.transformation = np.eye(4) self.transformation[0, 3] = 100.0 # Crazy transform def mock_global_reg(*args, **kwargs): return MockResult() aruco.icp_registration.global_registration = mock_global_reg # Mock unproject box_points = create_box_pcd(size=0.5).points box_points = np.asarray(box_points) def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: return box_points else: return box_points # Perfect overlap orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } config = ICPConfig(global_init=True, min_overlap_area=0.01) new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) # Should have ignored the crazy transform from global reg assert np.allclose(new_extrinsics["cam2"], np.eye(4), atol=0.1) finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject aruco.icp_registration.global_registration = orig_global_reg def test_refine_with_icp_global_init_fallback_bounds_check(): """Verify fallback when global registration exceeds safety bounds.""" import aruco.icp_registration import aruco.ground_plane # Mock global_registration to return unsafe transform orig_global_reg = aruco.icp_registration.global_registration class MockResult: def __init__(self): self.fitness = 1.0 # High fitness self.transformation = np.eye(4) self.transformation[0, 3] = 10.0 # 10m translation (unsafe) def mock_global_reg(*args, **kwargs): return MockResult() aruco.icp_registration.global_registration = mock_global_reg # Mock unproject box_points = create_box_pcd(size=0.5).points box_points = np.asarray(box_points) def mock_unproject(depth, K, stride=1, **kwargs): if depth[0, 0] == 1.0: return box_points else: return box_points # Perfect overlap orig_unproject = aruco.ground_plane.unproject_depth_to_points aruco.ground_plane.unproject_depth_to_points = mock_unproject try: camera_data = { "cam1": {"depth": np.ones((10, 10)), "K": np.eye(3)}, "cam2": {"depth": np.zeros((10, 10)), "K": np.eye(3)}, } extrinsics = {"cam1": np.eye(4), "cam2": np.eye(4)} floor_planes = { "cam1": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), "cam2": FloorPlane(normal=np.array([0, 1, 0]), d=0.0), } config = ICPConfig( global_init=True, min_overlap_area=0.01, max_translation_m=0.5, # Strict bound ) new_extrinsics, metrics = refine_with_icp( camera_data, extrinsics, floor_planes, config ) # Should have ignored the unsafe transform assert np.allclose(new_extrinsics["cam2"], np.eye(4), atol=0.1) finally: aruco.ground_plane.unproject_depth_to_points = orig_unproject aruco.icp_registration.global_registration = orig_global_reg