"""Tests for the depth verify/refine post-processing in ``calibrate_extrinsics``.

``verify_extrinsics_with_depth``, ``refine_extrinsics_with_depth`` and
``click.echo`` are patched inside ``calibrate_extrinsics`` (see the
``mock_dependencies`` fixture), so these tests check how
``apply_depth_verify_refine_postprocess`` and ``run_benchmark_matrix`` call
those collaborators and what they store in their outputs.
"""

import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

import numpy as np
import pytest

# Add py_workspace to path so we can import calibrate_extrinsics.
_WORKSPACE_DIR = str(Path(__file__).parent.parent)
if _WORKSPACE_DIR not in sys.path:
    sys.path.append(_WORKSPACE_DIR)

from calibrate_extrinsics import (  # noqa: E402  (needs the sys.path tweak above)
    apply_depth_verify_refine_postprocess,
    run_benchmark_matrix,
)

SERIAL = "123456"
SERIAL_INT = int(SERIAL)
# Identity 4x4 matrix flattened row-major into a space separated string.
IDENTITY_POSE = "1 0 0 0 0 1 0 0 0 0 1 0 0 0 0 1"


def _make_results(*, with_stats=True):
    """Return a fresh ``results`` dict holding one identity pose for SERIAL."""
    entry = {"pose": IDENTITY_POSE}
    if with_stats:
        entry["stats"] = {}
    return {SERIAL: entry}


def _make_frame_entry(*, with_confidence=True, with_corners=True, **extra):
    """Build one verification-frame dict around a mocked frame object.

    Args:
        with_confidence: give the mocked frame an explicit ``confidence_map``
            array; when False the attribute is left to MagicMock's default.
        with_corners: include a zeroed ``"corners"`` array in the entry.
        **extra: additional keys appended to the entry (e.g. ``frame_index``).
    """
    maps = {"depth_map": np.zeros((10, 10))}
    if with_confidence:
        maps["confidence_map"] = np.zeros((10, 10))
    entry = {"frame": MagicMock(**maps), "ids": np.array([[1]])}
    if with_corners:
        entry["corners"] = np.zeros((1, 4, 2))
    entry.update(extra)
    return entry


def _marker_geometry():
    """Return the marker geometry mapping shared by every test."""
    return {1: np.zeros((4, 3))}


def _camera_matrices():
    """Return the camera matrix mapping shared by every test."""
    return {SERIAL_INT: np.eye(3)}


def _verify_result(rmse, n_valid):
    """Return a mocked verification result with no residuals."""
    return MagicMock(rmse=rmse, n_valid=n_valid, residuals=[])


def _refine_stats(rotation_deg, translation_m, success, nfev, message):
    """Return the stats dict that the mocked refine call hands back."""
    return {
        "delta_rotation_deg": rotation_deg,
        "delta_translation_norm_m": translation_m,
        "success": success,
        "nfev": nfev,
        "termination_message": message,
    }


def _postprocess(results, verification_frames, **options):
    """Call ``apply_depth_verify_refine_postprocess`` with the shared arguments.

    ``options`` carries the per-test flags (``verify_depth``, ``refine_depth``
    and, only where a test passes it, ``report_csv_path``).
    """
    return apply_depth_verify_refine_postprocess(
        results=results,
        verification_frames=verification_frames,
        marker_geometry=_marker_geometry(),
        camera_matrices=_camera_matrices(),
        use_confidence_weights=False,
        depth_confidence_threshold=50,
        **options,
    )


def _echoed_messages(mock_echo):
    """Return the text of every recorded ``click.echo`` call.

    Calls made without a positional argument (``click.echo()`` or
    ``click.echo(message=...)``) are handled explicitly so that they cannot
    raise ``IndexError`` while the messages are being scanned.
    """
    messages = []
    for call in mock_echo.call_args_list:
        if call.args:
            messages.append(str(call.args[0]))
        elif "message" in call.kwargs:
            messages.append(str(call.kwargs["message"]))
    return messages


@pytest.fixture
def mock_dependencies():
    """Patch verify, refine and ``click.echo`` inside ``calibrate_extrinsics``.

    Yields:
        ``(mock_verify, mock_refine, mock_echo)``. By default verify returns a
        result with rmse 0.05 and two residual rows, and refine returns
        ``(np.eye(4), stats)`` with ``success=True``.
    """
    with (
        patch("calibrate_extrinsics.verify_extrinsics_with_depth") as mock_verify,
        patch("calibrate_extrinsics.refine_extrinsics_with_depth") as mock_refine,
        patch("calibrate_extrinsics.click.echo") as mock_echo,
    ):
        mock_verify.return_value = MagicMock(
            rmse=0.05,
            mean_abs=0.04,
            median=0.03,
            depth_normalized_rmse=0.02,
            n_valid=100,
            n_total=120,
            residuals=[(1, 0, 0.01), (1, 1, 0.02)],
        )
        # refine returns (new_pose_matrix, stats)
        mock_refine.return_value = (
            np.eye(4),
            _refine_stats(1.0, 0.1, True, 10, "Success"),
        )
        yield mock_verify, mock_refine, mock_echo


