diff --git a/src/pose_tracking_exp/schema/__init__.py b/src/pose_tracking_exp/schema/__init__.py index 9aceee3..05dd040 100644 --- a/src/pose_tracking_exp/schema/__init__.py +++ b/src/pose_tracking_exp/schema/__init__.py @@ -18,7 +18,9 @@ from pose_tracking_exp.schema.tracking import ( ProposalCluster, SkeletonState, TentativeTrackState, + TRACK_COVARIANCE_DIMENSION, TrackState, + TrackUpdateEvent, TrackerConfig, TrackerDiagnostics, TrackedFrameResult, @@ -41,7 +43,9 @@ __all__ = [ "SceneConfig", "SkeletonState", "TentativeTrackState", + "TRACK_COVARIANCE_DIMENSION", "TrackState", + "TrackUpdateEvent", "TrackerConfig", "TrackerDiagnostics", "TrackedFrameResult", diff --git a/src/pose_tracking_exp/schema/tracking.py b/src/pose_tracking_exp/schema/tracking.py index 91d1f31..641b352 100644 --- a/src/pose_tracking_exp/schema/tracking.py +++ b/src/pose_tracking_exp/schema/tracking.py @@ -5,6 +5,8 @@ import numpy as np from pose_tracking_exp.common.tensor_types import Pose3D, Vector3 +TRACK_COVARIANCE_DIMENSION = 70 # 31 pose parameters + 31 velocities + 8 shape parameters. + @dataclass(slots=True) class ProposalCluster: @@ -13,6 +15,29 @@ class ProposalCluster: source_views: frozenset[str] support_size: int mean_score: float + root_centered_pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) + view_count: int = 0 + pair_count: int = 0 + mean_reprojection_error: float = 0.0 + support_detection_indices: dict[str, tuple[int, ...]] = field(default_factory=dict) + + def __post_init__(self) -> None: + self.pose3d = np.asarray(self.pose3d, dtype=np.float64).reshape(20, 4) + self.root = np.asarray(self.root, dtype=np.float64).reshape(3) + root_centered = np.asarray(self.root_centered_pose3d, dtype=np.float64) + if root_centered.shape != (20, 4) or not np.any(root_centered[:, 3] > 0.0): + root_centered = self.pose3d.copy() + root_centered[:, :3] -= self.root[None, :] + self.root_centered_pose3d = root_centered + self.source_views = frozenset(self.source_views) + self.support_detection_indices = { + str(camera_name): tuple(int(index) for index in indices) + for camera_name, indices in self.support_detection_indices.items() + } + if self.view_count <= 0: + self.view_count = max(len(self.source_views), min(2, self.support_size)) + if self.pair_count <= 0: + self.pair_count = self.support_size @dataclass(slots=True) @@ -28,11 +53,17 @@ class TentativeTrackState: state: Literal["tentative"] = "tentative" age: int = 0 misses: int = 0 + hit_count: int = 0 score: float = 0.0 last_bundle_index: int = -1 root: Vector3 = field(default_factory=lambda: np.zeros(3, dtype=np.float64)) pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) - evidence_buffer: list[Pose3D] = field(default_factory=list) + root_centered_pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) + evidence_buffer: list[ProposalCluster] = field(default_factory=list) + mean_view_count: float = 0.0 + mean_reprojection_error: float = 0.0 + mean_support_size: float = 0.0 + last_source_views: frozenset[str] = field(default_factory=frozenset) @dataclass(slots=True) @@ -53,11 +84,52 @@ class ActiveTrackState: noise_scale: np.ndarray = field( default_factory=lambda: np.full((20,), 9.0, dtype=np.float64) ) + noise_by_view: dict[str, np.ndarray] = field(default_factory=dict) + root_velocity: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float64)) + joint_velocity: np.ndarray = field(default_factory=lambda: np.zeros(31, dtype=np.float64)) + beta_frozen: bool = False + beta_grace_age: int = 0 + covariance: np.ndarray = field(default_factory=lambda: np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64)) + outside_volume_age: int = 0 + mean_reprojection_error: float = np.inf + last_update_kind: Literal[ + "initialized", + "direct_update", + "predict_only", + "direct_reacquire", + "proposal_reacquire", + "promoted", + ] = "initialized" TrackState = TentativeTrackState | ActiveTrackState +TrackUpdateAction = Literal[ + "tentative_observed", + "tentative_missed", + "promoted", + "direct_update", + "predict_only", + "direct_reacquire", + "proposal_reacquire", + "proposal_compatible", + "deleted_tentative", + "deleted_lost", +] + + +@dataclass(slots=True) +class TrackUpdateEvent: + track_id: int + action: TrackUpdateAction + accepted_view_count: int = 0 + accepted_joint_count: int = 0 + proposal_view_count: int = 0 + proposal_support_size: int = 0 + mean_reprojection_error: float = np.inf + + @dataclass(slots=True) class TrackedFrameResult: bundle_index: int @@ -66,6 +138,7 @@ class TrackedFrameResult: active_tracks: tuple[ActiveTrackState, ...] lost_tracks: tuple[ActiveTrackState, ...] proposals: tuple[ProposalCluster, ...] + update_events: tuple[TrackUpdateEvent, ...] = () @dataclass(slots=True) @@ -79,6 +152,13 @@ class TrackerDiagnostics: active_updates: int = 0 seed_initializations: int = 0 nonlinear_refinements: int = 0 + predict_only_updates: int = 0 + proposal_reacquisition_updates: int = 0 + direct_reacquisition_updates: int = 0 + tentative_updates: int = 0 + proposal_reacquisition_attempts: int = 0 + proposal_compatible_lost_frames: int = 0 + lm_iterations: int = 0 @dataclass(slots=True) @@ -89,14 +169,52 @@ class TrackerConfig: tentative_buffer_size: int = 5 tentative_min_age: int = 3 tentative_hits_required: int = 3 + tentative_min_mean_views: float = 2.0 + tentative_max_reprojection_error_px: float = 80.0 + tentative_max_bone_cv: float = 0.3 tentative_promote_score: float = 3.0 tentative_max_misses: int = 2 active_min_views: int = 2 + active_min_accepted_joints: int = 10 + lost_min_views: int = 1 + lost_min_accepted_joints: int = 8 + association_min_core_joints: int = 2 + lost_min_accepted_core_joints: int = 2 active_core_gate_px: float = 80.0 active_joint_gate_px: float = 120.0 - active_miss_to_lost: int = 3 + active_miss_to_lost: int = 5 lost_delete_age: int = 15 + lost_covariance_trace_max: float = 9_000.0 + lost_outside_volume_frames: int = 6 proposal_match_distance_m: float = 0.45 + proposal_core_match_distance_m: float = 0.7 + proposal_reacquire_root_distance_m: float = 0.75 + proposal_reacquire_core_distance_m: float = 1.05 + beta_grace_frames: int = 12 noise_ema: float = 0.85 + noise_min_px: float = 3.0 + noise_max_px: float = 45.0 + noise_residual_cap_px: float = 60.0 + process_noise_root_position_m: float = 0.03 + process_noise_root_rotation_rad: float = 0.02 + process_noise_joint_rad: float = 0.03 + process_noise_velocity: float = 0.04 + process_noise_beta: float = 0.01 + predict_only_process_scale: float = 1.75 + active_score_decay: float = 0.85 + active_score_view_gain: float = 1.2 + active_score_joint_gain: float = 1.0 + active_score_reprojection_penalty: float = 0.02 + active_score_miss_penalty: float = 0.75 + active_score_lost_threshold: float = -4.0 + lost_score_decay: float = 0.95 + lost_score_miss_penalty: float = 0.25 + lost_score_reacquire_gain: float = 0.75 + proposal_compatible_score_relief: float = 0.5 + lm_max_iterations: int = 5 + lm_damping: float = 0.02 + lm_step_epsilon: float = 1e-3 + lm_step_tolerance: float = 1e-4 + lm_student_t_dof: float = 4.0 proposal_min_score: float = 0.9 proposal_min_group_size: int = 1 diff --git a/src/pose_tracking_exp/tracking/kinematics.py b/src/pose_tracking_exp/tracking/kinematics.py index 0b61546..d0ccff2 100644 --- a/src/pose_tracking_exp/tracking/kinematics.py +++ b/src/pose_tracking_exp/tracking/kinematics.py @@ -1,16 +1,44 @@ import math +from collections.abc import Callable +from dataclasses import dataclass import numpy as np from beartype import beartype from scipy.optimize import least_squares from pose_tracking_exp.common.camera_math import project_pose -from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME +from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, BODY20_OBSERVATION_COUNT from pose_tracking_exp.common.tensor_types import Pose3D from pose_tracking_exp.schema import CameraCalibration, PoseDetection, SkeletonState PARAMETER_DIMENSION = 31 SHAPE_DIMENSION = 8 +STABLE_LIMB_NAMES: tuple[tuple[str, str], ...] = ( + ("hip_middle", "shoulder_middle"), + ("shoulder_left", "shoulder_right"), + ("hip_left", "hip_right"), + ("shoulder_left", "elbow_left"), + ("shoulder_right", "elbow_right"), + ("elbow_left", "wrist_left"), + ("elbow_right", "wrist_right"), + ("hip_left", "knee_left"), + ("hip_right", "knee_right"), + ("knee_left", "ankle_left"), + ("knee_right", "ankle_right"), +) + + +@dataclass(slots=True) +class MultiviewUpdateResult: + state: SkeletonState + parameter_covariance: np.ndarray + beta_covariance: np.ndarray + accepted_joint_masks: dict[str, np.ndarray] + accepted_joint_counts_by_view: dict[str, int] + accepted_joint_count: int + accepted_view_count: int + mean_reprojection_error: float + lm_iterations: int def _rot_x(theta: float) -> np.ndarray: @@ -213,6 +241,64 @@ def _estimate_beta_from_pose(pose3d: Pose3D) -> np.ndarray: return np.clip(beta, 0.5, 2.0) +def _limb_length_samples(pose_buffer: list[Pose3D], name_a: str, name_b: str) -> list[float]: + index_a = BODY20_INDEX_BY_NAME[name_a] + index_b = BODY20_INDEX_BY_NAME[name_b] + samples: list[float] = [] + for pose3d in pose_buffer: + if pose3d[index_a, 3] <= 0.0 or pose3d[index_b, 3] <= 0.0: + continue + samples.append(float(np.linalg.norm(pose3d[index_a, :3] - pose3d[index_b, :3]))) + return samples + + +def estimate_beta_from_pose_buffer(pose_buffer: list[Pose3D]) -> np.ndarray: + if not pose_buffer: + return _default_shape() + + robust_pose = np.median(np.stack([np.asarray(pose3d, dtype=np.float64) for pose3d in pose_buffer], axis=0), axis=0) + beta = _estimate_beta_from_pose(np.asarray(robust_pose, dtype=np.float64)) + scale = float(beta[0]) if float(beta[0]) > 1e-8 else 1.0 + + limb_samples = { + limb_name: _limb_length_samples(pose_buffer, *joint_names) + for limb_name, joint_names in { + "torso": ("hip_middle", "shoulder_middle"), + "shoulder_width": ("shoulder_left", "shoulder_right"), + "pelvis_width": ("hip_left", "hip_right"), + "upper_arm": ("shoulder_left", "elbow_left"), + "lower_arm": ("elbow_left", "wrist_left"), + "upper_leg": ("hip_left", "knee_left"), + "lower_leg": ("knee_left", "ankle_left"), + }.items() + } + base_scales = { + "torso": 0.52, + "shoulder_width": 0.36, + "pelvis_width": 0.24, + "upper_arm": 0.30, + "lower_arm": 0.26, + "upper_leg": 0.45, + "lower_leg": 0.43, + } + beta_index = { + "torso": 1, + "shoulder_width": 2, + "pelvis_width": 3, + "upper_arm": 4, + "lower_arm": 5, + "upper_leg": 6, + "lower_leg": 7, + } + for limb_name, samples in limb_samples.items(): + if not samples: + continue + beta[beta_index[limb_name]] = float(np.median(np.asarray(samples, dtype=np.float64))) / ( + base_scales[limb_name] * scale + ) + return np.clip(beta, 0.5, 2.0) + + def _estimate_root_rotation(pose3d: Pose3D) -> np.ndarray: hip_left = pose3d[BODY20_INDEX_BY_NAME["hip_left"], :3] hip_right = pose3d[BODY20_INDEX_BY_NAME["hip_right"], :3] @@ -276,30 +362,205 @@ def initialize_state_from_pose3d(pose3d: Pose3D) -> SkeletonState: return refine_state_from_pose3d(pose3d) +def _parameter_bounds() -> tuple[np.ndarray, np.ndarray]: + lower = np.full((PARAMETER_DIMENSION,), -2.5, dtype=np.float64) + upper = np.full((PARAMETER_DIMENSION,), 2.5, dtype=np.float64) + lower[0:3] = -np.inf + upper[0:3] = np.inf + lower[3:6] = -math.pi + upper[3:6] = math.pi + return lower, upper + + +def _resolve_noise_by_view( + matched: dict[str, PoseDetection], + noise_scale: np.ndarray, + noise_by_view: dict[str, np.ndarray] | None, +) -> dict[str, np.ndarray]: + resolved: dict[str, np.ndarray] = {} + for camera_name in matched: + if noise_by_view is not None and camera_name in noise_by_view: + candidate = np.asarray(noise_by_view[camera_name], dtype=np.float64) + if candidate.shape == (BODY20_OBSERVATION_COUNT,): + resolved[camera_name] = candidate.copy() + continue + resolved[camera_name] = np.asarray(noise_scale, dtype=np.float64).copy() + return resolved + + +def _base_sigma_from_confidence(confidence: np.ndarray, min_px: float, max_px: float) -> np.ndarray: + clipped = np.clip(confidence, 0.05, 1.0) + return np.clip(max_px - (max_px - min_px) * clipped, min_px, max_px) + + +def _joint_acceptance_masks( + predicted_pose: Pose3D, + cameras: tuple[CameraCalibration, ...], + matched: dict[str, PoseDetection], + noise_by_view: dict[str, np.ndarray], + *, + joint_gate_px: float, +) -> dict[str, np.ndarray]: + camera_by_name = {camera.name: camera for camera in cameras} + accepted: dict[str, np.ndarray] = {} + for camera_name, detection in matched.items(): + projected = project_pose(camera_by_name[camera_name], predicted_pose) + confidence_mask = detection.keypoints[:, 2] > 0.05 + if not np.any(confidence_mask): + accepted[camera_name] = np.zeros((BODY20_OBSERVATION_COUNT,), dtype=bool) + continue + delta = projected[:, :2] - detection.keypoints[:, :2] + distance = np.sqrt(np.sum(delta * delta, axis=1)) + sigma = np.maximum(noise_by_view[camera_name], 1.0) + normalized = distance / sigma + accepted[camera_name] = confidence_mask & ( + (distance <= joint_gate_px) | (normalized <= 2.5) + ) + return accepted + + +@dataclass(slots=True) +class _ResidualEvaluation: + residual: np.ndarray + mean_reprojection_error: float + + +def _pack_candidate(parameters: np.ndarray, beta: np.ndarray, *, beta_frozen: bool) -> np.ndarray: + if beta_frozen: + return np.asarray(parameters, dtype=np.float64) + return np.concatenate([np.asarray(parameters, dtype=np.float64), np.asarray(beta, dtype=np.float64)], axis=0) + + +def _unpack_candidate(values: np.ndarray, predicted_beta: np.ndarray, *, beta_frozen: bool) -> tuple[np.ndarray, np.ndarray]: + parameters = np.asarray(values[:PARAMETER_DIMENSION], dtype=np.float64) + if beta_frozen: + return parameters, predicted_beta.copy() + beta = np.asarray(values[PARAMETER_DIMENSION:], dtype=np.float64) + return parameters, np.clip(beta, 0.5, 2.0) + + def _2d_update_residual( candidate: np.ndarray, predicted_parameters: np.ndarray, - beta: np.ndarray, - cameras: tuple[CameraCalibration, ...], matched: dict[str, PoseDetection], camera_by_name: dict[str, CameraCalibration], - noise_scale: np.ndarray, -) -> np.ndarray: - rendered = render_pose(candidate, beta) - residual_parts: list[np.ndarray] = [0.08 * (candidate - predicted_parameters)] + noise_by_view: dict[str, np.ndarray], + accepted_joint_masks: dict[str, np.ndarray], + *, + beta_frozen: bool, + predicted_beta: np.ndarray, + student_t_dof: float, +) -> _ResidualEvaluation: + parameters, resolved_beta = _unpack_candidate(candidate, predicted_beta, beta_frozen=beta_frozen) + rendered = render_pose(parameters, resolved_beta) + residual_parts: list[np.ndarray] = [0.08 * (parameters - predicted_parameters)] + if not beta_frozen: + residual_parts.append(0.2 * (resolved_beta - predicted_beta)) + reprojection_errors: list[np.ndarray] = [] for camera_name, detection in matched.items(): camera = camera_by_name[camera_name] projected = project_pose(camera, rendered) - joint_mask = detection.keypoints[:, 2] > 0.05 + joint_mask = accepted_joint_masks[camera_name] if not np.any(joint_mask): continue delta = projected[joint_mask, :2] - detection.keypoints[joint_mask, :2] - weights = np.sqrt(np.clip(detection.keypoints[joint_mask, 2], 0.05, 1.0)) - sigmas = noise_scale[joint_mask][:, None] - residual_parts.append((delta / sigmas) * weights[:, None]) + sigmas = np.maximum(noise_by_view[camera_name][joint_mask], 1.0)[:, None] + normalized = delta / sigmas + squared_distance = np.sum(normalized * normalized, axis=1) + confidence_weights = np.sqrt(np.clip(detection.keypoints[joint_mask, 2], 0.05, 1.0)) + student_t_weights = np.sqrt((student_t_dof + 2.0) / (student_t_dof + squared_distance)) + residual_parts.append(normalized * confidence_weights[:, None] * student_t_weights[:, None]) + reprojection_errors.append(np.sqrt(np.sum(delta * delta, axis=1))) if len(residual_parts) == 1: - return residual_parts[0] - return np.concatenate([part.reshape(-1) for part in residual_parts], axis=0) + flattened = residual_parts[0] + else: + flattened = np.concatenate([part.reshape(-1) for part in residual_parts], axis=0) + mean_reprojection_error = ( + float(np.mean(np.concatenate(reprojection_errors, axis=0))) + if reprojection_errors + else np.inf + ) + return _ResidualEvaluation( + residual=np.asarray(flattened, dtype=np.float64).reshape(-1), + mean_reprojection_error=mean_reprojection_error, + ) + + +def _finite_difference_jacobian( + values: np.ndarray, + evaluate: Callable[[np.ndarray], _ResidualEvaluation], + *, + step_epsilon: float, + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, +) -> tuple[np.ndarray, _ResidualEvaluation]: + base = evaluate(values) + jacobian = np.zeros((base.residual.size, values.size), dtype=np.float64) + for column in range(values.size): + step = step_epsilon * max(1.0, abs(float(values[column]))) + candidate = values.copy() + candidate[column] = np.clip(candidate[column] + step, lower_bounds[column], upper_bounds[column]) + actual_step = candidate[column] - values[column] + if abs(float(actual_step)) <= 1e-12: + continue + shifted = evaluate(candidate) + jacobian[:, column] = (shifted.residual - base.residual) / actual_step + return jacobian, base + + +def _run_lm_update( + candidate0: np.ndarray, + evaluate: Callable[[np.ndarray], _ResidualEvaluation], + *, + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, + max_iterations: int, + damping: float, + step_epsilon: float, + step_tolerance: float, +) -> tuple[np.ndarray, np.ndarray, _ResidualEvaluation, int]: + candidate = np.asarray(candidate0, dtype=np.float64).copy() + lambda_value = max(damping, 1e-6) + accepted_iterations = 0 + best_jacobian, best_evaluation = _finite_difference_jacobian( + candidate, + evaluate, + step_epsilon=step_epsilon, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + best_loss = 0.5 * float(np.dot(best_evaluation.residual, best_evaluation.residual)) + for _ in range(max_iterations): + jtj = best_jacobian.T @ best_jacobian + diagonal = np.maximum(np.diag(jtj), 1e-6) + hessian = jtj + np.diag(lambda_value * diagonal) + gradient = best_jacobian.T @ best_evaluation.residual + try: + step = -np.linalg.solve(hessian, gradient) + except np.linalg.LinAlgError: + step = -np.linalg.pinv(hessian) @ gradient + if float(np.linalg.norm(step)) <= step_tolerance: + break + proposed = np.clip(candidate + step, lower_bounds, upper_bounds) + if np.allclose(proposed, candidate): + break + proposed_evaluation = evaluate(proposed) + proposed_loss = 0.5 * float(np.dot(proposed_evaluation.residual, proposed_evaluation.residual)) + if proposed_loss < best_loss: + candidate = proposed + best_loss = proposed_loss + best_jacobian, best_evaluation = _finite_difference_jacobian( + candidate, + evaluate, + step_epsilon=step_epsilon, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + lambda_value = max(lambda_value * 0.5, 1e-6) + accepted_iterations += 1 + continue + lambda_value *= 4.0 + return candidate, best_jacobian, best_evaluation, accepted_iterations def update_state_from_multiview( @@ -307,41 +568,145 @@ def update_state_from_multiview( cameras: tuple[CameraCalibration, ...], matched: dict[str, PoseDetection], noise_scale: np.ndarray, -) -> SkeletonState: + noise_by_view: dict[str, np.ndarray] | None = None, + *, + joint_gate_px: float, + beta_frozen: bool, + max_iterations: int, + damping: float, + step_epsilon: float, + step_tolerance: float, + student_t_dof: float, +) -> MultiviewUpdateResult: if not matched: - return state + return MultiviewUpdateResult( + state=state, + parameter_covariance=np.eye(PARAMETER_DIMENSION, dtype=np.float64), + beta_covariance=np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01, + accepted_joint_masks={}, + accepted_joint_counts_by_view={}, + accepted_joint_count=0, + accepted_view_count=0, + mean_reprojection_error=np.inf, + lm_iterations=0, + ) camera_by_name = {camera.name: camera for camera in cameras} - result = least_squares( - _2d_update_residual, - state.parameters, - args=(state.parameters, state.beta, cameras, matched, camera_by_name, noise_scale), - method="trf", - max_nfev=30, - loss="soft_l1", - f_scale=1.0, + resolved_noise_by_view = _resolve_noise_by_view(matched, noise_scale, noise_by_view) + accepted_joint_masks = _joint_acceptance_masks( + state.pose3d, + cameras, + matched, + resolved_noise_by_view, + joint_gate_px=joint_gate_px, + ) + accepted_joint_counts_by_view = { + camera_name: int(np.count_nonzero(mask)) + for camera_name, mask in accepted_joint_masks.items() + } + accepted_joint_count = sum(accepted_joint_counts_by_view.values()) + accepted_view_count = sum(int(count > 0) for count in accepted_joint_counts_by_view.values()) + candidate0 = _pack_candidate(state.parameters, state.beta, beta_frozen=beta_frozen) + parameter_lower, parameter_upper = _parameter_bounds() + if beta_frozen: + lower_bounds = parameter_lower + upper_bounds = parameter_upper + else: + beta_lower = np.clip(state.beta - 0.2, 0.5, 2.0) + beta_upper = np.clip(state.beta + 0.2, 0.5, 2.0) + lower_bounds = np.concatenate([parameter_lower, beta_lower], axis=0) + upper_bounds = np.concatenate([parameter_upper, beta_upper], axis=0) + lower_bounds = np.minimum(lower_bounds, candidate0 - 1e-6) + upper_bounds = np.maximum(upper_bounds, candidate0 + 1e-6) + + def evaluate(candidate: np.ndarray) -> _ResidualEvaluation: + return _2d_update_residual( + candidate, + state.parameters, + matched, + camera_by_name, + resolved_noise_by_view, + accepted_joint_masks, + beta_frozen=beta_frozen, + predicted_beta=state.beta, + student_t_dof=student_t_dof, + ) + + candidate, jacobian, evaluation, lm_iterations = _run_lm_update( + candidate0, + evaluate, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + max_iterations=max_iterations, + damping=damping, + step_epsilon=step_epsilon, + step_tolerance=step_tolerance, + ) + parameters, resolved_beta = _unpack_candidate(candidate, state.beta, beta_frozen=beta_frozen) + rendered = render_pose(parameters, resolved_beta) + if jacobian.size == 0: + parameter_covariance = np.eye(PARAMETER_DIMENSION, dtype=np.float64) + beta_covariance = np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01 + else: + jtj = jacobian.T @ jacobian + stabilized = jtj + np.eye(jtj.shape[0], dtype=np.float64) * 1e-6 + full_covariance = np.linalg.pinv(stabilized) + parameter_covariance = full_covariance[:PARAMETER_DIMENSION, :PARAMETER_DIMENSION] + if beta_frozen: + beta_covariance = np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01 + else: + beta_covariance = full_covariance[PARAMETER_DIMENSION:, PARAMETER_DIMENSION:] + return MultiviewUpdateResult( + state=SkeletonState(parameters=parameters, beta=resolved_beta, pose3d=rendered), + parameter_covariance=np.asarray(parameter_covariance, dtype=np.float64), + beta_covariance=np.asarray(beta_covariance, dtype=np.float64), + accepted_joint_masks=accepted_joint_masks, + accepted_joint_counts_by_view=accepted_joint_counts_by_view, + accepted_joint_count=accepted_joint_count, + accepted_view_count=accepted_view_count, + mean_reprojection_error=evaluation.mean_reprojection_error, + lm_iterations=lm_iterations, ) - parameters = np.asarray(result.x, dtype=np.float64) - return SkeletonState(parameters=parameters, beta=state.beta.copy(), pose3d=render_pose(parameters, state.beta)) def update_noise_scale( previous: np.ndarray, + previous_by_view: dict[str, np.ndarray], cameras: tuple[CameraCalibration, ...], pose3d: Pose3D, matched: dict[str, PoseDetection], + accepted_joint_masks: dict[str, np.ndarray], *, ema: float, -) -> np.ndarray: + min_px: float, + max_px: float, + residual_cap_px: float, +) -> tuple[np.ndarray, dict[str, np.ndarray]]: if not matched: - return previous + return previous, previous_by_view updated = previous.copy() + updated_by_view = {name: np.asarray(value, dtype=np.float64).copy() for name, value in previous_by_view.items()} camera_by_name = {camera.name: camera for camera in cameras} for camera_name, detection in matched.items(): projected = project_pose(camera_by_name[camera_name], pose3d) - mask = detection.keypoints[:, 2] > 0.05 + mask = accepted_joint_masks.get(camera_name) + if mask is None: + mask = detection.keypoints[:, 2] > 0.05 if not np.any(mask): continue + previous_view = updated_by_view.get(camera_name, previous.copy()) residual = projected[mask, :2] - detection.keypoints[mask, :2] magnitude = np.sqrt(np.sum(residual * residual, axis=1)) - updated[mask] = np.clip(ema * updated[mask] + (1.0 - ema) * magnitude, 3.0, 45.0) - return updated + base_sigma = _base_sigma_from_confidence(detection.keypoints[mask, 2], min_px, max_px) + observed_sigma = np.sqrt( + np.square(base_sigma) + np.square(np.clip(magnitude, 0.0, residual_cap_px)) + ) + previous_view[mask] = np.clip( + ema * previous_view[mask] + (1.0 - ema) * observed_sigma, + min_px, + max_px, + ) + updated_by_view[camera_name] = previous_view + if updated_by_view: + stacked = np.stack(list(updated_by_view.values()), axis=0) + updated = np.median(stacked, axis=0) + return updated, updated_by_view diff --git a/src/pose_tracking_exp/tracking/replay_io.py b/src/pose_tracking_exp/tracking/replay_io.py index c57c719..f98d2d1 100644 --- a/src/pose_tracking_exp/tracking/replay_io.py +++ b/src/pose_tracking_exp/tracking/replay_io.py @@ -32,7 +32,12 @@ def _as_float_array(values: object, shape: tuple[int, ...]) -> np.ndarray: @beartype def load_scene_file(path: Path) -> SceneConfig: payload = json.loads(path.read_text(encoding="utf-8")) - default_extrinsic_format = str(payload.get("extrinsic_format", _OPENCV_EXTRINSICS)) + default_extrinsic_format = str( + payload.get( + "extrinsic_format", + _RPT_POSE if "imgpaths" in payload and "extrinsic_format" not in payload else _OPENCV_EXTRINSICS, + ) + ) cameras: list[CameraCalibration] = [] for camera_payload in payload["cameras"]: extrinsic_format = str( diff --git a/src/pose_tracking_exp/tracking/rpt_adapter.py b/src/pose_tracking_exp/tracking/rpt_adapter.py index 1b0633c..52febd2 100644 --- a/src/pose_tracking_exp/tracking/rpt_adapter.py +++ b/src/pose_tracking_exp/tracking/rpt_adapter.py @@ -3,9 +3,11 @@ import rpt from beartype import beartype from rpt._core import TriangulationConfig, TriangulationTrace # type: ignore[reportMissingModuleSource] +from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, BODY20_JOINT_NAMES, BODY20_OBSERVATION_COUNT +from pose_tracking_exp.common.normalization import core_reprojection_distance from pose_tracking_exp.common.tensor_types import Pose2D -from pose_tracking_exp.schema import CameraFrame, ProposalCluster, SceneConfig +from pose_tracking_exp.schema import CameraCalibration, CameraFrame, ProposalCluster, SceneConfig def build_rpt_config( @@ -54,28 +56,66 @@ def pack_view_detections(frames: tuple[CameraFrame, ...], unmatched_indices: dic @beartype def extract_clusters( trace: TriangulationTrace, - camera_names: tuple[str, ...], + frames: tuple[CameraFrame, ...], + cameras: tuple[CameraCalibration, ...], + unmatched_indices: dict[str, list[int]], ) -> tuple[ProposalCluster, ...]: clusters: list[ProposalCluster] = [] + camera_by_name = {camera.name: camera for camera in cameras} + frame_by_name = {frame.camera_name: frame for frame in frames} for pose_index, pose3d in enumerate(trace.final_poses): pose_array = np.asarray(pose3d, dtype=np.float64) root = pose_array[BODY20_INDEX_BY_NAME["hip_middle"], :3] + root_centered_pose = pose_array.copy() + root_centered_pose[:, :3] -= root[None, :] source_indices = [] if pose_index < len(trace.merge.group_proposal_indices): source_indices = trace.merge.group_proposal_indices[pose_index] source_views: set[str] = set() + reprojection_errors: list[float] = [] + observation_keys: set[tuple[str, int]] = set() + proposal_scores: list[float] = [] + support_detection_indices: dict[str, set[int]] = {} for core_proposal_index in source_indices: - pair = trace.core_proposals[core_proposal_index].pair - source_views.add(camera_names[pair.view1]) - source_views.add(camera_names[pair.view2]) - mean_score = float(np.mean(pose_array[:, 3][pose_array[:, 3] > 0.0])) if np.any(pose_array[:, 3] > 0.0) else 0.0 + core_proposal = trace.core_proposals[core_proposal_index] + pair = core_proposal.pair + proposal_scores.append(float(core_proposal.score)) + for view_index, person_index in ((pair.view1, pair.person1), (pair.view2, pair.person2)): + camera_name = frames[view_index].camera_name + source_views.add(camera_name) + if (camera_name, int(person_index)) in observation_keys: + continue + observation_keys.add((camera_name, int(person_index))) + detection_indices = unmatched_indices.get(camera_name, []) + if person_index < 0 or person_index >= len(detection_indices): + continue + detection_index = int(detection_indices[person_index]) + support_detection_indices.setdefault(camera_name, set()).add(detection_index) + detection = frame_by_name[camera_name].detections[detection_index] + projected = project_pose(camera_by_name[camera_name], pose_array) + reprojection_errors.append(core_reprojection_distance(projected, detection.keypoints)) + mean_score = ( + float(np.mean(np.asarray(proposal_scores, dtype=np.float64))) + if proposal_scores + else float(np.mean(pose_array[:, 3][pose_array[:, 3] > 0.0])) if np.any(pose_array[:, 3] > 0.0) else 0.0 + ) clusters.append( ProposalCluster( pose3d=pose_array, root=np.asarray(root, dtype=np.float64), + root_centered_pose3d=np.asarray(root_centered_pose, dtype=np.float64), source_views=frozenset(source_views), + view_count=len(source_views), support_size=max(1, len(source_indices)), + pair_count=max(1, len(source_indices)), mean_score=mean_score, + mean_reprojection_error=( + float(np.median(np.asarray(reprojection_errors, dtype=np.float64))) if reprojection_errors else np.inf + ), + support_detection_indices={ + camera_name: tuple(sorted(indices)) + for camera_name, indices in support_detection_indices.items() + }, ) ) return tuple(clusters) diff --git a/src/pose_tracking_exp/tracking/tracker.py b/src/pose_tracking_exp/tracking/tracker.py index 5c036e8..659a79e 100644 --- a/src/pose_tracking_exp/tracking/tracker.py +++ b/src/pose_tracking_exp/tracking/tracker.py @@ -1,5 +1,6 @@ -from dataclasses import replace +from dataclasses import dataclass, replace from time import perf_counter +from typing import Literal import numpy as np from beartype import beartype @@ -7,7 +8,6 @@ from scipy.optimize import linear_sum_assignment from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, CORE_JOINT_INDICES -from pose_tracking_exp.common.normalization import core_reprojection_distance from pose_tracking_exp.schema import ( ActiveTrackState, FrameBundle, @@ -16,48 +16,224 @@ from pose_tracking_exp.schema import ( SceneConfig, SkeletonState, TentativeTrackState, + TRACK_COVARIANCE_DIMENSION, + TrackUpdateEvent, TrackedFrameResult, TrackerConfig, TrackerDiagnostics, ) -from pose_tracking_exp.tracking.kinematics import seed_state_from_pose3d, update_noise_scale, update_state_from_multiview +from pose_tracking_exp.tracking.kinematics import ( + MultiviewUpdateResult, + PARAMETER_DIMENSION, + STABLE_LIMB_NAMES, + estimate_beta_from_pose_buffer, + render_pose, + seed_state_from_pose3d, + update_noise_scale, + update_state_from_multiview, +) from pose_tracking_exp.tracking.rpt_adapter import build_rpt_config, extract_clusters, pack_view_detections CORE_JOINT_MASK = np.zeros((20,), dtype=bool) CORE_JOINT_MASK[list(CORE_JOINT_INDICES)] = True +VELOCITY_SLICE = slice(PARAMETER_DIMENSION, PARAMETER_DIMENSION * 2) +BETA_SLICE = slice(PARAMETER_DIMENSION * 2, TRACK_COVARIANCE_DIMENSION) + + +@dataclass(slots=True) +class _PredictedTrackState: + skeleton: SkeletonState + covariance: np.ndarray def _copy_pose3d(pose3d: np.ndarray) -> np.ndarray: return np.asarray(pose3d, dtype=np.float64).copy() -def _proposal_rank(proposal: ProposalCluster) -> tuple[int, float]: - return proposal.support_size, proposal.mean_score +def _copy_root_centered_pose3d(pose3d: np.ndarray) -> np.ndarray: + copied = _copy_pose3d(pose3d) + root = copied[BODY20_INDEX_BY_NAME["hip_middle"], :3] + copied[:, :3] -= root[None, :] + return copied -def _active_track_rank(track: ActiveTrackState) -> tuple[float, int, int]: - return track.score, -track.misses, track.last_bundle_index +def _mean_finite(values: list[float], *, default: float) -> float: + finite = [value for value in values if np.isfinite(value)] + if not finite: + return default + return float(np.mean(np.asarray(finite, dtype=np.float64))) -def _lost_track_rank(track: ActiveTrackState) -> tuple[float, int, int]: - return track.score, -track.lost_age, track.last_bundle_index +def _coefficient_of_variation(samples: list[float]) -> float: + if len(samples) < 2: + return 0.0 + values = np.asarray(samples, dtype=np.float64) + mean = float(np.mean(values)) + if mean <= 1e-8: + return 0.0 + return float(np.std(values) / mean) -def _tentative_track_rank(track: TentativeTrackState) -> tuple[float, int, int, int]: - return track.score, len(track.evidence_buffer), -track.misses, track.last_bundle_index +def _limb_length(pose3d: np.ndarray, name_a: str, name_b: str) -> float | None: + index_a = BODY20_INDEX_BY_NAME[name_a] + index_b = BODY20_INDEX_BY_NAME[name_b] + if pose3d[index_a, 3] <= 0.0 or pose3d[index_b, 3] <= 0.0: + return None + return float(np.linalg.norm(pose3d[index_a, :3] - pose3d[index_b, :3])) + + +def _track_root(track: ActiveTrackState) -> np.ndarray: + return np.asarray(track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3], dtype=np.float64) + + +def _covariance_trace(covariance: np.ndarray) -> float: + return float(np.trace(np.asarray(covariance, dtype=np.float64))) + + +def _proposal_rank(proposal: ProposalCluster) -> tuple[int, int, float, float]: + return proposal.view_count, proposal.support_size, proposal.mean_score, -proposal.mean_reprojection_error + + +def _active_track_rank(track: ActiveTrackState) -> tuple[int, float, int, int]: + return 2, track.score, -track.misses, track.last_bundle_index + + +def _lost_track_rank(track: ActiveTrackState) -> tuple[int, float, int, int]: + return 1, track.score, -track.lost_age, track.last_bundle_index + + +def _tentative_track_rank(track: TentativeTrackState) -> tuple[int, float, int, int]: + return 0, track.score, track.hit_count, track.last_bundle_index def _tentative_root_spread(track: TentativeTrackState) -> float: if len(track.evidence_buffer) < 2: return 0.0 - roots = np.asarray( - [pose[BODY20_INDEX_BY_NAME["hip_middle"], :3] for pose in track.evidence_buffer], - dtype=np.float64, - ) + roots = np.asarray([proposal.root for proposal in track.evidence_buffer], dtype=np.float64) center = roots.mean(axis=0) return float(np.max(np.linalg.norm(roots - center[None, :], axis=1))) +def _tentative_bone_cv(track: TentativeTrackState) -> float: + if len(track.evidence_buffer) < 2: + return 0.0 + pose_buffer = [proposal.pose3d for proposal in track.evidence_buffer] + cvs: list[float] = [] + for name_a, name_b in STABLE_LIMB_NAMES: + samples = [ + limb_length + for pose3d in pose_buffer + if (limb_length := _limb_length(pose3d, name_a, name_b)) is not None + ] + if len(samples) >= 2: + cvs.append(_coefficient_of_variation(samples)) + if not cvs: + return 0.0 + return float(np.mean(np.asarray(cvs, dtype=np.float64))) + + +def _core_pose_distance(left: np.ndarray, right: np.ndarray) -> float: + mask = CORE_JOINT_MASK.copy() + mask &= left[:, 3] > 0.0 + mask &= right[:, 3] > 0.0 + if not np.any(mask): + return np.inf + deltas = left[mask, :3] - right[mask, :3] + return float(np.sqrt(np.mean(np.sum(deltas * deltas, axis=1)))) + + +def _core_match_metrics( + projected: np.ndarray, + detection: PoseDetection, + sigma: np.ndarray, + *, + gate_px: float, +) -> tuple[float, int]: + observed = detection.keypoints + mask = CORE_JOINT_MASK.copy() + mask &= projected[:, 2] > 0.0 + mask &= observed[:, 2] > 0.05 + if not np.any(mask): + return np.inf, 0 + delta = projected[mask, :2] - observed[mask, :2] + distance = np.sqrt(np.sum(delta * delta, axis=1)) + normalized = distance / np.maximum(sigma[mask], 1.0) + accepted = (distance <= gate_px) | (normalized <= 2.5) + accepted_count = int(np.count_nonzero(accepted)) + if accepted_count <= 0: + return np.inf, 0 + accepted_delta = delta[accepted] + accepted_weights = np.clip(observed[mask, 2][accepted], 0.05, 1.0) + accepted_sigma = np.maximum(np.square(sigma[mask][accepted]), 1.0) + squared = np.sum(accepted_delta * accepted_delta, axis=1) + normalized = squared / accepted_sigma + return float(np.sqrt(np.average(normalized, weights=accepted_weights))), accepted_count + + +def _track_score_after_update( + score: float, + *, + decay: float, + view_gain: float, + joint_gain: float, + reprojection_penalty: float, + miss_penalty: float, + reacquire_gain: float = 0.0, + accepted_views: int, + accepted_joints: int, + mean_reprojection_error: float, + miss: bool, +) -> float: + updated = decay * score + updated += view_gain * float(accepted_views) + updated += joint_gain * (float(accepted_joints) / 20.0) + if np.isfinite(mean_reprojection_error): + updated -= reprojection_penalty * float(mean_reprojection_error) + if miss: + updated -= miss_penalty + else: + updated += reacquire_gain + return float(updated) + + +def _track_process_noise(config: TrackerConfig, *, beta_frozen: bool, scale: float) -> np.ndarray: + diagonal = np.full((TRACK_COVARIANCE_DIMENSION,), config.process_noise_joint_rad**2, dtype=np.float64) + diagonal[:3] = config.process_noise_root_position_m**2 + diagonal[3:6] = config.process_noise_root_rotation_rad**2 + diagonal[6:PARAMETER_DIMENSION] = config.process_noise_joint_rad**2 + diagonal[VELOCITY_SLICE] = config.process_noise_velocity**2 + diagonal[BETA_SLICE] = 0.0 if beta_frozen else config.process_noise_beta**2 + return diagonal * scale + + +def _embed_measurement_covariance( + predicted_covariance: np.ndarray, + parameter_covariance: np.ndarray, + beta_covariance: np.ndarray, + *, + beta_frozen: bool, +) -> np.ndarray: + updated = np.asarray(predicted_covariance, dtype=np.float64).copy() + updated[:PARAMETER_DIMENSION, :PARAMETER_DIMENSION] = np.asarray(parameter_covariance, dtype=np.float64) + updated[:PARAMETER_DIMENSION, VELOCITY_SLICE] *= 0.5 + updated[VELOCITY_SLICE, :PARAMETER_DIMENSION] = updated[:PARAMETER_DIMENSION, VELOCITY_SLICE].T + updated[VELOCITY_SLICE, VELOCITY_SLICE] *= 0.75 + if not beta_frozen: + updated[BETA_SLICE, BETA_SLICE] = np.asarray(beta_covariance, dtype=np.float64) + updated[:PARAMETER_DIMENSION, BETA_SLICE] = 0.0 + updated[BETA_SLICE, :PARAMETER_DIMENSION] = 0.0 + updated[VELOCITY_SLICE, BETA_SLICE] = 0.0 + updated[BETA_SLICE, VELOCITY_SLICE] = 0.0 + symmetrized = 0.5 * (updated + updated.T) + return symmetrized + np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * 1e-6 + + +def _tentative_reprojection_penalty(proposal: ProposalCluster) -> float: + if not np.isfinite(proposal.mean_reprojection_error): + return 0.0 + return 0.001 * proposal.mean_reprojection_error + + @beartype class PoseTracker: def __init__(self, scene: SceneConfig, config: TrackerConfig | None = None) -> None: @@ -87,14 +263,16 @@ class PoseTracker: def step(self, bundle: FrameBundle) -> TrackedFrameResult: self._enforce_track_limits() - matches, unmatched = self._match_existing_tracks(bundle) - self._update_active_tracks(bundle, matches) - self._update_lost_tracks(bundle, matches) - proposals = self._refresh_capped_single_track_from_proposals(bundle, self._build_proposals(bundle, unmatched)) - self._update_tentative_tracks(bundle, self._birth_candidate_proposals(proposals)) - self._promote_tentative_tracks(bundle) - self._reacquire_lost_tracks(bundle, proposals) - self._delete_expired_tracks() + update_events: list[TrackUpdateEvent] = [] + predicted_states = self._predicted_skeletons() + matches, unmatched = self._match_existing_tracks(bundle, predicted_states) + self._update_active_tracks(bundle, predicted_states, matches, update_events) + self._update_lost_tracks(bundle, predicted_states, matches, update_events) + proposals = self._build_proposals(bundle, unmatched) + self._update_tentative_tracks(bundle, proposals, update_events) + self._promote_tentative_tracks(bundle, update_events) + self._reacquire_lost_tracks(bundle, predicted_states, proposals, update_events) + self._delete_expired_tracks(update_events) self._enforce_track_limits() return TrackedFrameResult( bundle_index=bundle.bundle_index, @@ -102,84 +280,93 @@ class PoseTracker: tentative_tracks=tuple(sorted(self._tentative.values(), key=lambda item: item.track_id)), active_tracks=tuple(sorted(self._active.values(), key=lambda item: item.track_id)), lost_tracks=tuple(sorted(self._lost.values(), key=lambda item: item.track_id)), - proposals=proposals, + proposals=tuple(sorted(proposals, key=_proposal_rank, reverse=True)), + update_events=tuple(update_events), ) def _track_limit(self) -> int | None: return self._config.max_active_tracks - def _single_track_cap_enabled(self) -> bool: - return self._config.max_active_tracks == 1 - - def _keep_best_active_tracks(self, limit: int) -> None: - if len(self._active) <= limit: - return - ranked_ids = sorted(self._active, key=lambda track_id: _active_track_rank(self._active[track_id]), reverse=True) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._active): - if track_id not in keep_ids: - self._active.pop(track_id, None) - - def _keep_best_lost_tracks(self, limit: int) -> None: - if len(self._lost) <= limit: - return - ranked_ids = sorted(self._lost, key=lambda track_id: _lost_track_rank(self._lost[track_id]), reverse=True) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._lost): - if track_id not in keep_ids: - self._lost.pop(track_id, None) - - def _keep_best_tentative_tracks(self, limit: int) -> None: - if len(self._tentative) <= limit: - return - ranked_ids = sorted( - self._tentative, - key=lambda track_id: _tentative_track_rank(self._tentative[track_id]), - reverse=True, - ) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._tentative): - if track_id not in keep_ids: - self._tentative.pop(track_id, None) + def _capacity_for_new_track(self) -> int | None: + limit = self._track_limit() + if limit is None: + return None + return max(0, limit - (len(self._active) + len(self._lost) + len(self._tentative))) def _enforce_track_limits(self) -> None: limit = self._track_limit() if limit is None: return - self._keep_best_active_tracks(limit) - self._keep_best_lost_tracks(limit) - self._keep_best_tentative_tracks(limit) - if not self._single_track_cap_enabled(): - return - if self._active: - self._lost.clear() - self._tentative.clear() - return - if self._lost: - self._tentative.clear() + ranked_entries: list[tuple[tuple[int, float, int, int], str, int]] = [] + ranked_entries.extend((_active_track_rank(track), "active", track_id) for track_id, track in self._active.items()) + ranked_entries.extend((_lost_track_rank(track), "lost", track_id) for track_id, track in self._lost.items()) + ranked_entries.extend( + (_tentative_track_rank(track), "tentative", track_id) for track_id, track in self._tentative.items() + ) + ranked_entries.sort(key=lambda item: item[0], reverse=True) + keep_entries = {(bucket, track_id) for _, bucket, track_id in ranked_entries[:limit]} + for track_id in list(self._active): + if ("active", track_id) not in keep_entries: + self._active.pop(track_id, None) + for track_id in list(self._lost): + if ("lost", track_id) not in keep_entries: + self._lost.pop(track_id, None) + for track_id in list(self._tentative): + if ("tentative", track_id) not in keep_entries: + self._tentative.pop(track_id, None) - def _predicted_pose_by_track(self) -> dict[int, np.ndarray]: - result: dict[int, np.ndarray] = {} + def _predict_track_state(self, track: ActiveTrackState) -> _PredictedTrackState: + predicted_parameters = track.skeleton.parameters + track.joint_velocity + predicted_parameters = np.asarray(predicted_parameters, dtype=np.float64) + predicted_parameters[0:3] = track.skeleton.parameters[0:3] + track.root_velocity + predicted_pose = render_pose(predicted_parameters, track.skeleton.beta) + transition = np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) + transition[:PARAMETER_DIMENSION, VELOCITY_SLICE] = np.eye(PARAMETER_DIMENSION, dtype=np.float64) + process_noise = np.diag( + _track_process_noise( + self._config, + beta_frozen=track.beta_frozen, + scale=1.0, + ) + ) + predicted_covariance = transition @ track.covariance @ transition.T + process_noise + return _PredictedTrackState( + skeleton=SkeletonState( + parameters=predicted_parameters, + beta=track.skeleton.beta.copy(), + pose3d=predicted_pose, + ), + covariance=0.5 * (predicted_covariance + predicted_covariance.T), + ) + + def _is_in_working_volume(self, root: np.ndarray) -> bool: + half_extent = self._scene.room_size * 0.5 + lower = self._scene.room_center - half_extent + upper = self._scene.room_center + half_extent + return bool(np.all(root >= lower) and np.all(root <= upper)) + + def _predicted_skeletons(self) -> dict[int, _PredictedTrackState]: + result: dict[int, _PredictedTrackState] = {} for track_id, track in self._active.items(): - result[track_id] = track.skeleton.pose3d + result[track_id] = self._predict_track_state(track) for track_id, track in self._lost.items(): - result[track_id] = track.skeleton.pose3d + result[track_id] = self._predict_track_state(track) return result def _match_existing_tracks( self, bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], ) -> tuple[dict[int, dict[str, PoseDetection]], dict[str, list[int]]]: started_at = perf_counter() try: - predicted = self._predicted_pose_by_track() - matches: dict[int, dict[str, PoseDetection]] = {track_id: {} for track_id in predicted} + matches: dict[int, dict[str, PoseDetection]] = {track_id: {} for track_id in predicted_states} unmatched_indices = {frame.camera_name: list(range(len(frame.detections))) for frame in bundle.views} - if not predicted: + if not predicted_states: return matches, unmatched_indices camera_by_name = {camera.name: camera for camera in self._scene.cameras} - track_ids = sorted(predicted) + track_ids = sorted(predicted_states) for frame in bundle.views: cost = np.full( (len(track_ids), len(frame.detections)), @@ -187,13 +374,25 @@ class PoseTracker: dtype=np.float64, ) for row, track_id in enumerate(track_ids): - projected = project_pose(camera_by_name[frame.camera_name], predicted[track_id]) - projected_core = projected.copy() - projected_core[~CORE_JOINT_MASK, 2] = 0.0 + track = self._active.get(track_id, self._lost.get(track_id)) + if track is None: + continue + projected = project_pose(camera_by_name[frame.camera_name], predicted_states[track_id].skeleton.pose3d) + sigma = track.noise_by_view.get(frame.camera_name, track.noise_scale) + min_core_joints = ( + self._config.lost_min_accepted_core_joints + if track.status == "lost" + else self._config.association_min_core_joints + ) for col, detection in enumerate(frame.detections): - observed_core = detection.keypoints.copy() - observed_core[~CORE_JOINT_MASK, 2] = 0.0 - cost[row, col] = core_reprojection_distance(projected_core, observed_core) + match_cost, accepted_core_joints = _core_match_metrics( + projected, + detection, + sigma, + gate_px=self._config.active_core_gate_px, + ) + if accepted_core_joints >= min_core_joints: + cost[row, col] = match_cost if cost.size == 0: continue rows, cols = linear_sum_assignment(cost) @@ -214,44 +413,277 @@ class PoseTracker: def _refine_track_state( self, track: ActiveTrackState, + predicted_state: SkeletonState, matched: dict[str, PoseDetection], - ) -> tuple[SkeletonState, np.ndarray]: + ): self._diagnostics.nonlinear_refinements += 1 - updated_skeleton = update_state_from_multiview( - track.skeleton, + update_result = update_state_from_multiview( + predicted_state, self._scene.cameras, matched, track.noise_scale, + track.noise_by_view, + joint_gate_px=self._config.active_joint_gate_px, + beta_frozen=track.beta_frozen, + max_iterations=self._config.lm_max_iterations, + damping=self._config.lm_damping, + step_epsilon=self._config.lm_step_epsilon, + step_tolerance=self._config.lm_step_tolerance, + student_t_dof=self._config.lm_student_t_dof, ) - updated_noise = update_noise_scale( + updated_noise, updated_noise_by_view = update_noise_scale( track.noise_scale, + track.noise_by_view, self._scene.cameras, - updated_skeleton.pose3d, + update_result.state.pose3d, matched, + update_result.accepted_joint_masks, ema=self._config.noise_ema, + min_px=self._config.noise_min_px, + max_px=self._config.noise_max_px, + residual_cap_px=self._config.noise_residual_cap_px, ) self._diagnostics.active_updates += 1 - return updated_skeleton, updated_noise + self._diagnostics.lm_iterations += update_result.lm_iterations + return update_result, updated_noise, updated_noise_by_view def _seed_skeleton(self, pose3d: np.ndarray, beta: np.ndarray | None = None) -> SkeletonState: self._diagnostics.seed_initializations += 1 return seed_state_from_pose3d(_copy_pose3d(pose3d), beta=None if beta is None else beta.copy()) - def _update_active_tracks(self, bundle: FrameBundle, matches: dict[int, dict[str, PoseDetection]]) -> None: + def _predict_only_update( + self, + track: ActiveTrackState, + predicted: _PredictedTrackState, + *, + bundle_index: int, + lost_track: bool, + ) -> None: + track.skeleton = predicted.skeleton + track.covariance = predicted.covariance + np.diag( + _track_process_noise( + self._config, + beta_frozen=track.beta_frozen, + scale=self._config.predict_only_process_scale, + ) + ) + if lost_track: + track.score = _track_score_after_update( + track.score, + decay=self._config.lost_score_decay, + view_gain=0.0, + joint_gain=0.0, + reprojection_penalty=0.0, + miss_penalty=self._config.lost_score_miss_penalty, + accepted_views=0, + accepted_joints=0, + mean_reprojection_error=np.inf, + miss=True, + ) + else: + track.score = _track_score_after_update( + track.score, + decay=self._config.active_score_decay, + view_gain=0.0, + joint_gain=0.0, + reprojection_penalty=0.0, + miss_penalty=self._config.active_score_miss_penalty, + accepted_views=0, + accepted_joints=0, + mean_reprojection_error=np.inf, + miss=True, + ) + track.last_bundle_index = bundle_index + track.last_update_kind = "predict_only" + track.mean_reprojection_error = np.inf + track.outside_volume_age = 0 if self._is_in_working_volume(_track_root(track)) else track.outside_volume_age + 1 + if lost_track: + track.lost_age += 1 + else: + track.misses += 1 + self._diagnostics.predict_only_updates += 1 + + def _apply_successful_update( + self, + track: ActiveTrackState, + predicted: _PredictedTrackState, + update_result, + updated_noise: np.ndarray, + updated_noise_by_view: dict[str, np.ndarray], + *, + bundle_index: int, + update_kind: Literal["direct_update", "direct_reacquire", "proposal_reacquire"], + ) -> None: + previous_parameters = track.skeleton.parameters.copy() + updated_skeleton = update_result.state + track.root_velocity = updated_skeleton.parameters[0:3] - previous_parameters[0:3] + track.joint_velocity = updated_skeleton.parameters - previous_parameters + track.skeleton = updated_skeleton + track.noise_scale = updated_noise + track.noise_by_view = updated_noise_by_view + track.covariance = _embed_measurement_covariance( + predicted.covariance, + update_result.parameter_covariance, + update_result.beta_covariance, + beta_frozen=track.beta_frozen, + ) + reacquire_gain = self._config.lost_score_reacquire_gain if track.status == "lost" else 0.0 + track.score = _track_score_after_update( + track.score, + decay=self._config.active_score_decay, + view_gain=self._config.active_score_view_gain, + joint_gain=self._config.active_score_joint_gain, + reprojection_penalty=self._config.active_score_reprojection_penalty, + miss_penalty=self._config.active_score_miss_penalty, + reacquire_gain=reacquire_gain, + accepted_views=update_result.accepted_view_count, + accepted_joints=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + miss=False, + ) + track.misses = 0 + track.lost_age = 0 + track.last_bundle_index = bundle_index + track.last_update_kind = update_kind + track.mean_reprojection_error = update_result.mean_reprojection_error + track.outside_volume_age = 0 if self._is_in_working_volume(_track_root(track)) else track.outside_volume_age + 1 + if not track.beta_frozen: + track.beta_grace_age += 1 + if track.beta_grace_age >= self._config.beta_grace_frames: + track.beta_frozen = True + track.status = "active" + + def _proposal_support_matches( + self, + bundle: FrameBundle, + track: ActiveTrackState, + proposal: ProposalCluster, + seeded_state: SkeletonState, + ) -> dict[str, PoseDetection]: + matched: dict[str, PoseDetection] = {} + camera_by_name = {camera.name: camera for camera in self._scene.cameras} + frame_by_name = {frame.camera_name: frame for frame in bundle.views} + for camera_name in proposal.source_views: + frame = frame_by_name.get(camera_name) + camera = camera_by_name.get(camera_name) + if frame is None or camera is None: + continue + sigma = track.noise_by_view.get(camera_name, track.noise_scale) + projected = project_pose(camera, seeded_state.pose3d) + best_cost = np.inf + best_detection: PoseDetection | None = None + support_indices = tuple( + index + for index in proposal.support_detection_indices.get(camera_name, ()) + if 0 <= index < len(frame.detections) + ) + candidate_indices = support_indices + tuple( + index for index in range(len(frame.detections)) if index not in support_indices + ) + for detection_index in candidate_indices: + if detection_index < 0 or detection_index >= len(frame.detections): + continue + detection = frame.detections[detection_index] + match_cost, accepted_core_joints = _core_match_metrics( + projected, + detection, + sigma, + gate_px=self._config.active_core_gate_px, + ) + if accepted_core_joints < self._config.lost_min_accepted_core_joints: + continue + if match_cost < best_cost: + best_cost = match_cost + best_detection = detection + if best_detection is not None: + matched[camera_name] = best_detection + return matched + + def _mark_proposal_compatible_lost_track( + self, + track: ActiveTrackState, + proposal: ProposalCluster, + update_events: list[TrackUpdateEvent], + *, + accepted_view_count: int = 0, + accepted_joint_count: int = 0, + mean_reprojection_error: float | None = None, + ) -> None: + track.score += self._config.proposal_compatible_score_relief + if mean_reprojection_error is not None and np.isfinite(mean_reprojection_error): + track.mean_reprojection_error = mean_reprojection_error + self._diagnostics.proposal_compatible_lost_frames += 1 + update_events.append( + TrackUpdateEvent( + track_id=track.track_id, + action="proposal_compatible", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + accepted_view_count=accepted_view_count, + accepted_joint_count=accepted_joint_count, + mean_reprojection_error=( + proposal.mean_reprojection_error + if mean_reprojection_error is None + else mean_reprojection_error + ), + ) + ) + + def _update_active_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + matches: dict[int, dict[str, PoseDetection]], + update_events: list[TrackUpdateEvent], + ) -> None: demote_to_lost: list[int] = [] for track_id, track in list(self._active.items()): matched = matches.get(track_id, {}) + predicted = predicted_states[track_id] if len(matched) >= self._config.active_min_views: - updated_skeleton, updated_noise = self._refine_track_state(track, matched) - track.skeleton = updated_skeleton - track.noise_scale = updated_noise - track.score = 0.85 * track.score + len(matched) - track.misses = 0 - track.last_bundle_index = bundle.bundle_index + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + predicted.skeleton, + matched, + ) + if ( + update_result.accepted_view_count >= self._config.active_min_views + and update_result.accepted_joint_count >= self._config.active_min_accepted_joints + ): + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + update_kind="direct_update", + ) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="direct_update", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) + continue + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=False) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="predict_only", + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) else: - track.misses += 1 - track.score -= 1.0 - if track.misses >= self._config.active_miss_to_lost: + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=False) + update_events.append(TrackUpdateEvent(track_id=track_id, action="predict_only")) + if ( + track.misses >= self._config.active_miss_to_lost + or track.score <= self._config.active_score_lost_threshold + ): demote_to_lost.append(track_id) for track_id in demote_to_lost: @@ -260,23 +692,59 @@ class PoseTracker: track.lost_age = 0 self._lost[track_id] = track - def _update_lost_tracks(self, bundle: FrameBundle, matches: dict[int, dict[str, PoseDetection]]) -> None: + def _update_lost_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + matches: dict[int, dict[str, PoseDetection]], + update_events: list[TrackUpdateEvent], + ) -> None: recover_to_active: list[int] = [] for track_id, track in list(self._lost.items()): matched = matches.get(track_id, {}) - if len(matched) >= self._config.active_min_views: - updated_skeleton, updated_noise = self._refine_track_state(track, matched) - track.skeleton = updated_skeleton - track.noise_scale = updated_noise - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) + len(matched) - track.last_bundle_index = bundle.bundle_index - recover_to_active.append(track_id) + predicted = predicted_states[track_id] + if len(matched) >= self._config.lost_min_views: + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + predicted.skeleton, + matched, + ) + if ( + update_result.accepted_view_count >= self._config.lost_min_views + and update_result.accepted_joint_count >= self._config.lost_min_accepted_joints + ): + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + update_kind="direct_reacquire", + ) + recover_to_active.append(track_id) + self._diagnostics.direct_reacquisition_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="direct_reacquire", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) + continue + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=True) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="predict_only", + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) else: - track.lost_age += 1 - track.score -= 0.5 + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=True) + update_events.append(TrackUpdateEvent(track_id=track_id, action="predict_only")) for track_id in recover_to_active: self._diagnostics.reacquisitions += 1 self._active[track_id] = self._lost.pop(track_id) @@ -288,128 +756,157 @@ class PoseTracker: if int(np.sum(person_counts)) == 0: return () trace = __import__("rpt").triangulate_debug(poses_2d, person_counts, self._rpt_config) - return extract_clusters(trace, tuple(frame.camera_name for frame in bundle.views)) + return extract_clusters(trace, bundle.views, self._scene.cameras, unmatched_indices) finally: self._diagnostics.proposal_build_calls += 1 self._diagnostics.proposal_build_seconds += perf_counter() - started_at - def _birth_candidate_proposals(self, proposals: tuple[ProposalCluster, ...]) -> tuple[ProposalCluster, ...]: - if not self._single_track_cap_enabled(): - return proposals - if self._active or self._lost: - return () - if not proposals: - return () - return (max(proposals, key=_proposal_rank),) + def _tentative_assignment_cost(self, track: TentativeTrackState, proposal: ProposalCluster) -> float: + root_distance = float(np.linalg.norm(track.root - proposal.root)) + if root_distance > self._config.proposal_match_distance_m: + return np.inf + core_distance = _core_pose_distance(track.root_centered_pose3d, proposal.root_centered_pose3d) + if core_distance > self._config.proposal_core_match_distance_m: + return np.inf + return ( + root_distance / max(self._config.proposal_match_distance_m, 1e-6) + + core_distance / max(self._config.proposal_core_match_distance_m, 1e-6) + ) - def _refresh_capped_single_track_from_proposals( + def _update_tentative_summary(self, track: TentativeTrackState) -> None: + track.mean_view_count = float( + np.mean(np.asarray([proposal.view_count for proposal in track.evidence_buffer], dtype=np.float64)) + ) + track.mean_support_size = float( + np.mean(np.asarray([proposal.support_size for proposal in track.evidence_buffer], dtype=np.float64)) + ) + track.mean_reprojection_error = _mean_finite( + [proposal.mean_reprojection_error for proposal in track.evidence_buffer], + default=np.inf, + ) + if track.evidence_buffer: + track.last_source_views = track.evidence_buffer[-1].source_views + + def _update_tentative_tracks( self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...], - ) -> tuple[ProposalCluster, ...]: - if not self._single_track_cap_enabled() or not proposals: - return proposals - - remaining = list(proposals) - if self._active: - track = next(iter(self._active.values())) - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] - best_index = -1 - best_distance = self._config.proposal_match_distance_m - for proposal_index, proposal in enumerate(remaining): - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance - best_index = proposal_index - if best_index >= 0: - proposal = remaining.pop(best_index) - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.score = 0.85 * track.score + proposal.mean_score * proposal.support_size - track.misses = 0 - track.last_bundle_index = bundle.bundle_index - self._diagnostics.active_updates += 1 - return tuple(remaining) - - if self._lost: - track_id, track = next(iter(self._lost.items())) - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] - best_index = -1 - best_distance = self._config.proposal_match_distance_m - for proposal_index, proposal in enumerate(remaining): - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance - best_index = proposal_index - if best_index >= 0: - proposal = remaining.pop(best_index) - track = self._lost.pop(track_id) - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) + proposal.mean_score * proposal.support_size - track.last_bundle_index = bundle.bundle_index - self._active[track_id] = track - self._diagnostics.active_updates += 1 - self._diagnostics.reacquisitions += 1 - return tuple(remaining) - - return proposals - - def _update_tentative_tracks(self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...]) -> None: + update_events: list[TrackUpdateEvent], + ) -> None: + updated_ids: set[int] = set() for track in self._tentative.values(): track.age += 1 track.misses += 1 - for proposal in proposals: + for proposal in sorted(proposals, key=_proposal_rank, reverse=True): best_track_id = -1 - best_distance = self._config.proposal_match_distance_m + best_cost = np.inf for track_id, track in self._tentative.items(): - distance = float(np.linalg.norm(track.root - proposal.root)) - if distance < best_distance: - best_distance = distance + cost = self._tentative_assignment_cost(track, proposal) + if cost < best_cost: + best_cost = cost best_track_id = track_id if best_track_id < 0: + capacity = self._capacity_for_new_track() + if capacity is not None and capacity <= 0: + continue track_id = self._next_track_id self._next_track_id += 1 - self._tentative[track_id] = TentativeTrackState( + score = proposal.mean_score * max(1, proposal.view_count) - ( + _tentative_reprojection_penalty(proposal) + ) + tentative = TentativeTrackState( track_id=track_id, age=1, misses=0, - score=proposal.mean_score * proposal.support_size, + hit_count=1, + score=score, last_bundle_index=bundle.bundle_index, root=proposal.root.copy(), pose3d=_copy_pose3d(proposal.pose3d), - evidence_buffer=[_copy_pose3d(proposal.pose3d)], + root_centered_pose3d=_copy_pose3d(proposal.root_centered_pose3d), + evidence_buffer=[proposal], + mean_view_count=float(proposal.view_count), + mean_reprojection_error=proposal.mean_reprojection_error, + mean_support_size=float(proposal.support_size), + last_source_views=proposal.source_views, + ) + self._tentative[track_id] = tentative + updated_ids.add(track_id) + self._diagnostics.tentative_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="tentative_observed", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + ) ) continue track = self._tentative[best_track_id] track.root = proposal.root.copy() track.pose3d = _copy_pose3d(proposal.pose3d) - track.score = 0.8 * track.score + proposal.mean_score * proposal.support_size + track.root_centered_pose3d = _copy_pose3d(proposal.root_centered_pose3d) + track.hit_count += 1 + track.score = ( + 0.75 * track.score + + proposal.mean_score * max(1, proposal.view_count) + - _tentative_reprojection_penalty(proposal) + ) track.misses = 0 track.last_bundle_index = bundle.bundle_index - track.evidence_buffer.append(_copy_pose3d(proposal.pose3d)) + track.evidence_buffer.append(proposal) if len(track.evidence_buffer) > self._config.tentative_buffer_size: track.evidence_buffer.pop(0) + self._update_tentative_summary(track) + updated_ids.add(best_track_id) + self._diagnostics.tentative_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=best_track_id, + action="tentative_observed", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + ) + ) - def _promote_tentative_tracks(self, bundle: FrameBundle) -> None: - promote_ids: list[int] = [] for track_id, track in self._tentative.items(): + if track_id not in updated_ids: + update_events.append(TrackUpdateEvent(track_id=track_id, action="tentative_missed")) + + def _promote_tentative_tracks(self, bundle: FrameBundle, update_events: list[TrackUpdateEvent]) -> None: + promote_candidates: list[TentativeTrackState] = [] + for track in self._tentative.values(): if track.age < self._config.tentative_min_age: continue if len(track.evidence_buffer) < self._config.tentative_hits_required: continue + if track.mean_view_count < self._config.tentative_min_mean_views: + continue + if track.mean_reprojection_error > self._config.tentative_max_reprojection_error_px: + continue + if _tentative_bone_cv(track) > self._config.tentative_max_bone_cv: + continue if track.score < self._config.tentative_promote_score: continue if _tentative_root_spread(track) > self._config.proposal_match_distance_m: continue - promote_ids.append(track_id) - - for track_id in promote_ids: - tentative = self._tentative.pop(track_id) - mean_pose = np.mean(np.stack(tentative.evidence_buffer, axis=0), axis=0) + promote_candidates.append(track) + promote_candidates.sort(key=_tentative_track_rank, reverse=True) + for tentative in promote_candidates: + track_id = tentative.track_id + if track_id not in self._tentative: + continue + pose_buffer = [_copy_pose3d(proposal.pose3d) for proposal in tentative.evidence_buffer] + beta = estimate_beta_from_pose_buffer(pose_buffer) + mean_pose = np.mean(np.stack(pose_buffer, axis=0), axis=0) + initial_noise = ( + max(3.0, min(45.0, tentative.mean_reprojection_error)) + if np.isfinite(tentative.mean_reprojection_error) + else 9.0 + ) + self._tentative.pop(track_id, None) self._active[track_id] = ActiveTrackState( track_id=track_id, status="active", @@ -417,48 +914,141 @@ class PoseTracker: lost_age=0, score=max(tentative.score, 1.0), last_bundle_index=bundle.bundle_index, - skeleton=self._seed_skeleton(mean_pose), - noise_scale=np.full((20,), 9.0, dtype=np.float64), + skeleton=self._seed_skeleton(mean_pose, beta=beta), + noise_scale=np.full((20,), initial_noise, dtype=np.float64), + root_velocity=np.zeros(3, dtype=np.float64), + joint_velocity=np.zeros(31, dtype=np.float64), + beta_frozen=self._config.beta_grace_frames <= 0, + beta_grace_age=0, + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * (initial_noise**2), + outside_volume_age=0, + mean_reprojection_error=tentative.mean_reprojection_error, + last_update_kind="promoted", ) self._diagnostics.promotions += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="promoted", + proposal_view_count=int(round(tentative.mean_view_count)), + proposal_support_size=int(round(tentative.mean_support_size)), + ) + ) - def _reacquire_lost_tracks(self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...]) -> None: + def _proposal_reacquisition_cost(self, predicted_state: SkeletonState, proposal: ProposalCluster) -> float: + predicted_root = predicted_state.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] + root_distance = float(np.linalg.norm(predicted_root - proposal.root)) + if root_distance > self._config.proposal_reacquire_root_distance_m: + return np.inf + predicted_centered_pose = _copy_root_centered_pose3d(predicted_state.pose3d) + core_distance = _core_pose_distance(predicted_centered_pose, proposal.root_centered_pose3d) + if core_distance > self._config.proposal_reacquire_core_distance_m: + return np.inf + return ( + root_distance / max(self._config.proposal_reacquire_root_distance_m, 1e-6) + + core_distance / max(self._config.proposal_reacquire_core_distance_m, 1e-6) + - 0.05 * proposal.view_count + ) + + def _reacquire_lost_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + proposals: tuple[ProposalCluster, ...], + update_events: list[TrackUpdateEvent], + ) -> None: used_indices: set[int] = set() - reactivate: list[tuple[int, int]] = [] + reactivate: list[tuple[int, int, MultiviewUpdateResult, np.ndarray, dict[str, np.ndarray]]] = [] for track_id, track in self._lost.items(): - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] + predicted = predicted_states[track_id] best_index = -1 - best_distance = self._config.proposal_match_distance_m + best_cost = np.inf for proposal_index, proposal in enumerate(proposals): if proposal_index in used_indices: continue - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance + cost = self._proposal_reacquisition_cost(predicted.skeleton, proposal) + if cost < best_cost: + best_cost = cost best_index = proposal_index - if best_index >= 0: + if best_index < 0: + continue + proposal = proposals[best_index] + seeded_state = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) + proposal_matches = self._proposal_support_matches(bundle, track, proposal, seeded_state) + self._diagnostics.proposal_reacquisition_attempts += 1 + if len(proposal_matches) < self._config.lost_min_views: + self._mark_proposal_compatible_lost_track( + track, + proposal, + update_events, + mean_reprojection_error=proposal.mean_reprojection_error, + ) + continue + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + seeded_state, + proposal_matches, + ) + if ( + update_result.accepted_view_count >= self._config.lost_min_views + and update_result.accepted_joint_count >= self._config.lost_min_accepted_joints + ): used_indices.add(best_index) - reactivate.append((track_id, best_index)) - for track_id, proposal_index in reactivate: + reactivate.append((track_id, best_index, update_result, updated_noise, updated_noise_by_view)) + continue + self._mark_proposal_compatible_lost_track( + track, + proposal, + update_events, + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + for track_id, proposal_index, update_result, updated_noise, updated_noise_by_view in reactivate: if track_id not in self._lost: continue - track = self._lost.pop(track_id) + track = self._lost[track_id] proposal = proposals[proposal_index] - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) - track.last_bundle_index = bundle.bundle_index + predicted = predicted_states[track_id] + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + update_kind="proposal_reacquire", + ) self._active[track_id] = track + self._lost.pop(track_id, None) self._diagnostics.reacquisitions += 1 + self._diagnostics.proposal_reacquisition_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="proposal_reacquire", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) - def _delete_expired_tracks(self) -> None: + def _delete_expired_tracks(self, update_events: list[TrackUpdateEvent]) -> None: tentative_delete = [ track_id for track_id, track in self._tentative.items() if track.misses > self._config.tentative_max_misses ] for track_id in tentative_delete: self._tentative.pop(track_id, None) - lost_delete = [track_id for track_id, track in self._lost.items() if track.lost_age >= self._config.lost_delete_age] + update_events.append(TrackUpdateEvent(track_id=track_id, action="deleted_tentative")) + lost_delete = [ + track_id + for track_id, track in self._lost.items() + if track.lost_age >= self._config.lost_delete_age + or _covariance_trace(track.covariance) >= self._config.lost_covariance_trace_max + or track.outside_volume_age >= self._config.lost_outside_volume_frames + ] for track_id in lost_delete: self._lost.pop(track_id, None) + update_events.append(TrackUpdateEvent(track_id=track_id, action="deleted_lost")) diff --git a/tests/support/actual_test.py b/tests/support/actual_test.py index acd82d0..9de5255 100644 --- a/tests/support/actual_test.py +++ b/tests/support/actual_test.py @@ -1,3 +1,5 @@ +from collections import Counter +from dataclasses import dataclass from pathlib import Path import click @@ -8,12 +10,92 @@ from beartype import beartype from loguru import logger from pose_tracking_exp.common.normalization import infer_bbox_from_keypoints, normalize_rtmpose_body20 -from pose_tracking_exp.schema import CameraCalibration, CameraFrame, FrameBundle, PoseDetection, SceneConfig, TrackerConfig +from pose_tracking_exp.schema import ( + CameraCalibration, + CameraFrame, + FrameBundle, + PoseDetection, + SceneConfig, + TrackerConfig, + TrackerDiagnostics, + TrackedFrameResult, +) from pose_tracking_exp.tracking import PoseTracker _NOMINAL_FRAME_PERIOD_NS = 33_333_333 +@dataclass(slots=True) +class ActualTestTrackingSummary: + bundle_count: int + active_frames: int + proposal_frames: int + max_active_tracks: int + max_lost_tracks: int + update_action_counts: dict[str, int] + mean_accepted_views: float + mean_accepted_joints: float + mean_reprojection_error: float + diagnostics: TrackerDiagnostics + + +def _finite_mean(values: list[float]) -> float: + finite = [value for value in values if np.isfinite(value)] + if not finite: + return np.inf + return float(np.mean(np.asarray(finite, dtype=np.float64))) + + +@beartype +def summarize_tracking_results( + results: list[TrackedFrameResult], + diagnostics: TrackerDiagnostics, +) -> ActualTestTrackingSummary: + update_events = [event for result in results for event in result.update_events] + action_counts = Counter(event.action for event in update_events) + accepted_view_samples = [float(event.accepted_view_count) for event in update_events if event.accepted_view_count > 0] + accepted_joint_samples = [float(event.accepted_joint_count) for event in update_events if event.accepted_joint_count > 0] + reprojection_samples = [float(event.mean_reprojection_error) for event in update_events] + return ActualTestTrackingSummary( + bundle_count=len(results), + active_frames=sum(1 for result in results if result.active_tracks), + proposal_frames=sum(1 for result in results if result.proposals), + max_active_tracks=max((len(result.active_tracks) for result in results), default=0), + max_lost_tracks=max((len(result.lost_tracks) for result in results), default=0), + update_action_counts=dict(action_counts), + mean_accepted_views=_finite_mean(accepted_view_samples), + mean_accepted_joints=_finite_mean(accepted_joint_samples), + mean_reprojection_error=_finite_mean(reprojection_samples), + diagnostics=diagnostics, + ) + + +@beartype +def format_frame_summary_lines(results: list[TrackedFrameResult]) -> tuple[str, ...]: + lines: list[str] = [] + for result in results: + action_counts = Counter(event.action for event in result.update_events) + finite_reprojection_errors = [ + float(event.mean_reprojection_error) + for event in result.update_events + if np.isfinite(event.mean_reprojection_error) + ] + lines.append( + "bundle={} proposals={} active_ids={} lost_ids={} tentative_ids={} actions={} mean_event_reproj={}".format( + result.bundle_index, + len(result.proposals), + [track.track_id for track in result.active_tracks], + [track.track_id for track in result.lost_tracks], + [track.track_id for track in result.tentative_tracks], + dict(action_counts), + "{:.2f}".format(float(np.mean(np.asarray(finite_reprojection_errors, dtype=np.float64)))) + if finite_reprojection_errors + else "nan", + ) + ) + return tuple(lines) + + @beartype def load_actual_test_scene(root: Path) -> SceneConfig: # ActualTest parquet comes from the ChArUco/OpenCV side, so `rvec` / `tvec` @@ -148,6 +230,7 @@ def load_actual_test_segment_bundles( @click.option("--max-frames", type=click.IntRange(min=1)) @click.option("--min-camera-rows", default=1, type=click.IntRange(min=1), show_default=True) @click.option("--max-active-tracks", default=1, type=click.IntRange(min=1), show_default=True) +@click.option("--verbose-frames/--no-verbose-frames", default=False, show_default=True) def main( root_path: Path, segment_name: str, @@ -156,6 +239,7 @@ def main( max_frames: int | None, min_camera_rows: int, max_active_tracks: int, + verbose_frames: bool, ) -> None: logger.remove() logger.add( @@ -174,12 +258,34 @@ def main( ) tracker = PoseTracker(scene, TrackerConfig(max_active_tracks=max_active_tracks)) results = tracker.run(bundles) + summary = summarize_tracking_results(results, tracker.diagnostics_snapshot()) logger.info( - "actual_test bundles={} active_frames={} proposal_frames={}", - len(results), - sum(1 for result in results if result.active_tracks), - sum(1 for result in results if result.proposals), + "actual_test bundles={} active_frames={} proposal_frames={} max_active_tracks={} max_lost_tracks={} " + "mean_accepted_views={} mean_accepted_joints={} mean_reprojection_error={}", + summary.bundle_count, + summary.active_frames, + summary.proposal_frames, + summary.max_active_tracks, + summary.max_lost_tracks, + "{:.2f}".format(summary.mean_accepted_views) if np.isfinite(summary.mean_accepted_views) else "nan", + "{:.2f}".format(summary.mean_accepted_joints) if np.isfinite(summary.mean_accepted_joints) else "nan", + "{:.2f}".format(summary.mean_reprojection_error) if np.isfinite(summary.mean_reprojection_error) else "nan", ) + logger.info( + "actual_test actions={} promotions={} reacquisitions={} predict_only_updates={} proposal_reacquisition_attempts={} " + "proposal_compatible_lost_frames={} nonlinear_refinements={} lm_iterations={}", + summary.update_action_counts, + summary.diagnostics.promotions, + summary.diagnostics.reacquisitions, + summary.diagnostics.predict_only_updates, + summary.diagnostics.proposal_reacquisition_attempts, + summary.diagnostics.proposal_compatible_lost_frames, + summary.diagnostics.nonlinear_refinements, + summary.diagnostics.lm_iterations, + ) + if verbose_frames: + for line in format_frame_summary_lines(results): + logger.info("actual_test_frame {}", line) if __name__ == "__main__": diff --git a/tests/test_actual_test_parquet.py b/tests/test_actual_test_parquet.py index fe276c6..c17fadb 100644 --- a/tests/test_actual_test_parquet.py +++ b/tests/test_actual_test_parquet.py @@ -5,7 +5,13 @@ import pyarrow as pa import pyarrow.parquet as pq from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME -from tests.support.actual_test import load_actual_test_scene, load_actual_test_segment_bundles +from pose_tracking_exp.schema import TrackUpdateEvent, TrackerDiagnostics, TrackedFrameResult +from tests.support.actual_test import ( + format_frame_summary_lines, + load_actual_test_scene, + load_actual_test_segment_bundles, + summarize_tracking_results, +) def _write_parquet(path: Path, rows: list[dict[str, object]]) -> None: @@ -125,3 +131,58 @@ def test_load_actual_test_keeps_partial_camera_frames(tmp_path: Path) -> None: assert [view.camera_name for view in bundles[1].views] == ["5602", "5603"] assert len(bundles[1].views[0].detections) == 1 assert bundles[1].views[1].detections == () + + +def test_actual_test_summary_reports_event_counts() -> None: + results = [ + TrackedFrameResult( + bundle_index=0, + timestamp_unix_ns=0, + tentative_tracks=(), + active_tracks=(), + lost_tracks=(), + proposals=(), + update_events=( + TrackUpdateEvent( + track_id=1, + action="direct_update", + accepted_view_count=2, + accepted_joint_count=14, + mean_reprojection_error=6.0, + ), + ), + ), + TrackedFrameResult( + bundle_index=1, + timestamp_unix_ns=1, + tentative_tracks=(), + active_tracks=(), + lost_tracks=(), + proposals=(), + update_events=( + TrackUpdateEvent(track_id=1, action="predict_only"), + TrackUpdateEvent( + track_id=1, + action="proposal_compatible", + proposal_view_count=2, + proposal_support_size=3, + mean_reprojection_error=12.0, + ), + ), + ), + ] + + summary = summarize_tracking_results( + results, + TrackerDiagnostics(promotions=1, proposal_compatible_lost_frames=1), + ) + lines = format_frame_summary_lines(results) + + assert summary.bundle_count == 2 + assert summary.update_action_counts["direct_update"] == 1 + assert summary.update_action_counts["proposal_compatible"] == 1 + assert summary.mean_accepted_views == 2.0 + assert summary.mean_accepted_joints == 14.0 + assert summary.mean_reprojection_error == 9.0 + assert len(lines) == 2 + assert "proposal_compatible" in lines[1] diff --git a/tests/test_camera_conventions.py b/tests/test_camera_conventions.py index 7bd270f..0820bbf 100644 --- a/tests/test_camera_conventions.py +++ b/tests/test_camera_conventions.py @@ -105,6 +105,32 @@ def test_load_scene_file_supports_explicit_rpt_pose(tmp_path: Path) -> None: np.testing.assert_allclose(scene.cameras[0].T, [-1.0, -2.0, -3.0]) +def test_load_scene_file_defaults_imgpaths_payloads_to_rpt_pose(tmp_path: Path) -> None: + scene_path = tmp_path / "scene.json" + payload = { + "imgpaths": ["/tmp/cam0.jpg"], + "room_size": [6.0, 4.0, 3.0], + "room_center": [0.0, 0.0, 1.0], + "cameras": [ + { + "name": "cam0", + "width": 640, + "height": 480, + "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], + "DC": [0.0, 0.0, 0.0, 0.0, 0.0], + "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]], + "T": [[1.0], [2.0], [3.0]], + } + ], + } + scene_path.write_text(json.dumps(payload), encoding="utf-8") + + scene = load_scene_file(scene_path) + + np.testing.assert_allclose(scene.cameras[0].pose_T, [1.0, 2.0, 3.0]) + np.testing.assert_allclose(scene.cameras[0].T, [-1.0, -2.0, -3.0]) + + def test_build_rpt_config_uses_pose_convention(monkeypatch: pytest.MonkeyPatch) -> None: args = _camera_args() camera = CameraCalibration.from_opencv_extrinsics( diff --git a/tests/test_tracker_single_person.py b/tests/test_tracker_single_person.py index 23a52ce..5889c90 100644 --- a/tests/test_tracker_single_person.py +++ b/tests/test_tracker_single_person.py @@ -1,13 +1,25 @@ from pathlib import Path +from types import SimpleNamespace import numpy as np import pytest pytest.importorskip("rpt") +from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME -from pose_tracking_exp.schema import CameraCalibration, CameraFrame, FrameBundle, ProposalCluster, SceneConfig, TrackerConfig -from pose_tracking_exp.tracking import PoseTracker +from pose_tracking_exp.schema import ( + ActiveTrackState, + CameraCalibration, + CameraFrame, + FrameBundle, + PoseDetection, + ProposalCluster, + SceneConfig, + TRACK_COVARIANCE_DIMENSION, + TrackerConfig, +) +from pose_tracking_exp.tracking import PoseTracker, seed_state_from_pose3d def _make_scene() -> SceneConfig: @@ -89,6 +101,26 @@ def _make_proposal(root_x: float, *, score: float = 1.0) -> ProposalCluster: source_views=frozenset({"cam0", "cam1"}), support_size=2, mean_score=score, + support_detection_indices={"cam0": (0,), "cam1": (0,)}, + ) + + +def _fake_detection() -> PoseDetection: + return PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=1.0, + keypoints=np.zeros((20, 3), dtype=np.float64), + ) + + +def _detection_from_projection(projected: np.ndarray, *, confidence: float = 1.0) -> PoseDetection: + keypoints = np.zeros((20, 3), dtype=np.float64) + keypoints[:, :2] = projected[:, :2] + keypoints[:, 2] = confidence + return PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=confidence, + keypoints=keypoints, ) @@ -147,6 +179,32 @@ def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None: "_build_proposals", lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], ) + fake_detection = _fake_detection() + monkeypatch.setattr( + tracker, + "_proposal_support_matches", + lambda bundle, track, proposal, seeded_state: {"cam0": fake_detection, "cam1": fake_detection}, + ) + update_result = SimpleNamespace( + state=seed_state_from_pose3d(_make_proposal(0.05, score=0.96).pose3d), + parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, + beta_covariance=np.eye(8, dtype=np.float64) * 0.01, + accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=5.0, + lm_iterations=2, + ) + monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) results = tracker.run([_make_bundle(0), _make_bundle(1), _make_bundle(2)]) @@ -154,3 +212,256 @@ def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None: assert [track.track_id for track in results[1].lost_tracks] == [1] assert [track.track_id for track in results[2].active_tracks] == [1] assert tracker.diagnostics_snapshot().reacquisitions >= 1 + + +def test_active_track_is_not_reseeded_from_proposals(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig( + max_active_tracks=1, + tentative_min_age=1, + tentative_hits_required=1, + tentative_promote_score=0.0, + active_miss_to_lost=3, + proposal_min_score=0.5, + ), + ) + proposals_by_bundle = { + 0: (_make_proposal(0.0, score=0.95),), + 1: (_make_proposal(0.8, score=0.99),), + } + + monkeypatch.setattr( + tracker, + "_build_proposals", + lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], + ) + + results = tracker.run([_make_bundle(0), _make_bundle(1)]) + + assert [track.track_id for track in results[1].active_tracks] == [1] + active_track = results[1].active_tracks[0] + assert active_track.last_update_kind == "predict_only" + assert abs(float(active_track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], 0])) < 0.2 + assert not any(event.action == "proposal_reacquire" for event in results[1].update_events) + + +def test_lost_track_deleted_by_covariance_trace() -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_covariance_trace_max=10.0)) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * 1_000.0, + ) + + result = tracker.step(_make_bundle(0)) + + assert not result.lost_tracks + assert any(event.action == "deleted_lost" for event in result.update_events) + + +def test_track_beta_freezes_after_grace_update(monkeypatch) -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, beta_grace_frames=1)) + proposal = _make_proposal(0.0, score=0.95) + skeleton = seed_state_from_pose3d(proposal.pose3d) + tracker._active[1] = ActiveTrackState(track_id=1, status="active", skeleton=skeleton, score=1.0) + fake_detection = PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=1.0, + keypoints=np.zeros((20, 3), dtype=np.float64), + ) + + monkeypatch.setattr( + tracker, + "_match_existing_tracks", + lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), + ) + + updated_state = seed_state_from_pose3d(proposal.pose3d, beta=np.full((8,), 1.1, dtype=np.float64)) + update_result = SimpleNamespace( + state=updated_state, + parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, + beta_covariance=np.eye(8, dtype=np.float64) * 0.01, + accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=4.0, + lm_iterations=1, + ) + monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: ()) + + result = tracker.step(_make_bundle(0)) + + assert result.active_tracks[0].beta_frozen + np.testing.assert_allclose(result.active_tracks[0].skeleton.beta, np.full((8,), 1.1, dtype=np.float64)) + + +def test_active_track_demotes_to_lost_on_score_floor() -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig(max_active_tracks=1, active_miss_to_lost=10, active_score_lost_threshold=0.0), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._active[1] = ActiveTrackState( + track_id=1, + status="active", + score=0.1, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + + result = tracker.step(_make_bundle(0)) + + assert not result.active_tracks + assert [track.track_id for track in result.lost_tracks] == [1] + + +def test_proposal_compatible_lost_track_stays_lost_without_enough_support(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig(max_active_tracks=1, active_miss_to_lost=1, lost_delete_age=10), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) + monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {"cam0": _fake_detection()}) + + result = tracker.step(_make_bundle(0)) + + assert not result.active_tracks + assert [track.track_id for track in result.lost_tracks] == [1] + assert any(event.action == "proposal_compatible" for event in result.update_events) + + +def test_proposal_support_matches_search_all_view_detections() -> None: + scene = _make_scene() + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_min_accepted_core_joints=2)) + proposal = _make_proposal(0.0, score=0.95) + track = ActiveTrackState(track_id=1, status="lost", skeleton=seed_state_from_pose3d(proposal.pose3d)) + seeded_state = seed_state_from_pose3d(proposal.pose3d) + projected_cam0 = project_pose(scene.cameras[0], seeded_state.pose3d) + projected_cam1 = project_pose(scene.cameras[1], seeded_state.pose3d) + good_cam0 = _detection_from_projection(projected_cam0) + good_cam1 = _detection_from_projection(projected_cam1) + bad_detection = _fake_detection() + bundle = FrameBundle( + bundle_index=0, + timestamp_unix_ns=0, + views=( + CameraFrame( + camera_name="cam0", + frame_index=0, + timestamp_unix_ns=0, + detections=(bad_detection, good_cam0), + source_size=(640, 480), + ), + CameraFrame( + camera_name="cam1", + frame_index=0, + timestamp_unix_ns=0, + detections=(bad_detection, good_cam1), + source_size=(640, 480), + ), + ), + ) + + matched = tracker._proposal_support_matches(bundle, track, proposal, seeded_state) + + assert matched["cam0"] is good_cam0 + assert matched["cam1"] is good_cam1 + + +def test_covariance_grows_on_predict_only_and_shrinks_on_update(monkeypatch) -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, active_miss_to_lost=10)) + proposal = _make_proposal(0.0, score=0.95) + tracker._active[1] = ActiveTrackState( + track_id=1, + status="active", + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + no_detection_bundle = _make_bundle(0) + predict_only_result = tracker.step(no_detection_bundle) + predict_only_cov_trace = float(np.trace(predict_only_result.active_tracks[0].covariance)) + + fake_detection = _fake_detection() + monkeypatch.setattr( + tracker, + "_match_existing_tracks", + lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), + ) + update_result = SimpleNamespace( + state=seed_state_from_pose3d(proposal.pose3d, beta=np.ones((8,), dtype=np.float64)), + parameter_covariance=np.eye(31, dtype=np.float64) * 0.01, + beta_covariance=np.eye(8, dtype=np.float64) * 0.001, + accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=3.0, + lm_iterations=1, + ) + monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) + update_result_frame = tracker.step(_make_bundle(1)) + updated_cov_trace = float(np.trace(update_result_frame.active_tracks[0].covariance)) + + assert predict_only_cov_trace > float(TRACK_COVARIANCE_DIMENSION) + assert updated_cov_trace < predict_only_cov_trace + + +def test_proposal_compatible_lost_track_gets_score_relief(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig( + max_active_tracks=1, + active_miss_to_lost=1, + lost_delete_age=10, + lost_score_decay=1.0, + lost_score_miss_penalty=0.5, + proposal_compatible_score_relief=0.4, + ), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) + monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {}) + + result = tracker.step(_make_bundle(0)) + + assert result.lost_tracks[0].score > 0.4