import numpy as np from typing import List, Dict, Tuple, Optional, Any try: from scipy.spatial.transform import Rotation as R HAS_SCIPY = True except ImportError: HAS_SCIPY = False def rotation_angle_deg(Ra: np.ndarray, Rb: np.ndarray) -> float: """ Compute the rotation angle in degrees between two rotation matrices. Args: Ra: 3x3 rotation matrix. Rb: 3x3 rotation matrix. Returns: Angle in degrees. """ R_diff = Ra.T @ Rb cos_theta = (np.trace(R_diff) - 1.0) / 2.0 # Clip to avoid numerical issues cos_theta = np.clip(cos_theta, -1.0, 1.0) return np.degrees(np.arccos(cos_theta)) def matrix_to_quaternion(R_mat: np.ndarray) -> np.ndarray: """ Convert a 3x3 rotation matrix to a quaternion (w, x, y, z). Uses the method from "Unit Quaternions and Rotation Matrices" by Mike Day. """ tr = np.trace(R_mat) if tr > 0: S = np.sqrt(tr + 1.0) * 2 qw = 0.25 * S qx = (R_mat[2, 1] - R_mat[1, 2]) / S qy = (R_mat[0, 2] - R_mat[2, 0]) / S qz = (R_mat[1, 0] - R_mat[0, 1]) / S elif (R_mat[0, 0] > R_mat[1, 1]) and (R_mat[0, 0] > R_mat[2, 2]): S = np.sqrt(1.0 + R_mat[0, 0] - R_mat[1, 1] - R_mat[2, 2]) * 2 qw = (R_mat[2, 1] - R_mat[1, 2]) / S qx = 0.25 * S qy = (R_mat[0, 1] + R_mat[1, 0]) / S qz = (R_mat[0, 2] + R_mat[2, 0]) / S elif R_mat[1, 1] > R_mat[2, 2]: S = np.sqrt(1.0 + R_mat[1, 1] - R_mat[0, 0] - R_mat[2, 2]) * 2 qw = (R_mat[0, 2] - R_mat[2, 0]) / S qx = (R_mat[0, 1] + R_mat[1, 0]) / S qy = 0.25 * S qz = (R_mat[1, 2] + R_mat[2, 1]) / S else: S = np.sqrt(1.0 + R_mat[2, 2] - R_mat[0, 0] - R_mat[1, 1]) * 2 qw = (R_mat[1, 0] - R_mat[0, 1]) / S qx = (R_mat[0, 2] + R_mat[2, 0]) / S qy = (R_mat[1, 2] + R_mat[2, 1]) / S qz = 0.25 * S return np.array([qw, qx, qy, qz]) def quaternion_to_matrix(q: np.ndarray) -> np.ndarray: """ Convert a quaternion (w, x, y, z) to a 3x3 rotation matrix. """ qw, qx, qy, qz = q return np.array( [ [ 1 - 2 * qy**2 - 2 * qz**2, 2 * qx * qy - 2 * qz * qw, 2 * qx * qz + 2 * qy * qw, ], [ 2 * qx * qy + 2 * qz * qw, 1 - 2 * qx**2 - 2 * qz**2, 2 * qy * qz - 2 * qx * qw, ], [ 2 * qx * qz - 2 * qy * qw, 2 * qy * qz + 2 * qx * qw, 1 - 2 * qx**2 - 2 * qy**2, ], ] ) class PoseAccumulator: """ Accumulates 4x4 poses and provides robust averaging methods. """ def __init__(self): self.poses: List[np.ndarray] = [] self.reproj_errors: List[float] = [] self.frame_ids: List[int] = [] def add_pose(self, T: np.ndarray, reproj_error: float, frame_id: int) -> None: """ Add a 4x4 pose to the accumulator. Args: T: 4x4 transformation matrix. reproj_error: Reprojection error associated with the pose. frame_id: ID of the frame where the pose was detected. """ if not isinstance(T, np.ndarray) or T.shape != (4, 4): raise ValueError("T must be a 4x4 numpy array") self.poses.append(T.copy()) self.reproj_errors.append(float(reproj_error)) self.frame_ids.append(int(frame_id)) def filter_by_reproj(self, max_reproj_error: float) -> List[int]: """ Return indices of poses with reprojection error below threshold. """ return [ i for i, err in enumerate(self.reproj_errors) if err <= max_reproj_error ] def ransac_filter( self, rot_thresh_deg: float = 5.0, trans_thresh_m: float = 0.05, max_iters: int = 200, random_seed: int = 0, ) -> List[int]: """ Perform RANSAC to find the largest consensus set of poses. Args: rot_thresh_deg: Rotation threshold in degrees. trans_thresh_m: Translation threshold in meters. max_iters: Maximum number of RANSAC iterations. random_seed: Seed for reproducibility. Returns: Indices of the inlier poses. """ n = len(self.poses) if n == 0: return [] if n == 1: return [0] rng = np.random.default_rng(random_seed) best_inliers = [] for _ in range(max_iters): # Pick a random reference pose ref_idx = rng.integers(0, n) T_ref = self.poses[ref_idx] R_ref = T_ref[:3, :3] t_ref = T_ref[:3, 3] current_inliers = [] for i in range(n): T_i = self.poses[i] R_i = T_i[:3, :3] t_i = T_i[:3, 3] # Check rotation d_rot = rotation_angle_deg(R_ref, R_i) # Check translation d_trans = np.linalg.norm(t_ref - t_i) if d_rot <= rot_thresh_deg and d_trans <= trans_thresh_m: current_inliers.append(i) if len(current_inliers) > len(best_inliers): best_inliers = current_inliers if len(best_inliers) == n: break return best_inliers def compute_robust_mean( self, inlier_indices: Optional[List[int]] = None ) -> Tuple[np.ndarray, Dict[str, Any]]: """ Compute the robust mean of the accumulated poses. Args: inlier_indices: Optional list of indices to use. If None, uses all poses. Returns: T_mean: 4x4 averaged transformation matrix. stats: Dictionary with statistics. """ if inlier_indices is None: inlier_indices = list(range(len(self.poses))) n_total = len(self.poses) n_inliers = len(inlier_indices) if n_inliers == 0: return np.eye(4), { "n_total": n_total, "n_inliers": 0, "median_reproj_error": 0.0, } inlier_poses = [self.poses[i] for i in inlier_indices] inlier_errors = [self.reproj_errors[i] for i in inlier_indices] # Translation: median translations = np.array([T[:3, 3] for T in inlier_poses]) t_mean = np.median(translations, axis=0) # Rotation: mean rotations = [T[:3, :3] for T in inlier_poses] if HAS_SCIPY: r_objs = R.from_matrix(rotations) R_mean = r_objs.mean().as_matrix() else: # Quaternion eigen mean fallback quats = np.array([matrix_to_quaternion(rot) for rot in rotations]) # Form the matrix M = sum(q_i * q_i^T) M = np.zeros((4, 4)) for i in range(n_inliers): q = quats[i] M += np.outer(q, q) # The average quaternion is the eigenvector corresponding to the largest eigenvalue evals, evecs = np.linalg.eigh(M) q_mean = evecs[:, -1] R_mean = quaternion_to_matrix(q_mean) T_mean = np.eye(4) T_mean[:3, :3] = R_mean T_mean[:3, 3] = t_mean stats = { "n_total": n_total, "n_inliers": n_inliers, "median_reproj_error": float(np.median(inlier_errors)) if inlier_errors else 0.0, } return T_mean, stats