def test_benchmark_matrix(mock_dependencies):
    """All four benchmark configurations are reported for the camera."""
    mock_verify, mock_refine, _ = mock_dependencies

    frame_entry = _make_frame_entry(with_corners=False, frame_index=100)
    bench_results = run_benchmark_matrix(
        _make_results(with_stats=False),
        {SERIAL_INT: [frame_entry]},
        {SERIAL_INT: frame_entry},
        _marker_geometry(),
        _camera_matrices(),
        depth_confidence_threshold=50,
    )

    assert SERIAL in bench_results
    for config in (
        "baseline",
        "robust",
        "robust+confidence",
        "robust+confidence+best-frame",
    ):
        assert config in bench_results[SERIAL]

    # Expected per configuration: one pre-refine verify, one refine and one
    # post-refine verify, i.e. 4 * 2 = 8 verify calls and 4 refine calls.
    assert mock_verify.call_count == 8
    assert mock_refine.call_count == 4


def test_verify_only(mock_dependencies):
    """With only ``verify_depth`` set, results gain ``depth_verify`` and nothing else."""
    mock_verify, mock_refine, _ = mock_dependencies

    updated_results, csv_rows = _postprocess(
        _make_results(),
        {SERIAL_INT: [_make_frame_entry()]},
        verify_depth=True,
        refine_depth=False,
        report_csv_path=None,
    )

    assert "depth_verify" in updated_results[SERIAL]
    assert updated_results[SERIAL]["depth_verify"]["rmse"] == 0.05
    assert "refine_depth" not in updated_results[SERIAL]
    # No CSV path provided, so no rows are returned for writing.
    assert len(csv_rows) == 0

    mock_verify.assert_called_once()
    mock_refine.assert_not_called()


def test_refine_depth(mock_dependencies):
    """Refinement verifies before and after and records the RMSE improvement."""
    mock_verify, mock_refine, _ = mock_dependencies

    # First verify call is pre-refine, second is post-refine.
    mock_verify.side_effect = [_verify_result(0.1, 100), _verify_result(0.05, 100)]

    # verify_depth is deliberately False: the assertions below check that
    # refinement on its own still produces both verification passes.
    updated_results, _ = _postprocess(
        _make_results(),
        {SERIAL_INT: [_make_frame_entry()]},
        verify_depth=False,
        refine_depth=True,
    )

    assert "refine_depth" in updated_results[SERIAL]
    assert "depth_verify_post" in updated_results[SERIAL]
    # 0.1 - 0.05; approx keeps the check independent of float rounding.
    assert updated_results[SERIAL]["refine_depth"]["improvement_rmse"] == pytest.approx(
        0.05
    )

    assert mock_verify.call_count == 2
    mock_refine.assert_called_once()


def test_refine_depth_warning_negligible_improvement(mock_dependencies):
    """A successful run that barely changes RMSE echoes a 'negligible' warning."""
    mock_verify, mock_refine, mock_echo = mock_dependencies

    # RMSE stays almost the same between the pre and post verification.
    mock_verify.side_effect = [_verify_result(0.1, 10), _verify_result(0.099999, 10)]
    # nfev=10; the original test notes the condition of interest as "nfev > 5".
    mock_refine.return_value = (
        np.eye(4),
        _refine_stats(0.0, 0.0, True, 10, "Converged"),
    )

    _postprocess(
        _make_results(),
        {SERIAL_INT: [_make_frame_entry(with_confidence=False, with_corners=False)]},
        verify_depth=False,
        refine_depth=True,
    )

    # e.g. "WARNING: Optimization ran for 10 steps but improvement was negligible"
    assert any("negligible" in message for message in _echoed_messages(mock_echo))


def test_refine_depth_warning_failed_or_stalled(mock_dependencies):
    """An unsuccessful optimisation echoes a 'failed or stalled' warning."""
    mock_verify, mock_refine, mock_echo = mock_dependencies

    mock_verify.side_effect = [_verify_result(0.1, 10), _verify_result(0.1, 10)]
    # success=False
    mock_refine.return_value = (
        np.eye(4),
        _refine_stats(0.0, 0.0, False, 1, "Failed"),
    )

    _postprocess(
        _make_results(),
        {SERIAL_INT: [_make_frame_entry(with_confidence=False, with_corners=False)]},
        verify_depth=False,
        refine_depth=True,
    )

    assert any(
        "failed or stalled" in message for message in _echoed_messages(mock_echo)
    )


def test_csv_output(mock_dependencies, tmp_path):
    """Residuals are returned as rows and written to the requested CSV file."""
    csv_path = tmp_path / "report.csv"

    _, csv_rows = _postprocess(
        _make_results(),
        {SERIAL_INT: [_make_frame_entry()]},
        verify_depth=True,
        refine_depth=False,
        report_csv_path=str(csv_path),
    )

    # Two rows, one per residual in the fixture's default verify result.
    assert len(csv_rows) == 2
    assert csv_rows[0] == [SERIAL_INT, 1, 0, 0.01]

    # Verify file content: header line plus the two residual rows.
    assert csv_path.exists()
    content = csv_path.read_text().splitlines()
    assert len(content) == 3
    assert content[0] == "serial,marker_id,corner_idx,residual"
    assert content[1] == f"{SERIAL_INT},1,0,0.01"