From 0bfeec77e4bef2f514079fc4d6860e22c0f94191 Mon Sep 17 00:00:00 2001
From: crosstyan
Date: Fri, 27 Mar 2026 15:36:48 +0800
Subject: [PATCH] feat(tracking): add recursive lifecycle updates and quality diagnostics

Implement the next tracker tranche around a recursive articulated state
rather than per-frame ad hoc updates. Track state now propagates full
pose/velocity/shape covariance, uses process noise during prediction,
and drives active-to-lost transitions from both miss counts and
recursive score thresholds.

The multiview update path replaces the generic SciPy least_squares call
with a bounded LM/GN loop that returns parameter and beta covariance
blocks, accepted-joint counts, mean reprojection error, and iteration
diagnostics.

Lost-track handling is stricter and safer: proposal-based reacquisition
now requires same-frame 2D support and articulated refinement before a
track can return to active. Proposal clusters retain contributing
detection indices, the tracker searches broadly within contributing
views, and proposal-compatible lost frames are surfaced explicitly
instead of silently reviving a track.

Old scene JSONs with imgpaths now default to the RPT camera-pose
convention so proposal reprojection gating works on the sample scenes.

Add ActualTest support diagnostics that summarize event counts, accepted
support, reprojection quality, and tracker diagnostics, plus focused
regressions for camera conventions, score-driven demotion, covariance
behavior, proposal-compatible lost handling, and broader proposal-backed
matching.
--- src/pose_tracking_exp/schema/__init__.py | 4 + src/pose_tracking_exp/schema/tracking.py | 122 +- src/pose_tracking_exp/tracking/kinematics.py | 425 ++++++- src/pose_tracking_exp/tracking/replay_io.py | 7 +- src/pose_tracking_exp/tracking/rpt_adapter.py | 52 +- src/pose_tracking_exp/tracking/tracker.py | 1010 +++++++++++++---- tests/support/actual_test.py | 116 +- tests/test_actual_test_parquet.py | 63 +- tests/test_camera_conventions.py | 26 + tests/test_tracker_single_person.py | 315 ++++- 10 files changed, 1883 insertions(+), 257 deletions(-) diff --git a/src/pose_tracking_exp/schema/__init__.py b/src/pose_tracking_exp/schema/__init__.py index 9aceee3..05dd040 100644 --- a/src/pose_tracking_exp/schema/__init__.py +++ b/src/pose_tracking_exp/schema/__init__.py @@ -18,7 +18,9 @@ from pose_tracking_exp.schema.tracking import ( ProposalCluster, SkeletonState, TentativeTrackState, + TRACK_COVARIANCE_DIMENSION, TrackState, + TrackUpdateEvent, TrackerConfig, TrackerDiagnostics, TrackedFrameResult, @@ -41,7 +43,9 @@ __all__ = [ "SceneConfig", "SkeletonState", "TentativeTrackState", + "TRACK_COVARIANCE_DIMENSION", "TrackState", + "TrackUpdateEvent", "TrackerConfig", "TrackerDiagnostics", "TrackedFrameResult", diff --git a/src/pose_tracking_exp/schema/tracking.py b/src/pose_tracking_exp/schema/tracking.py index 91d1f31..641b352 100644 --- a/src/pose_tracking_exp/schema/tracking.py +++ b/src/pose_tracking_exp/schema/tracking.py @@ -5,6 +5,8 @@ import numpy as np from pose_tracking_exp.common.tensor_types import Pose3D, Vector3 +TRACK_COVARIANCE_DIMENSION = 70 # 31 pose parameters + 31 velocities + 8 shape parameters. 
+ @dataclass(slots=True) class ProposalCluster: @@ -13,6 +15,29 @@ class ProposalCluster: source_views: frozenset[str] support_size: int mean_score: float + root_centered_pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) + view_count: int = 0 + pair_count: int = 0 + mean_reprojection_error: float = 0.0 + support_detection_indices: dict[str, tuple[int, ...]] = field(default_factory=dict) + + def __post_init__(self) -> None: + self.pose3d = np.asarray(self.pose3d, dtype=np.float64).reshape(20, 4) + self.root = np.asarray(self.root, dtype=np.float64).reshape(3) + root_centered = np.asarray(self.root_centered_pose3d, dtype=np.float64) + if root_centered.shape != (20, 4) or not np.any(root_centered[:, 3] > 0.0): + root_centered = self.pose3d.copy() + root_centered[:, :3] -= self.root[None, :] + self.root_centered_pose3d = root_centered + self.source_views = frozenset(self.source_views) + self.support_detection_indices = { + str(camera_name): tuple(int(index) for index in indices) + for camera_name, indices in self.support_detection_indices.items() + } + if self.view_count <= 0: + self.view_count = max(len(self.source_views), min(2, self.support_size)) + if self.pair_count <= 0: + self.pair_count = self.support_size @dataclass(slots=True) @@ -28,11 +53,17 @@ class TentativeTrackState: state: Literal["tentative"] = "tentative" age: int = 0 misses: int = 0 + hit_count: int = 0 score: float = 0.0 last_bundle_index: int = -1 root: Vector3 = field(default_factory=lambda: np.zeros(3, dtype=np.float64)) pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) - evidence_buffer: list[Pose3D] = field(default_factory=list) + root_centered_pose3d: Pose3D = field(default_factory=lambda: np.zeros((20, 4), dtype=np.float64)) + evidence_buffer: list[ProposalCluster] = field(default_factory=list) + mean_view_count: float = 0.0 + mean_reprojection_error: float = 0.0 + mean_support_size: float = 0.0 + last_source_views: 
frozenset[str] = field(default_factory=frozenset) @dataclass(slots=True) @@ -53,11 +84,52 @@ class ActiveTrackState: noise_scale: np.ndarray = field( default_factory=lambda: np.full((20,), 9.0, dtype=np.float64) ) + noise_by_view: dict[str, np.ndarray] = field(default_factory=dict) + root_velocity: np.ndarray = field(default_factory=lambda: np.zeros(3, dtype=np.float64)) + joint_velocity: np.ndarray = field(default_factory=lambda: np.zeros(31, dtype=np.float64)) + beta_frozen: bool = False + beta_grace_age: int = 0 + covariance: np.ndarray = field(default_factory=lambda: np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64)) + outside_volume_age: int = 0 + mean_reprojection_error: float = np.inf + last_update_kind: Literal[ + "initialized", + "direct_update", + "predict_only", + "direct_reacquire", + "proposal_reacquire", + "promoted", + ] = "initialized" TrackState = TentativeTrackState | ActiveTrackState +TrackUpdateAction = Literal[ + "tentative_observed", + "tentative_missed", + "promoted", + "direct_update", + "predict_only", + "direct_reacquire", + "proposal_reacquire", + "proposal_compatible", + "deleted_tentative", + "deleted_lost", +] + + +@dataclass(slots=True) +class TrackUpdateEvent: + track_id: int + action: TrackUpdateAction + accepted_view_count: int = 0 + accepted_joint_count: int = 0 + proposal_view_count: int = 0 + proposal_support_size: int = 0 + mean_reprojection_error: float = np.inf + + @dataclass(slots=True) class TrackedFrameResult: bundle_index: int @@ -66,6 +138,7 @@ class TrackedFrameResult: active_tracks: tuple[ActiveTrackState, ...] lost_tracks: tuple[ActiveTrackState, ...] proposals: tuple[ProposalCluster, ...] + update_events: tuple[TrackUpdateEvent, ...] 
= () @dataclass(slots=True) @@ -79,6 +152,13 @@ class TrackerDiagnostics: active_updates: int = 0 seed_initializations: int = 0 nonlinear_refinements: int = 0 + predict_only_updates: int = 0 + proposal_reacquisition_updates: int = 0 + direct_reacquisition_updates: int = 0 + tentative_updates: int = 0 + proposal_reacquisition_attempts: int = 0 + proposal_compatible_lost_frames: int = 0 + lm_iterations: int = 0 @dataclass(slots=True) @@ -89,14 +169,52 @@ class TrackerConfig: tentative_buffer_size: int = 5 tentative_min_age: int = 3 tentative_hits_required: int = 3 + tentative_min_mean_views: float = 2.0 + tentative_max_reprojection_error_px: float = 80.0 + tentative_max_bone_cv: float = 0.3 tentative_promote_score: float = 3.0 tentative_max_misses: int = 2 active_min_views: int = 2 + active_min_accepted_joints: int = 10 + lost_min_views: int = 1 + lost_min_accepted_joints: int = 8 + association_min_core_joints: int = 2 + lost_min_accepted_core_joints: int = 2 active_core_gate_px: float = 80.0 active_joint_gate_px: float = 120.0 - active_miss_to_lost: int = 3 + active_miss_to_lost: int = 5 lost_delete_age: int = 15 + lost_covariance_trace_max: float = 9_000.0 + lost_outside_volume_frames: int = 6 proposal_match_distance_m: float = 0.45 + proposal_core_match_distance_m: float = 0.7 + proposal_reacquire_root_distance_m: float = 0.75 + proposal_reacquire_core_distance_m: float = 1.05 + beta_grace_frames: int = 12 noise_ema: float = 0.85 + noise_min_px: float = 3.0 + noise_max_px: float = 45.0 + noise_residual_cap_px: float = 60.0 + process_noise_root_position_m: float = 0.03 + process_noise_root_rotation_rad: float = 0.02 + process_noise_joint_rad: float = 0.03 + process_noise_velocity: float = 0.04 + process_noise_beta: float = 0.01 + predict_only_process_scale: float = 1.75 + active_score_decay: float = 0.85 + active_score_view_gain: float = 1.2 + active_score_joint_gain: float = 1.0 + active_score_reprojection_penalty: float = 0.02 + active_score_miss_penalty: float = 
0.75 + active_score_lost_threshold: float = -4.0 + lost_score_decay: float = 0.95 + lost_score_miss_penalty: float = 0.25 + lost_score_reacquire_gain: float = 0.75 + proposal_compatible_score_relief: float = 0.5 + lm_max_iterations: int = 5 + lm_damping: float = 0.02 + lm_step_epsilon: float = 1e-3 + lm_step_tolerance: float = 1e-4 + lm_student_t_dof: float = 4.0 proposal_min_score: float = 0.9 proposal_min_group_size: int = 1 diff --git a/src/pose_tracking_exp/tracking/kinematics.py b/src/pose_tracking_exp/tracking/kinematics.py index 0b61546..d0ccff2 100644 --- a/src/pose_tracking_exp/tracking/kinematics.py +++ b/src/pose_tracking_exp/tracking/kinematics.py @@ -1,16 +1,44 @@ import math +from collections.abc import Callable +from dataclasses import dataclass import numpy as np from beartype import beartype from scipy.optimize import least_squares from pose_tracking_exp.common.camera_math import project_pose -from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME +from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, BODY20_OBSERVATION_COUNT from pose_tracking_exp.common.tensor_types import Pose3D from pose_tracking_exp.schema import CameraCalibration, PoseDetection, SkeletonState PARAMETER_DIMENSION = 31 SHAPE_DIMENSION = 8 +STABLE_LIMB_NAMES: tuple[tuple[str, str], ...] 
= ( + ("hip_middle", "shoulder_middle"), + ("shoulder_left", "shoulder_right"), + ("hip_left", "hip_right"), + ("shoulder_left", "elbow_left"), + ("shoulder_right", "elbow_right"), + ("elbow_left", "wrist_left"), + ("elbow_right", "wrist_right"), + ("hip_left", "knee_left"), + ("hip_right", "knee_right"), + ("knee_left", "ankle_left"), + ("knee_right", "ankle_right"), +) + + +@dataclass(slots=True) +class MultiviewUpdateResult: + state: SkeletonState + parameter_covariance: np.ndarray + beta_covariance: np.ndarray + accepted_joint_masks: dict[str, np.ndarray] + accepted_joint_counts_by_view: dict[str, int] + accepted_joint_count: int + accepted_view_count: int + mean_reprojection_error: float + lm_iterations: int def _rot_x(theta: float) -> np.ndarray: @@ -213,6 +241,64 @@ def _estimate_beta_from_pose(pose3d: Pose3D) -> np.ndarray: return np.clip(beta, 0.5, 2.0) +def _limb_length_samples(pose_buffer: list[Pose3D], name_a: str, name_b: str) -> list[float]: + index_a = BODY20_INDEX_BY_NAME[name_a] + index_b = BODY20_INDEX_BY_NAME[name_b] + samples: list[float] = [] + for pose3d in pose_buffer: + if pose3d[index_a, 3] <= 0.0 or pose3d[index_b, 3] <= 0.0: + continue + samples.append(float(np.linalg.norm(pose3d[index_a, :3] - pose3d[index_b, :3]))) + return samples + + +def estimate_beta_from_pose_buffer(pose_buffer: list[Pose3D]) -> np.ndarray: + if not pose_buffer: + return _default_shape() + + robust_pose = np.median(np.stack([np.asarray(pose3d, dtype=np.float64) for pose3d in pose_buffer], axis=0), axis=0) + beta = _estimate_beta_from_pose(np.asarray(robust_pose, dtype=np.float64)) + scale = float(beta[0]) if float(beta[0]) > 1e-8 else 1.0 + + limb_samples = { + limb_name: _limb_length_samples(pose_buffer, *joint_names) + for limb_name, joint_names in { + "torso": ("hip_middle", "shoulder_middle"), + "shoulder_width": ("shoulder_left", "shoulder_right"), + "pelvis_width": ("hip_left", "hip_right"), + "upper_arm": ("shoulder_left", "elbow_left"), + "lower_arm": 
("elbow_left", "wrist_left"), + "upper_leg": ("hip_left", "knee_left"), + "lower_leg": ("knee_left", "ankle_left"), + }.items() + } + base_scales = { + "torso": 0.52, + "shoulder_width": 0.36, + "pelvis_width": 0.24, + "upper_arm": 0.30, + "lower_arm": 0.26, + "upper_leg": 0.45, + "lower_leg": 0.43, + } + beta_index = { + "torso": 1, + "shoulder_width": 2, + "pelvis_width": 3, + "upper_arm": 4, + "lower_arm": 5, + "upper_leg": 6, + "lower_leg": 7, + } + for limb_name, samples in limb_samples.items(): + if not samples: + continue + beta[beta_index[limb_name]] = float(np.median(np.asarray(samples, dtype=np.float64))) / ( + base_scales[limb_name] * scale + ) + return np.clip(beta, 0.5, 2.0) + + def _estimate_root_rotation(pose3d: Pose3D) -> np.ndarray: hip_left = pose3d[BODY20_INDEX_BY_NAME["hip_left"], :3] hip_right = pose3d[BODY20_INDEX_BY_NAME["hip_right"], :3] @@ -276,30 +362,205 @@ def initialize_state_from_pose3d(pose3d: Pose3D) -> SkeletonState: return refine_state_from_pose3d(pose3d) +def _parameter_bounds() -> tuple[np.ndarray, np.ndarray]: + lower = np.full((PARAMETER_DIMENSION,), -2.5, dtype=np.float64) + upper = np.full((PARAMETER_DIMENSION,), 2.5, dtype=np.float64) + lower[0:3] = -np.inf + upper[0:3] = np.inf + lower[3:6] = -math.pi + upper[3:6] = math.pi + return lower, upper + + +def _resolve_noise_by_view( + matched: dict[str, PoseDetection], + noise_scale: np.ndarray, + noise_by_view: dict[str, np.ndarray] | None, +) -> dict[str, np.ndarray]: + resolved: dict[str, np.ndarray] = {} + for camera_name in matched: + if noise_by_view is not None and camera_name in noise_by_view: + candidate = np.asarray(noise_by_view[camera_name], dtype=np.float64) + if candidate.shape == (BODY20_OBSERVATION_COUNT,): + resolved[camera_name] = candidate.copy() + continue + resolved[camera_name] = np.asarray(noise_scale, dtype=np.float64).copy() + return resolved + + +def _base_sigma_from_confidence(confidence: np.ndarray, min_px: float, max_px: float) -> np.ndarray: + 
clipped = np.clip(confidence, 0.05, 1.0) + return np.clip(max_px - (max_px - min_px) * clipped, min_px, max_px) + + +def _joint_acceptance_masks( + predicted_pose: Pose3D, + cameras: tuple[CameraCalibration, ...], + matched: dict[str, PoseDetection], + noise_by_view: dict[str, np.ndarray], + *, + joint_gate_px: float, +) -> dict[str, np.ndarray]: + camera_by_name = {camera.name: camera for camera in cameras} + accepted: dict[str, np.ndarray] = {} + for camera_name, detection in matched.items(): + projected = project_pose(camera_by_name[camera_name], predicted_pose) + confidence_mask = detection.keypoints[:, 2] > 0.05 + if not np.any(confidence_mask): + accepted[camera_name] = np.zeros((BODY20_OBSERVATION_COUNT,), dtype=bool) + continue + delta = projected[:, :2] - detection.keypoints[:, :2] + distance = np.sqrt(np.sum(delta * delta, axis=1)) + sigma = np.maximum(noise_by_view[camera_name], 1.0) + normalized = distance / sigma + accepted[camera_name] = confidence_mask & ( + (distance <= joint_gate_px) | (normalized <= 2.5) + ) + return accepted + + +@dataclass(slots=True) +class _ResidualEvaluation: + residual: np.ndarray + mean_reprojection_error: float + + +def _pack_candidate(parameters: np.ndarray, beta: np.ndarray, *, beta_frozen: bool) -> np.ndarray: + if beta_frozen: + return np.asarray(parameters, dtype=np.float64) + return np.concatenate([np.asarray(parameters, dtype=np.float64), np.asarray(beta, dtype=np.float64)], axis=0) + + +def _unpack_candidate(values: np.ndarray, predicted_beta: np.ndarray, *, beta_frozen: bool) -> tuple[np.ndarray, np.ndarray]: + parameters = np.asarray(values[:PARAMETER_DIMENSION], dtype=np.float64) + if beta_frozen: + return parameters, predicted_beta.copy() + beta = np.asarray(values[PARAMETER_DIMENSION:], dtype=np.float64) + return parameters, np.clip(beta, 0.5, 2.0) + + def _2d_update_residual( candidate: np.ndarray, predicted_parameters: np.ndarray, - beta: np.ndarray, - cameras: tuple[CameraCalibration, ...], matched: 
dict[str, PoseDetection], camera_by_name: dict[str, CameraCalibration], - noise_scale: np.ndarray, -) -> np.ndarray: - rendered = render_pose(candidate, beta) - residual_parts: list[np.ndarray] = [0.08 * (candidate - predicted_parameters)] + noise_by_view: dict[str, np.ndarray], + accepted_joint_masks: dict[str, np.ndarray], + *, + beta_frozen: bool, + predicted_beta: np.ndarray, + student_t_dof: float, +) -> _ResidualEvaluation: + parameters, resolved_beta = _unpack_candidate(candidate, predicted_beta, beta_frozen=beta_frozen) + rendered = render_pose(parameters, resolved_beta) + residual_parts: list[np.ndarray] = [0.08 * (parameters - predicted_parameters)] + if not beta_frozen: + residual_parts.append(0.2 * (resolved_beta - predicted_beta)) + reprojection_errors: list[np.ndarray] = [] for camera_name, detection in matched.items(): camera = camera_by_name[camera_name] projected = project_pose(camera, rendered) - joint_mask = detection.keypoints[:, 2] > 0.05 + joint_mask = accepted_joint_masks[camera_name] if not np.any(joint_mask): continue delta = projected[joint_mask, :2] - detection.keypoints[joint_mask, :2] - weights = np.sqrt(np.clip(detection.keypoints[joint_mask, 2], 0.05, 1.0)) - sigmas = noise_scale[joint_mask][:, None] - residual_parts.append((delta / sigmas) * weights[:, None]) + sigmas = np.maximum(noise_by_view[camera_name][joint_mask], 1.0)[:, None] + normalized = delta / sigmas + squared_distance = np.sum(normalized * normalized, axis=1) + confidence_weights = np.sqrt(np.clip(detection.keypoints[joint_mask, 2], 0.05, 1.0)) + student_t_weights = np.sqrt((student_t_dof + 2.0) / (student_t_dof + squared_distance)) + residual_parts.append(normalized * confidence_weights[:, None] * student_t_weights[:, None]) + reprojection_errors.append(np.sqrt(np.sum(delta * delta, axis=1))) if len(residual_parts) == 1: - return residual_parts[0] - return np.concatenate([part.reshape(-1) for part in residual_parts], axis=0) + flattened = residual_parts[0] + else: + 
flattened = np.concatenate([part.reshape(-1) for part in residual_parts], axis=0) + mean_reprojection_error = ( + float(np.mean(np.concatenate(reprojection_errors, axis=0))) + if reprojection_errors + else np.inf + ) + return _ResidualEvaluation( + residual=np.asarray(flattened, dtype=np.float64).reshape(-1), + mean_reprojection_error=mean_reprojection_error, + ) + + +def _finite_difference_jacobian( + values: np.ndarray, + evaluate: Callable[[np.ndarray], _ResidualEvaluation], + *, + step_epsilon: float, + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, +) -> tuple[np.ndarray, _ResidualEvaluation]: + base = evaluate(values) + jacobian = np.zeros((base.residual.size, values.size), dtype=np.float64) + for column in range(values.size): + step = step_epsilon * max(1.0, abs(float(values[column]))) + candidate = values.copy() + candidate[column] = np.clip(candidate[column] + step, lower_bounds[column], upper_bounds[column]) + actual_step = candidate[column] - values[column] + if abs(float(actual_step)) <= 1e-12: + continue + shifted = evaluate(candidate) + jacobian[:, column] = (shifted.residual - base.residual) / actual_step + return jacobian, base + + +def _run_lm_update( + candidate0: np.ndarray, + evaluate: Callable[[np.ndarray], _ResidualEvaluation], + *, + lower_bounds: np.ndarray, + upper_bounds: np.ndarray, + max_iterations: int, + damping: float, + step_epsilon: float, + step_tolerance: float, +) -> tuple[np.ndarray, np.ndarray, _ResidualEvaluation, int]: + candidate = np.asarray(candidate0, dtype=np.float64).copy() + lambda_value = max(damping, 1e-6) + accepted_iterations = 0 + best_jacobian, best_evaluation = _finite_difference_jacobian( + candidate, + evaluate, + step_epsilon=step_epsilon, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + best_loss = 0.5 * float(np.dot(best_evaluation.residual, best_evaluation.residual)) + for _ in range(max_iterations): + jtj = best_jacobian.T @ best_jacobian + diagonal = np.maximum(np.diag(jtj), 1e-6) + 
hessian = jtj + np.diag(lambda_value * diagonal) + gradient = best_jacobian.T @ best_evaluation.residual + try: + step = -np.linalg.solve(hessian, gradient) + except np.linalg.LinAlgError: + step = -np.linalg.pinv(hessian) @ gradient + if float(np.linalg.norm(step)) <= step_tolerance: + break + proposed = np.clip(candidate + step, lower_bounds, upper_bounds) + if np.allclose(proposed, candidate): + break + proposed_evaluation = evaluate(proposed) + proposed_loss = 0.5 * float(np.dot(proposed_evaluation.residual, proposed_evaluation.residual)) + if proposed_loss < best_loss: + candidate = proposed + best_loss = proposed_loss + best_jacobian, best_evaluation = _finite_difference_jacobian( + candidate, + evaluate, + step_epsilon=step_epsilon, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + ) + lambda_value = max(lambda_value * 0.5, 1e-6) + accepted_iterations += 1 + continue + lambda_value *= 4.0 + return candidate, best_jacobian, best_evaluation, accepted_iterations def update_state_from_multiview( @@ -307,41 +568,145 @@ def update_state_from_multiview( cameras: tuple[CameraCalibration, ...], matched: dict[str, PoseDetection], noise_scale: np.ndarray, -) -> SkeletonState: + noise_by_view: dict[str, np.ndarray] | None = None, + *, + joint_gate_px: float, + beta_frozen: bool, + max_iterations: int, + damping: float, + step_epsilon: float, + step_tolerance: float, + student_t_dof: float, +) -> MultiviewUpdateResult: if not matched: - return state + return MultiviewUpdateResult( + state=state, + parameter_covariance=np.eye(PARAMETER_DIMENSION, dtype=np.float64), + beta_covariance=np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01, + accepted_joint_masks={}, + accepted_joint_counts_by_view={}, + accepted_joint_count=0, + accepted_view_count=0, + mean_reprojection_error=np.inf, + lm_iterations=0, + ) camera_by_name = {camera.name: camera for camera in cameras} - result = least_squares( - _2d_update_residual, - state.parameters, - args=(state.parameters, 
state.beta, cameras, matched, camera_by_name, noise_scale), - method="trf", - max_nfev=30, - loss="soft_l1", - f_scale=1.0, + resolved_noise_by_view = _resolve_noise_by_view(matched, noise_scale, noise_by_view) + accepted_joint_masks = _joint_acceptance_masks( + state.pose3d, + cameras, + matched, + resolved_noise_by_view, + joint_gate_px=joint_gate_px, + ) + accepted_joint_counts_by_view = { + camera_name: int(np.count_nonzero(mask)) + for camera_name, mask in accepted_joint_masks.items() + } + accepted_joint_count = sum(accepted_joint_counts_by_view.values()) + accepted_view_count = sum(int(count > 0) for count in accepted_joint_counts_by_view.values()) + candidate0 = _pack_candidate(state.parameters, state.beta, beta_frozen=beta_frozen) + parameter_lower, parameter_upper = _parameter_bounds() + if beta_frozen: + lower_bounds = parameter_lower + upper_bounds = parameter_upper + else: + beta_lower = np.clip(state.beta - 0.2, 0.5, 2.0) + beta_upper = np.clip(state.beta + 0.2, 0.5, 2.0) + lower_bounds = np.concatenate([parameter_lower, beta_lower], axis=0) + upper_bounds = np.concatenate([parameter_upper, beta_upper], axis=0) + lower_bounds = np.minimum(lower_bounds, candidate0 - 1e-6) + upper_bounds = np.maximum(upper_bounds, candidate0 + 1e-6) + + def evaluate(candidate: np.ndarray) -> _ResidualEvaluation: + return _2d_update_residual( + candidate, + state.parameters, + matched, + camera_by_name, + resolved_noise_by_view, + accepted_joint_masks, + beta_frozen=beta_frozen, + predicted_beta=state.beta, + student_t_dof=student_t_dof, + ) + + candidate, jacobian, evaluation, lm_iterations = _run_lm_update( + candidate0, + evaluate, + lower_bounds=lower_bounds, + upper_bounds=upper_bounds, + max_iterations=max_iterations, + damping=damping, + step_epsilon=step_epsilon, + step_tolerance=step_tolerance, + ) + parameters, resolved_beta = _unpack_candidate(candidate, state.beta, beta_frozen=beta_frozen) + rendered = render_pose(parameters, resolved_beta) + if jacobian.size 
== 0: + parameter_covariance = np.eye(PARAMETER_DIMENSION, dtype=np.float64) + beta_covariance = np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01 + else: + jtj = jacobian.T @ jacobian + stabilized = jtj + np.eye(jtj.shape[0], dtype=np.float64) * 1e-6 + full_covariance = np.linalg.pinv(stabilized) + parameter_covariance = full_covariance[:PARAMETER_DIMENSION, :PARAMETER_DIMENSION] + if beta_frozen: + beta_covariance = np.eye(SHAPE_DIMENSION, dtype=np.float64) * 0.01 + else: + beta_covariance = full_covariance[PARAMETER_DIMENSION:, PARAMETER_DIMENSION:] + return MultiviewUpdateResult( + state=SkeletonState(parameters=parameters, beta=resolved_beta, pose3d=rendered), + parameter_covariance=np.asarray(parameter_covariance, dtype=np.float64), + beta_covariance=np.asarray(beta_covariance, dtype=np.float64), + accepted_joint_masks=accepted_joint_masks, + accepted_joint_counts_by_view=accepted_joint_counts_by_view, + accepted_joint_count=accepted_joint_count, + accepted_view_count=accepted_view_count, + mean_reprojection_error=evaluation.mean_reprojection_error, + lm_iterations=lm_iterations, ) - parameters = np.asarray(result.x, dtype=np.float64) - return SkeletonState(parameters=parameters, beta=state.beta.copy(), pose3d=render_pose(parameters, state.beta)) def update_noise_scale( previous: np.ndarray, + previous_by_view: dict[str, np.ndarray], cameras: tuple[CameraCalibration, ...], pose3d: Pose3D, matched: dict[str, PoseDetection], + accepted_joint_masks: dict[str, np.ndarray], *, ema: float, -) -> np.ndarray: + min_px: float, + max_px: float, + residual_cap_px: float, +) -> tuple[np.ndarray, dict[str, np.ndarray]]: if not matched: - return previous + return previous, previous_by_view updated = previous.copy() + updated_by_view = {name: np.asarray(value, dtype=np.float64).copy() for name, value in previous_by_view.items()} camera_by_name = {camera.name: camera for camera in cameras} for camera_name, detection in matched.items(): projected = 
project_pose(camera_by_name[camera_name], pose3d) - mask = detection.keypoints[:, 2] > 0.05 + mask = accepted_joint_masks.get(camera_name) + if mask is None: + mask = detection.keypoints[:, 2] > 0.05 if not np.any(mask): continue + previous_view = updated_by_view.get(camera_name, previous.copy()) residual = projected[mask, :2] - detection.keypoints[mask, :2] magnitude = np.sqrt(np.sum(residual * residual, axis=1)) - updated[mask] = np.clip(ema * updated[mask] + (1.0 - ema) * magnitude, 3.0, 45.0) - return updated + base_sigma = _base_sigma_from_confidence(detection.keypoints[mask, 2], min_px, max_px) + observed_sigma = np.sqrt( + np.square(base_sigma) + np.square(np.clip(magnitude, 0.0, residual_cap_px)) + ) + previous_view[mask] = np.clip( + ema * previous_view[mask] + (1.0 - ema) * observed_sigma, + min_px, + max_px, + ) + updated_by_view[camera_name] = previous_view + if updated_by_view: + stacked = np.stack(list(updated_by_view.values()), axis=0) + updated = np.median(stacked, axis=0) + return updated, updated_by_view diff --git a/src/pose_tracking_exp/tracking/replay_io.py b/src/pose_tracking_exp/tracking/replay_io.py index c57c719..f98d2d1 100644 --- a/src/pose_tracking_exp/tracking/replay_io.py +++ b/src/pose_tracking_exp/tracking/replay_io.py @@ -32,7 +32,12 @@ def _as_float_array(values: object, shape: tuple[int, ...]) -> np.ndarray: @beartype def load_scene_file(path: Path) -> SceneConfig: payload = json.loads(path.read_text(encoding="utf-8")) - default_extrinsic_format = str(payload.get("extrinsic_format", _OPENCV_EXTRINSICS)) + default_extrinsic_format = str( + payload.get( + "extrinsic_format", + _RPT_POSE if "imgpaths" in payload and "extrinsic_format" not in payload else _OPENCV_EXTRINSICS, + ) + ) cameras: list[CameraCalibration] = [] for camera_payload in payload["cameras"]: extrinsic_format = str( diff --git a/src/pose_tracking_exp/tracking/rpt_adapter.py b/src/pose_tracking_exp/tracking/rpt_adapter.py index 1b0633c..52febd2 100644 --- 
a/src/pose_tracking_exp/tracking/rpt_adapter.py +++ b/src/pose_tracking_exp/tracking/rpt_adapter.py @@ -3,9 +3,11 @@ import rpt from beartype import beartype from rpt._core import TriangulationConfig, TriangulationTrace # type: ignore[reportMissingModuleSource] +from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, BODY20_JOINT_NAMES, BODY20_OBSERVATION_COUNT +from pose_tracking_exp.common.normalization import core_reprojection_distance from pose_tracking_exp.common.tensor_types import Pose2D -from pose_tracking_exp.schema import CameraFrame, ProposalCluster, SceneConfig +from pose_tracking_exp.schema import CameraCalibration, CameraFrame, ProposalCluster, SceneConfig def build_rpt_config( @@ -54,28 +56,66 @@ def pack_view_detections(frames: tuple[CameraFrame, ...], unmatched_indices: dic @beartype def extract_clusters( trace: TriangulationTrace, - camera_names: tuple[str, ...], + frames: tuple[CameraFrame, ...], + cameras: tuple[CameraCalibration, ...], + unmatched_indices: dict[str, list[int]], ) -> tuple[ProposalCluster, ...]: clusters: list[ProposalCluster] = [] + camera_by_name = {camera.name: camera for camera in cameras} + frame_by_name = {frame.camera_name: frame for frame in frames} for pose_index, pose3d in enumerate(trace.final_poses): pose_array = np.asarray(pose3d, dtype=np.float64) root = pose_array[BODY20_INDEX_BY_NAME["hip_middle"], :3] + root_centered_pose = pose_array.copy() + root_centered_pose[:, :3] -= root[None, :] source_indices = [] if pose_index < len(trace.merge.group_proposal_indices): source_indices = trace.merge.group_proposal_indices[pose_index] source_views: set[str] = set() + reprojection_errors: list[float] = [] + observation_keys: set[tuple[str, int]] = set() + proposal_scores: list[float] = [] + support_detection_indices: dict[str, set[int]] = {} for core_proposal_index in source_indices: - pair = trace.core_proposals[core_proposal_index].pair - 
source_views.add(camera_names[pair.view1]) - source_views.add(camera_names[pair.view2]) - mean_score = float(np.mean(pose_array[:, 3][pose_array[:, 3] > 0.0])) if np.any(pose_array[:, 3] > 0.0) else 0.0 + core_proposal = trace.core_proposals[core_proposal_index] + pair = core_proposal.pair + proposal_scores.append(float(core_proposal.score)) + for view_index, person_index in ((pair.view1, pair.person1), (pair.view2, pair.person2)): + camera_name = frames[view_index].camera_name + source_views.add(camera_name) + if (camera_name, int(person_index)) in observation_keys: + continue + observation_keys.add((camera_name, int(person_index))) + detection_indices = unmatched_indices.get(camera_name, []) + if person_index < 0 or person_index >= len(detection_indices): + continue + detection_index = int(detection_indices[person_index]) + support_detection_indices.setdefault(camera_name, set()).add(detection_index) + detection = frame_by_name[camera_name].detections[detection_index] + projected = project_pose(camera_by_name[camera_name], pose_array) + reprojection_errors.append(core_reprojection_distance(projected, detection.keypoints)) + mean_score = ( + float(np.mean(np.asarray(proposal_scores, dtype=np.float64))) + if proposal_scores + else float(np.mean(pose_array[:, 3][pose_array[:, 3] > 0.0])) if np.any(pose_array[:, 3] > 0.0) else 0.0 + ) clusters.append( ProposalCluster( pose3d=pose_array, root=np.asarray(root, dtype=np.float64), + root_centered_pose3d=np.asarray(root_centered_pose, dtype=np.float64), source_views=frozenset(source_views), + view_count=len(source_views), support_size=max(1, len(source_indices)), + pair_count=max(1, len(source_indices)), mean_score=mean_score, + mean_reprojection_error=( + float(np.median(np.asarray(reprojection_errors, dtype=np.float64))) if reprojection_errors else np.inf + ), + support_detection_indices={ + camera_name: tuple(sorted(indices)) + for camera_name, indices in support_detection_indices.items() + }, ) ) return 
tuple(clusters) diff --git a/src/pose_tracking_exp/tracking/tracker.py b/src/pose_tracking_exp/tracking/tracker.py index 5c036e8..659a79e 100644 --- a/src/pose_tracking_exp/tracking/tracker.py +++ b/src/pose_tracking_exp/tracking/tracker.py @@ -1,5 +1,6 @@ -from dataclasses import replace +from dataclasses import dataclass, replace from time import perf_counter +from typing import Literal import numpy as np from beartype import beartype @@ -7,7 +8,6 @@ from scipy.optimize import linear_sum_assignment from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME, CORE_JOINT_INDICES -from pose_tracking_exp.common.normalization import core_reprojection_distance from pose_tracking_exp.schema import ( ActiveTrackState, FrameBundle, @@ -16,48 +16,224 @@ from pose_tracking_exp.schema import ( SceneConfig, SkeletonState, TentativeTrackState, + TRACK_COVARIANCE_DIMENSION, + TrackUpdateEvent, TrackedFrameResult, TrackerConfig, TrackerDiagnostics, ) -from pose_tracking_exp.tracking.kinematics import seed_state_from_pose3d, update_noise_scale, update_state_from_multiview +from pose_tracking_exp.tracking.kinematics import ( + MultiviewUpdateResult, + PARAMETER_DIMENSION, + STABLE_LIMB_NAMES, + estimate_beta_from_pose_buffer, + render_pose, + seed_state_from_pose3d, + update_noise_scale, + update_state_from_multiview, +) from pose_tracking_exp.tracking.rpt_adapter import build_rpt_config, extract_clusters, pack_view_detections CORE_JOINT_MASK = np.zeros((20,), dtype=bool) CORE_JOINT_MASK[list(CORE_JOINT_INDICES)] = True +VELOCITY_SLICE = slice(PARAMETER_DIMENSION, PARAMETER_DIMENSION * 2) +BETA_SLICE = slice(PARAMETER_DIMENSION * 2, TRACK_COVARIANCE_DIMENSION) + + +@dataclass(slots=True) +class _PredictedTrackState: + skeleton: SkeletonState + covariance: np.ndarray def _copy_pose3d(pose3d: np.ndarray) -> np.ndarray: return np.asarray(pose3d, dtype=np.float64).copy() -def _proposal_rank(proposal: ProposalCluster) 
-> tuple[int, float]: - return proposal.support_size, proposal.mean_score +def _copy_root_centered_pose3d(pose3d: np.ndarray) -> np.ndarray: + copied = _copy_pose3d(pose3d) + root = copied[BODY20_INDEX_BY_NAME["hip_middle"], :3] + copied[:, :3] -= root[None, :] + return copied -def _active_track_rank(track: ActiveTrackState) -> tuple[float, int, int]: - return track.score, -track.misses, track.last_bundle_index +def _mean_finite(values: list[float], *, default: float) -> float: + finite = [value for value in values if np.isfinite(value)] + if not finite: + return default + return float(np.mean(np.asarray(finite, dtype=np.float64))) -def _lost_track_rank(track: ActiveTrackState) -> tuple[float, int, int]: - return track.score, -track.lost_age, track.last_bundle_index +def _coefficient_of_variation(samples: list[float]) -> float: + if len(samples) < 2: + return 0.0 + values = np.asarray(samples, dtype=np.float64) + mean = float(np.mean(values)) + if mean <= 1e-8: + return 0.0 + return float(np.std(values) / mean) -def _tentative_track_rank(track: TentativeTrackState) -> tuple[float, int, int, int]: - return track.score, len(track.evidence_buffer), -track.misses, track.last_bundle_index +def _limb_length(pose3d: np.ndarray, name_a: str, name_b: str) -> float | None: + index_a = BODY20_INDEX_BY_NAME[name_a] + index_b = BODY20_INDEX_BY_NAME[name_b] + if pose3d[index_a, 3] <= 0.0 or pose3d[index_b, 3] <= 0.0: + return None + return float(np.linalg.norm(pose3d[index_a, :3] - pose3d[index_b, :3])) + + +def _track_root(track: ActiveTrackState) -> np.ndarray: + return np.asarray(track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3], dtype=np.float64) + + +def _covariance_trace(covariance: np.ndarray) -> float: + return float(np.trace(np.asarray(covariance, dtype=np.float64))) + + +def _proposal_rank(proposal: ProposalCluster) -> tuple[int, int, float, float]: + return proposal.view_count, proposal.support_size, proposal.mean_score, -proposal.mean_reprojection_error + 
+ +def _active_track_rank(track: ActiveTrackState) -> tuple[int, float, int, int]: + return 2, track.score, -track.misses, track.last_bundle_index + + +def _lost_track_rank(track: ActiveTrackState) -> tuple[int, float, int, int]: + return 1, track.score, -track.lost_age, track.last_bundle_index + + +def _tentative_track_rank(track: TentativeTrackState) -> tuple[int, float, int, int]: + return 0, track.score, track.hit_count, track.last_bundle_index def _tentative_root_spread(track: TentativeTrackState) -> float: if len(track.evidence_buffer) < 2: return 0.0 - roots = np.asarray( - [pose[BODY20_INDEX_BY_NAME["hip_middle"], :3] for pose in track.evidence_buffer], - dtype=np.float64, - ) + roots = np.asarray([proposal.root for proposal in track.evidence_buffer], dtype=np.float64) center = roots.mean(axis=0) return float(np.max(np.linalg.norm(roots - center[None, :], axis=1))) +def _tentative_bone_cv(track: TentativeTrackState) -> float: + if len(track.evidence_buffer) < 2: + return 0.0 + pose_buffer = [proposal.pose3d for proposal in track.evidence_buffer] + cvs: list[float] = [] + for name_a, name_b in STABLE_LIMB_NAMES: + samples = [ + limb_length + for pose3d in pose_buffer + if (limb_length := _limb_length(pose3d, name_a, name_b)) is not None + ] + if len(samples) >= 2: + cvs.append(_coefficient_of_variation(samples)) + if not cvs: + return 0.0 + return float(np.mean(np.asarray(cvs, dtype=np.float64))) + + +def _core_pose_distance(left: np.ndarray, right: np.ndarray) -> float: + mask = CORE_JOINT_MASK.copy() + mask &= left[:, 3] > 0.0 + mask &= right[:, 3] > 0.0 + if not np.any(mask): + return np.inf + deltas = left[mask, :3] - right[mask, :3] + return float(np.sqrt(np.mean(np.sum(deltas * deltas, axis=1)))) + + +def _core_match_metrics( + projected: np.ndarray, + detection: PoseDetection, + sigma: np.ndarray, + *, + gate_px: float, +) -> tuple[float, int]: + observed = detection.keypoints + mask = CORE_JOINT_MASK.copy() + mask &= projected[:, 2] > 0.0 + mask &= 
observed[:, 2] > 0.05 + if not np.any(mask): + return np.inf, 0 + delta = projected[mask, :2] - observed[mask, :2] + distance = np.sqrt(np.sum(delta * delta, axis=1)) + normalized = distance / np.maximum(sigma[mask], 1.0) + accepted = (distance <= gate_px) | (normalized <= 2.5) + accepted_count = int(np.count_nonzero(accepted)) + if accepted_count <= 0: + return np.inf, 0 + accepted_delta = delta[accepted] + accepted_weights = np.clip(observed[mask, 2][accepted], 0.05, 1.0) + accepted_sigma = np.maximum(np.square(sigma[mask][accepted]), 1.0) + squared = np.sum(accepted_delta * accepted_delta, axis=1) + normalized = squared / accepted_sigma + return float(np.sqrt(np.average(normalized, weights=accepted_weights))), accepted_count + + +def _track_score_after_update( + score: float, + *, + decay: float, + view_gain: float, + joint_gain: float, + reprojection_penalty: float, + miss_penalty: float, + reacquire_gain: float = 0.0, + accepted_views: int, + accepted_joints: int, + mean_reprojection_error: float, + miss: bool, +) -> float: + updated = decay * score + updated += view_gain * float(accepted_views) + updated += joint_gain * (float(accepted_joints) / 20.0) + if np.isfinite(mean_reprojection_error): + updated -= reprojection_penalty * float(mean_reprojection_error) + if miss: + updated -= miss_penalty + else: + updated += reacquire_gain + return float(updated) + + +def _track_process_noise(config: TrackerConfig, *, beta_frozen: bool, scale: float) -> np.ndarray: + diagonal = np.full((TRACK_COVARIANCE_DIMENSION,), config.process_noise_joint_rad**2, dtype=np.float64) + diagonal[:3] = config.process_noise_root_position_m**2 + diagonal[3:6] = config.process_noise_root_rotation_rad**2 + diagonal[6:PARAMETER_DIMENSION] = config.process_noise_joint_rad**2 + diagonal[VELOCITY_SLICE] = config.process_noise_velocity**2 + diagonal[BETA_SLICE] = 0.0 if beta_frozen else config.process_noise_beta**2 + return diagonal * scale + + +def _embed_measurement_covariance( + 
predicted_covariance: np.ndarray, + parameter_covariance: np.ndarray, + beta_covariance: np.ndarray, + *, + beta_frozen: bool, +) -> np.ndarray: + updated = np.asarray(predicted_covariance, dtype=np.float64).copy() + updated[:PARAMETER_DIMENSION, :PARAMETER_DIMENSION] = np.asarray(parameter_covariance, dtype=np.float64) + updated[:PARAMETER_DIMENSION, VELOCITY_SLICE] *= 0.5 + updated[VELOCITY_SLICE, :PARAMETER_DIMENSION] = updated[:PARAMETER_DIMENSION, VELOCITY_SLICE].T + updated[VELOCITY_SLICE, VELOCITY_SLICE] *= 0.75 + if not beta_frozen: + updated[BETA_SLICE, BETA_SLICE] = np.asarray(beta_covariance, dtype=np.float64) + updated[:PARAMETER_DIMENSION, BETA_SLICE] = 0.0 + updated[BETA_SLICE, :PARAMETER_DIMENSION] = 0.0 + updated[VELOCITY_SLICE, BETA_SLICE] = 0.0 + updated[BETA_SLICE, VELOCITY_SLICE] = 0.0 + symmetrized = 0.5 * (updated + updated.T) + return symmetrized + np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * 1e-6 + + +def _tentative_reprojection_penalty(proposal: ProposalCluster) -> float: + if not np.isfinite(proposal.mean_reprojection_error): + return 0.0 + return 0.001 * proposal.mean_reprojection_error + + @beartype class PoseTracker: def __init__(self, scene: SceneConfig, config: TrackerConfig | None = None) -> None: @@ -87,14 +263,16 @@ class PoseTracker: def step(self, bundle: FrameBundle) -> TrackedFrameResult: self._enforce_track_limits() - matches, unmatched = self._match_existing_tracks(bundle) - self._update_active_tracks(bundle, matches) - self._update_lost_tracks(bundle, matches) - proposals = self._refresh_capped_single_track_from_proposals(bundle, self._build_proposals(bundle, unmatched)) - self._update_tentative_tracks(bundle, self._birth_candidate_proposals(proposals)) - self._promote_tentative_tracks(bundle) - self._reacquire_lost_tracks(bundle, proposals) - self._delete_expired_tracks() + update_events: list[TrackUpdateEvent] = [] + predicted_states = self._predicted_skeletons() + matches, unmatched = 
self._match_existing_tracks(bundle, predicted_states) + self._update_active_tracks(bundle, predicted_states, matches, update_events) + self._update_lost_tracks(bundle, predicted_states, matches, update_events) + proposals = self._build_proposals(bundle, unmatched) + self._update_tentative_tracks(bundle, proposals, update_events) + self._promote_tentative_tracks(bundle, update_events) + self._reacquire_lost_tracks(bundle, predicted_states, proposals, update_events) + self._delete_expired_tracks(update_events) self._enforce_track_limits() return TrackedFrameResult( bundle_index=bundle.bundle_index, @@ -102,84 +280,93 @@ class PoseTracker: tentative_tracks=tuple(sorted(self._tentative.values(), key=lambda item: item.track_id)), active_tracks=tuple(sorted(self._active.values(), key=lambda item: item.track_id)), lost_tracks=tuple(sorted(self._lost.values(), key=lambda item: item.track_id)), - proposals=proposals, + proposals=tuple(sorted(proposals, key=_proposal_rank, reverse=True)), + update_events=tuple(update_events), ) def _track_limit(self) -> int | None: return self._config.max_active_tracks - def _single_track_cap_enabled(self) -> bool: - return self._config.max_active_tracks == 1 - - def _keep_best_active_tracks(self, limit: int) -> None: - if len(self._active) <= limit: - return - ranked_ids = sorted(self._active, key=lambda track_id: _active_track_rank(self._active[track_id]), reverse=True) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._active): - if track_id not in keep_ids: - self._active.pop(track_id, None) - - def _keep_best_lost_tracks(self, limit: int) -> None: - if len(self._lost) <= limit: - return - ranked_ids = sorted(self._lost, key=lambda track_id: _lost_track_rank(self._lost[track_id]), reverse=True) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._lost): - if track_id not in keep_ids: - self._lost.pop(track_id, None) - - def _keep_best_tentative_tracks(self, limit: int) -> None: - if len(self._tentative) <= 
limit: - return - ranked_ids = sorted( - self._tentative, - key=lambda track_id: _tentative_track_rank(self._tentative[track_id]), - reverse=True, - ) - keep_ids = set(ranked_ids[:limit]) - for track_id in list(self._tentative): - if track_id not in keep_ids: - self._tentative.pop(track_id, None) + def _capacity_for_new_track(self) -> int | None: + limit = self._track_limit() + if limit is None: + return None + return max(0, limit - (len(self._active) + len(self._lost) + len(self._tentative))) def _enforce_track_limits(self) -> None: limit = self._track_limit() if limit is None: return - self._keep_best_active_tracks(limit) - self._keep_best_lost_tracks(limit) - self._keep_best_tentative_tracks(limit) - if not self._single_track_cap_enabled(): - return - if self._active: - self._lost.clear() - self._tentative.clear() - return - if self._lost: - self._tentative.clear() + ranked_entries: list[tuple[tuple[int, float, int, int], str, int]] = [] + ranked_entries.extend((_active_track_rank(track), "active", track_id) for track_id, track in self._active.items()) + ranked_entries.extend((_lost_track_rank(track), "lost", track_id) for track_id, track in self._lost.items()) + ranked_entries.extend( + (_tentative_track_rank(track), "tentative", track_id) for track_id, track in self._tentative.items() + ) + ranked_entries.sort(key=lambda item: item[0], reverse=True) + keep_entries = {(bucket, track_id) for _, bucket, track_id in ranked_entries[:limit]} + for track_id in list(self._active): + if ("active", track_id) not in keep_entries: + self._active.pop(track_id, None) + for track_id in list(self._lost): + if ("lost", track_id) not in keep_entries: + self._lost.pop(track_id, None) + for track_id in list(self._tentative): + if ("tentative", track_id) not in keep_entries: + self._tentative.pop(track_id, None) - def _predicted_pose_by_track(self) -> dict[int, np.ndarray]: - result: dict[int, np.ndarray] = {} + def _predict_track_state(self, track: ActiveTrackState) -> 
_PredictedTrackState: + predicted_parameters = track.skeleton.parameters + track.joint_velocity + predicted_parameters = np.asarray(predicted_parameters, dtype=np.float64) + predicted_parameters[0:3] = track.skeleton.parameters[0:3] + track.root_velocity + predicted_pose = render_pose(predicted_parameters, track.skeleton.beta) + transition = np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) + transition[:PARAMETER_DIMENSION, VELOCITY_SLICE] = np.eye(PARAMETER_DIMENSION, dtype=np.float64) + process_noise = np.diag( + _track_process_noise( + self._config, + beta_frozen=track.beta_frozen, + scale=1.0, + ) + ) + predicted_covariance = transition @ track.covariance @ transition.T + process_noise + return _PredictedTrackState( + skeleton=SkeletonState( + parameters=predicted_parameters, + beta=track.skeleton.beta.copy(), + pose3d=predicted_pose, + ), + covariance=0.5 * (predicted_covariance + predicted_covariance.T), + ) + + def _is_in_working_volume(self, root: np.ndarray) -> bool: + half_extent = self._scene.room_size * 0.5 + lower = self._scene.room_center - half_extent + upper = self._scene.room_center + half_extent + return bool(np.all(root >= lower) and np.all(root <= upper)) + + def _predicted_skeletons(self) -> dict[int, _PredictedTrackState]: + result: dict[int, _PredictedTrackState] = {} for track_id, track in self._active.items(): - result[track_id] = track.skeleton.pose3d + result[track_id] = self._predict_track_state(track) for track_id, track in self._lost.items(): - result[track_id] = track.skeleton.pose3d + result[track_id] = self._predict_track_state(track) return result def _match_existing_tracks( self, bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], ) -> tuple[dict[int, dict[str, PoseDetection]], dict[str, list[int]]]: started_at = perf_counter() try: - predicted = self._predicted_pose_by_track() - matches: dict[int, dict[str, PoseDetection]] = {track_id: {} for track_id in predicted} + matches: dict[int, dict[str, 
PoseDetection]] = {track_id: {} for track_id in predicted_states} unmatched_indices = {frame.camera_name: list(range(len(frame.detections))) for frame in bundle.views} - if not predicted: + if not predicted_states: return matches, unmatched_indices camera_by_name = {camera.name: camera for camera in self._scene.cameras} - track_ids = sorted(predicted) + track_ids = sorted(predicted_states) for frame in bundle.views: cost = np.full( (len(track_ids), len(frame.detections)), @@ -187,13 +374,25 @@ class PoseTracker: dtype=np.float64, ) for row, track_id in enumerate(track_ids): - projected = project_pose(camera_by_name[frame.camera_name], predicted[track_id]) - projected_core = projected.copy() - projected_core[~CORE_JOINT_MASK, 2] = 0.0 + track = self._active.get(track_id, self._lost.get(track_id)) + if track is None: + continue + projected = project_pose(camera_by_name[frame.camera_name], predicted_states[track_id].skeleton.pose3d) + sigma = track.noise_by_view.get(frame.camera_name, track.noise_scale) + min_core_joints = ( + self._config.lost_min_accepted_core_joints + if track.status == "lost" + else self._config.association_min_core_joints + ) for col, detection in enumerate(frame.detections): - observed_core = detection.keypoints.copy() - observed_core[~CORE_JOINT_MASK, 2] = 0.0 - cost[row, col] = core_reprojection_distance(projected_core, observed_core) + match_cost, accepted_core_joints = _core_match_metrics( + projected, + detection, + sigma, + gate_px=self._config.active_core_gate_px, + ) + if accepted_core_joints >= min_core_joints: + cost[row, col] = match_cost if cost.size == 0: continue rows, cols = linear_sum_assignment(cost) @@ -214,44 +413,277 @@ class PoseTracker: def _refine_track_state( self, track: ActiveTrackState, + predicted_state: SkeletonState, matched: dict[str, PoseDetection], - ) -> tuple[SkeletonState, np.ndarray]: + ): self._diagnostics.nonlinear_refinements += 1 - updated_skeleton = update_state_from_multiview( - track.skeleton, + 
update_result = update_state_from_multiview( + predicted_state, self._scene.cameras, matched, track.noise_scale, + track.noise_by_view, + joint_gate_px=self._config.active_joint_gate_px, + beta_frozen=track.beta_frozen, + max_iterations=self._config.lm_max_iterations, + damping=self._config.lm_damping, + step_epsilon=self._config.lm_step_epsilon, + step_tolerance=self._config.lm_step_tolerance, + student_t_dof=self._config.lm_student_t_dof, ) - updated_noise = update_noise_scale( + updated_noise, updated_noise_by_view = update_noise_scale( track.noise_scale, + track.noise_by_view, self._scene.cameras, - updated_skeleton.pose3d, + update_result.state.pose3d, matched, + update_result.accepted_joint_masks, ema=self._config.noise_ema, + min_px=self._config.noise_min_px, + max_px=self._config.noise_max_px, + residual_cap_px=self._config.noise_residual_cap_px, ) self._diagnostics.active_updates += 1 - return updated_skeleton, updated_noise + self._diagnostics.lm_iterations += update_result.lm_iterations + return update_result, updated_noise, updated_noise_by_view def _seed_skeleton(self, pose3d: np.ndarray, beta: np.ndarray | None = None) -> SkeletonState: self._diagnostics.seed_initializations += 1 return seed_state_from_pose3d(_copy_pose3d(pose3d), beta=None if beta is None else beta.copy()) - def _update_active_tracks(self, bundle: FrameBundle, matches: dict[int, dict[str, PoseDetection]]) -> None: + def _predict_only_update( + self, + track: ActiveTrackState, + predicted: _PredictedTrackState, + *, + bundle_index: int, + lost_track: bool, + ) -> None: + track.skeleton = predicted.skeleton + track.covariance = predicted.covariance + np.diag( + _track_process_noise( + self._config, + beta_frozen=track.beta_frozen, + scale=self._config.predict_only_process_scale, + ) + ) + if lost_track: + track.score = _track_score_after_update( + track.score, + decay=self._config.lost_score_decay, + view_gain=0.0, + joint_gain=0.0, + reprojection_penalty=0.0, + 
miss_penalty=self._config.lost_score_miss_penalty, + accepted_views=0, + accepted_joints=0, + mean_reprojection_error=np.inf, + miss=True, + ) + else: + track.score = _track_score_after_update( + track.score, + decay=self._config.active_score_decay, + view_gain=0.0, + joint_gain=0.0, + reprojection_penalty=0.0, + miss_penalty=self._config.active_score_miss_penalty, + accepted_views=0, + accepted_joints=0, + mean_reprojection_error=np.inf, + miss=True, + ) + track.last_bundle_index = bundle_index + track.last_update_kind = "predict_only" + track.mean_reprojection_error = np.inf + track.outside_volume_age = 0 if self._is_in_working_volume(_track_root(track)) else track.outside_volume_age + 1 + if lost_track: + track.lost_age += 1 + else: + track.misses += 1 + self._diagnostics.predict_only_updates += 1 + + def _apply_successful_update( + self, + track: ActiveTrackState, + predicted: _PredictedTrackState, + update_result, + updated_noise: np.ndarray, + updated_noise_by_view: dict[str, np.ndarray], + *, + bundle_index: int, + update_kind: Literal["direct_update", "direct_reacquire", "proposal_reacquire"], + ) -> None: + previous_parameters = track.skeleton.parameters.copy() + updated_skeleton = update_result.state + track.root_velocity = updated_skeleton.parameters[0:3] - previous_parameters[0:3] + track.joint_velocity = updated_skeleton.parameters - previous_parameters + track.skeleton = updated_skeleton + track.noise_scale = updated_noise + track.noise_by_view = updated_noise_by_view + track.covariance = _embed_measurement_covariance( + predicted.covariance, + update_result.parameter_covariance, + update_result.beta_covariance, + beta_frozen=track.beta_frozen, + ) + reacquire_gain = self._config.lost_score_reacquire_gain if track.status == "lost" else 0.0 + track.score = _track_score_after_update( + track.score, + decay=self._config.active_score_decay, + view_gain=self._config.active_score_view_gain, + joint_gain=self._config.active_score_joint_gain, + 
reprojection_penalty=self._config.active_score_reprojection_penalty, + miss_penalty=self._config.active_score_miss_penalty, + reacquire_gain=reacquire_gain, + accepted_views=update_result.accepted_view_count, + accepted_joints=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + miss=False, + ) + track.misses = 0 + track.lost_age = 0 + track.last_bundle_index = bundle_index + track.last_update_kind = update_kind + track.mean_reprojection_error = update_result.mean_reprojection_error + track.outside_volume_age = 0 if self._is_in_working_volume(_track_root(track)) else track.outside_volume_age + 1 + if not track.beta_frozen: + track.beta_grace_age += 1 + if track.beta_grace_age >= self._config.beta_grace_frames: + track.beta_frozen = True + track.status = "active" + + def _proposal_support_matches( + self, + bundle: FrameBundle, + track: ActiveTrackState, + proposal: ProposalCluster, + seeded_state: SkeletonState, + ) -> dict[str, PoseDetection]: + matched: dict[str, PoseDetection] = {} + camera_by_name = {camera.name: camera for camera in self._scene.cameras} + frame_by_name = {frame.camera_name: frame for frame in bundle.views} + for camera_name in proposal.source_views: + frame = frame_by_name.get(camera_name) + camera = camera_by_name.get(camera_name) + if frame is None or camera is None: + continue + sigma = track.noise_by_view.get(camera_name, track.noise_scale) + projected = project_pose(camera, seeded_state.pose3d) + best_cost = np.inf + best_detection: PoseDetection | None = None + support_indices = tuple( + index + for index in proposal.support_detection_indices.get(camera_name, ()) + if 0 <= index < len(frame.detections) + ) + candidate_indices = support_indices + tuple( + index for index in range(len(frame.detections)) if index not in support_indices + ) + for detection_index in candidate_indices: + if detection_index < 0 or detection_index >= len(frame.detections): + continue + detection = 
frame.detections[detection_index] + match_cost, accepted_core_joints = _core_match_metrics( + projected, + detection, + sigma, + gate_px=self._config.active_core_gate_px, + ) + if accepted_core_joints < self._config.lost_min_accepted_core_joints: + continue + if match_cost < best_cost: + best_cost = match_cost + best_detection = detection + if best_detection is not None: + matched[camera_name] = best_detection + return matched + + def _mark_proposal_compatible_lost_track( + self, + track: ActiveTrackState, + proposal: ProposalCluster, + update_events: list[TrackUpdateEvent], + *, + accepted_view_count: int = 0, + accepted_joint_count: int = 0, + mean_reprojection_error: float | None = None, + ) -> None: + track.score += self._config.proposal_compatible_score_relief + if mean_reprojection_error is not None and np.isfinite(mean_reprojection_error): + track.mean_reprojection_error = mean_reprojection_error + self._diagnostics.proposal_compatible_lost_frames += 1 + update_events.append( + TrackUpdateEvent( + track_id=track.track_id, + action="proposal_compatible", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + accepted_view_count=accepted_view_count, + accepted_joint_count=accepted_joint_count, + mean_reprojection_error=( + proposal.mean_reprojection_error + if mean_reprojection_error is None + else mean_reprojection_error + ), + ) + ) + + def _update_active_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + matches: dict[int, dict[str, PoseDetection]], + update_events: list[TrackUpdateEvent], + ) -> None: demote_to_lost: list[int] = [] for track_id, track in list(self._active.items()): matched = matches.get(track_id, {}) + predicted = predicted_states[track_id] if len(matched) >= self._config.active_min_views: - updated_skeleton, updated_noise = self._refine_track_state(track, matched) - track.skeleton = updated_skeleton - track.noise_scale = updated_noise - track.score = 0.85 * 
track.score + len(matched) - track.misses = 0 - track.last_bundle_index = bundle.bundle_index + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + predicted.skeleton, + matched, + ) + if ( + update_result.accepted_view_count >= self._config.active_min_views + and update_result.accepted_joint_count >= self._config.active_min_accepted_joints + ): + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + update_kind="direct_update", + ) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="direct_update", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) + continue + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=False) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="predict_only", + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) else: - track.misses += 1 - track.score -= 1.0 - if track.misses >= self._config.active_miss_to_lost: + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=False) + update_events.append(TrackUpdateEvent(track_id=track_id, action="predict_only")) + if ( + track.misses >= self._config.active_miss_to_lost + or track.score <= self._config.active_score_lost_threshold + ): demote_to_lost.append(track_id) for track_id in demote_to_lost: @@ -260,23 +692,59 @@ class PoseTracker: track.lost_age = 0 self._lost[track_id] = track - def _update_lost_tracks(self, bundle: FrameBundle, matches: dict[int, dict[str, PoseDetection]]) -> None: + def _update_lost_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + matches: dict[int, dict[str, PoseDetection]], + update_events: list[TrackUpdateEvent], + ) -> 
None: recover_to_active: list[int] = [] for track_id, track in list(self._lost.items()): matched = matches.get(track_id, {}) - if len(matched) >= self._config.active_min_views: - updated_skeleton, updated_noise = self._refine_track_state(track, matched) - track.skeleton = updated_skeleton - track.noise_scale = updated_noise - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) + len(matched) - track.last_bundle_index = bundle.bundle_index - recover_to_active.append(track_id) + predicted = predicted_states[track_id] + if len(matched) >= self._config.lost_min_views: + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + predicted.skeleton, + matched, + ) + if ( + update_result.accepted_view_count >= self._config.lost_min_views + and update_result.accepted_joint_count >= self._config.lost_min_accepted_joints + ): + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + update_kind="direct_reacquire", + ) + recover_to_active.append(track_id) + self._diagnostics.direct_reacquisition_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="direct_reacquire", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) + continue + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=True) + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="predict_only", + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) else: - track.lost_age += 1 - track.score -= 0.5 + self._predict_only_update(track, predicted, bundle_index=bundle.bundle_index, lost_track=True) + update_events.append(TrackUpdateEvent(track_id=track_id, action="predict_only")) for track_id in 
recover_to_active: self._diagnostics.reacquisitions += 1 self._active[track_id] = self._lost.pop(track_id) @@ -288,128 +756,157 @@ class PoseTracker: if int(np.sum(person_counts)) == 0: return () trace = __import__("rpt").triangulate_debug(poses_2d, person_counts, self._rpt_config) - return extract_clusters(trace, tuple(frame.camera_name for frame in bundle.views)) + return extract_clusters(trace, bundle.views, self._scene.cameras, unmatched_indices) finally: self._diagnostics.proposal_build_calls += 1 self._diagnostics.proposal_build_seconds += perf_counter() - started_at - def _birth_candidate_proposals(self, proposals: tuple[ProposalCluster, ...]) -> tuple[ProposalCluster, ...]: - if not self._single_track_cap_enabled(): - return proposals - if self._active or self._lost: - return () - if not proposals: - return () - return (max(proposals, key=_proposal_rank),) + def _tentative_assignment_cost(self, track: TentativeTrackState, proposal: ProposalCluster) -> float: + root_distance = float(np.linalg.norm(track.root - proposal.root)) + if root_distance > self._config.proposal_match_distance_m: + return np.inf + core_distance = _core_pose_distance(track.root_centered_pose3d, proposal.root_centered_pose3d) + if core_distance > self._config.proposal_core_match_distance_m: + return np.inf + return ( + root_distance / max(self._config.proposal_match_distance_m, 1e-6) + + core_distance / max(self._config.proposal_core_match_distance_m, 1e-6) + ) - def _refresh_capped_single_track_from_proposals( + def _update_tentative_summary(self, track: TentativeTrackState) -> None: + track.mean_view_count = float( + np.mean(np.asarray([proposal.view_count for proposal in track.evidence_buffer], dtype=np.float64)) + ) + track.mean_support_size = float( + np.mean(np.asarray([proposal.support_size for proposal in track.evidence_buffer], dtype=np.float64)) + ) + track.mean_reprojection_error = _mean_finite( + [proposal.mean_reprojection_error for proposal in track.evidence_buffer], + 
default=np.inf, + ) + if track.evidence_buffer: + track.last_source_views = track.evidence_buffer[-1].source_views + + def _update_tentative_tracks( self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...], - ) -> tuple[ProposalCluster, ...]: - if not self._single_track_cap_enabled() or not proposals: - return proposals - - remaining = list(proposals) - if self._active: - track = next(iter(self._active.values())) - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] - best_index = -1 - best_distance = self._config.proposal_match_distance_m - for proposal_index, proposal in enumerate(remaining): - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance - best_index = proposal_index - if best_index >= 0: - proposal = remaining.pop(best_index) - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.score = 0.85 * track.score + proposal.mean_score * proposal.support_size - track.misses = 0 - track.last_bundle_index = bundle.bundle_index - self._diagnostics.active_updates += 1 - return tuple(remaining) - - if self._lost: - track_id, track = next(iter(self._lost.items())) - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] - best_index = -1 - best_distance = self._config.proposal_match_distance_m - for proposal_index, proposal in enumerate(remaining): - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance - best_index = proposal_index - if best_index >= 0: - proposal = remaining.pop(best_index) - track = self._lost.pop(track_id) - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) + proposal.mean_score * proposal.support_size - track.last_bundle_index = bundle.bundle_index - self._active[track_id] = track - 
self._diagnostics.active_updates += 1 - self._diagnostics.reacquisitions += 1 - return tuple(remaining) - - return proposals - - def _update_tentative_tracks(self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...]) -> None: + update_events: list[TrackUpdateEvent], + ) -> None: + updated_ids: set[int] = set() for track in self._tentative.values(): track.age += 1 track.misses += 1 - for proposal in proposals: + for proposal in sorted(proposals, key=_proposal_rank, reverse=True): best_track_id = -1 - best_distance = self._config.proposal_match_distance_m + best_cost = np.inf for track_id, track in self._tentative.items(): - distance = float(np.linalg.norm(track.root - proposal.root)) - if distance < best_distance: - best_distance = distance + cost = self._tentative_assignment_cost(track, proposal) + if cost < best_cost: + best_cost = cost best_track_id = track_id if best_track_id < 0: + capacity = self._capacity_for_new_track() + if capacity is not None and capacity <= 0: + continue track_id = self._next_track_id self._next_track_id += 1 - self._tentative[track_id] = TentativeTrackState( + score = proposal.mean_score * max(1, proposal.view_count) - ( + _tentative_reprojection_penalty(proposal) + ) + tentative = TentativeTrackState( track_id=track_id, age=1, misses=0, - score=proposal.mean_score * proposal.support_size, + hit_count=1, + score=score, last_bundle_index=bundle.bundle_index, root=proposal.root.copy(), pose3d=_copy_pose3d(proposal.pose3d), - evidence_buffer=[_copy_pose3d(proposal.pose3d)], + root_centered_pose3d=_copy_pose3d(proposal.root_centered_pose3d), + evidence_buffer=[proposal], + mean_view_count=float(proposal.view_count), + mean_reprojection_error=proposal.mean_reprojection_error, + mean_support_size=float(proposal.support_size), + last_source_views=proposal.source_views, + ) + self._tentative[track_id] = tentative + updated_ids.add(track_id) + self._diagnostics.tentative_updates += 1 + update_events.append( + TrackUpdateEvent( + 
track_id=track_id, + action="tentative_observed", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + ) ) continue track = self._tentative[best_track_id] track.root = proposal.root.copy() track.pose3d = _copy_pose3d(proposal.pose3d) - track.score = 0.8 * track.score + proposal.mean_score * proposal.support_size + track.root_centered_pose3d = _copy_pose3d(proposal.root_centered_pose3d) + track.hit_count += 1 + track.score = ( + 0.75 * track.score + + proposal.mean_score * max(1, proposal.view_count) + - _tentative_reprojection_penalty(proposal) + ) track.misses = 0 track.last_bundle_index = bundle.bundle_index - track.evidence_buffer.append(_copy_pose3d(proposal.pose3d)) + track.evidence_buffer.append(proposal) if len(track.evidence_buffer) > self._config.tentative_buffer_size: track.evidence_buffer.pop(0) + self._update_tentative_summary(track) + updated_ids.add(best_track_id) + self._diagnostics.tentative_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=best_track_id, + action="tentative_observed", + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + ) + ) - def _promote_tentative_tracks(self, bundle: FrameBundle) -> None: - promote_ids: list[int] = [] for track_id, track in self._tentative.items(): + if track_id not in updated_ids: + update_events.append(TrackUpdateEvent(track_id=track_id, action="tentative_missed")) + + def _promote_tentative_tracks(self, bundle: FrameBundle, update_events: list[TrackUpdateEvent]) -> None: + promote_candidates: list[TentativeTrackState] = [] + for track in self._tentative.values(): if track.age < self._config.tentative_min_age: continue if len(track.evidence_buffer) < self._config.tentative_hits_required: continue + if track.mean_view_count < self._config.tentative_min_mean_views: + continue + if track.mean_reprojection_error > self._config.tentative_max_reprojection_error_px: + continue + if _tentative_bone_cv(track) > 
self._config.tentative_max_bone_cv: + continue if track.score < self._config.tentative_promote_score: continue if _tentative_root_spread(track) > self._config.proposal_match_distance_m: continue - promote_ids.append(track_id) - - for track_id in promote_ids: - tentative = self._tentative.pop(track_id) - mean_pose = np.mean(np.stack(tentative.evidence_buffer, axis=0), axis=0) + promote_candidates.append(track) + promote_candidates.sort(key=_tentative_track_rank, reverse=True) + for tentative in promote_candidates: + track_id = tentative.track_id + if track_id not in self._tentative: + continue + pose_buffer = [_copy_pose3d(proposal.pose3d) for proposal in tentative.evidence_buffer] + beta = estimate_beta_from_pose_buffer(pose_buffer) + mean_pose = np.mean(np.stack(pose_buffer, axis=0), axis=0) + initial_noise = ( + max(3.0, min(45.0, tentative.mean_reprojection_error)) + if np.isfinite(tentative.mean_reprojection_error) + else 9.0 + ) + self._tentative.pop(track_id, None) self._active[track_id] = ActiveTrackState( track_id=track_id, status="active", @@ -417,48 +914,141 @@ class PoseTracker: lost_age=0, score=max(tentative.score, 1.0), last_bundle_index=bundle.bundle_index, - skeleton=self._seed_skeleton(mean_pose), - noise_scale=np.full((20,), 9.0, dtype=np.float64), + skeleton=self._seed_skeleton(mean_pose, beta=beta), + noise_scale=np.full((20,), initial_noise, dtype=np.float64), + root_velocity=np.zeros(3, dtype=np.float64), + joint_velocity=np.zeros(31, dtype=np.float64), + beta_frozen=self._config.beta_grace_frames <= 0, + beta_grace_age=0, + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * (initial_noise**2), + outside_volume_age=0, + mean_reprojection_error=tentative.mean_reprojection_error, + last_update_kind="promoted", ) self._diagnostics.promotions += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="promoted", + proposal_view_count=int(round(tentative.mean_view_count)), + 
proposal_support_size=int(round(tentative.mean_support_size)), + ) + ) - def _reacquire_lost_tracks(self, bundle: FrameBundle, proposals: tuple[ProposalCluster, ...]) -> None: + def _proposal_reacquisition_cost(self, predicted_state: SkeletonState, proposal: ProposalCluster) -> float: + predicted_root = predicted_state.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] + root_distance = float(np.linalg.norm(predicted_root - proposal.root)) + if root_distance > self._config.proposal_reacquire_root_distance_m: + return np.inf + predicted_centered_pose = _copy_root_centered_pose3d(predicted_state.pose3d) + core_distance = _core_pose_distance(predicted_centered_pose, proposal.root_centered_pose3d) + if core_distance > self._config.proposal_reacquire_core_distance_m: + return np.inf + return ( + root_distance / max(self._config.proposal_reacquire_root_distance_m, 1e-6) + + core_distance / max(self._config.proposal_reacquire_core_distance_m, 1e-6) + - 0.05 * proposal.view_count + ) + + def _reacquire_lost_tracks( + self, + bundle: FrameBundle, + predicted_states: dict[int, _PredictedTrackState], + proposals: tuple[ProposalCluster, ...], + update_events: list[TrackUpdateEvent], + ) -> None: used_indices: set[int] = set() - reactivate: list[tuple[int, int]] = [] + reactivate: list[tuple[int, int, MultiviewUpdateResult, np.ndarray, dict[str, np.ndarray]]] = [] for track_id, track in self._lost.items(): - predicted_root = track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], :3] + predicted = predicted_states[track_id] best_index = -1 - best_distance = self._config.proposal_match_distance_m + best_cost = np.inf for proposal_index, proposal in enumerate(proposals): if proposal_index in used_indices: continue - distance = float(np.linalg.norm(predicted_root - proposal.root)) - if distance <= best_distance: - best_distance = distance + cost = self._proposal_reacquisition_cost(predicted.skeleton, proposal) + if cost < best_cost: + best_cost = cost best_index = proposal_index - 
if best_index >= 0: + if best_index < 0: + continue + proposal = proposals[best_index] + seeded_state = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) + proposal_matches = self._proposal_support_matches(bundle, track, proposal, seeded_state) + self._diagnostics.proposal_reacquisition_attempts += 1 + if len(proposal_matches) < self._config.lost_min_views: + self._mark_proposal_compatible_lost_track( + track, + proposal, + update_events, + mean_reprojection_error=proposal.mean_reprojection_error, + ) + continue + update_result, updated_noise, updated_noise_by_view = self._refine_track_state( + track, + seeded_state, + proposal_matches, + ) + if ( + update_result.accepted_view_count >= self._config.lost_min_views + and update_result.accepted_joint_count >= self._config.lost_min_accepted_joints + ): used_indices.add(best_index) - reactivate.append((track_id, best_index)) - for track_id, proposal_index in reactivate: + reactivate.append((track_id, best_index, update_result, updated_noise, updated_noise_by_view)) + continue + self._mark_proposal_compatible_lost_track( + track, + proposal, + update_events, + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + for track_id, proposal_index, update_result, updated_noise, updated_noise_by_view in reactivate: if track_id not in self._lost: continue - track = self._lost.pop(track_id) + track = self._lost[track_id] proposal = proposals[proposal_index] - track.skeleton = self._seed_skeleton(proposal.pose3d, beta=track.skeleton.beta) - track.status = "active" - track.misses = 0 - track.lost_age = 0 - track.score = max(track.score, 1.0) - track.last_bundle_index = bundle.bundle_index + predicted = predicted_states[track_id] + self._apply_successful_update( + track, + predicted, + update_result, + updated_noise, + updated_noise_by_view, + bundle_index=bundle.bundle_index, + 
update_kind="proposal_reacquire", + ) self._active[track_id] = track + self._lost.pop(track_id, None) self._diagnostics.reacquisitions += 1 + self._diagnostics.proposal_reacquisition_updates += 1 + update_events.append( + TrackUpdateEvent( + track_id=track_id, + action="proposal_reacquire", + accepted_view_count=update_result.accepted_view_count, + accepted_joint_count=update_result.accepted_joint_count, + proposal_view_count=proposal.view_count, + proposal_support_size=proposal.support_size, + mean_reprojection_error=update_result.mean_reprojection_error, + ) + ) - def _delete_expired_tracks(self) -> None: + def _delete_expired_tracks(self, update_events: list[TrackUpdateEvent]) -> None: tentative_delete = [ track_id for track_id, track in self._tentative.items() if track.misses > self._config.tentative_max_misses ] for track_id in tentative_delete: self._tentative.pop(track_id, None) - lost_delete = [track_id for track_id, track in self._lost.items() if track.lost_age >= self._config.lost_delete_age] + update_events.append(TrackUpdateEvent(track_id=track_id, action="deleted_tentative")) + lost_delete = [ + track_id + for track_id, track in self._lost.items() + if track.lost_age >= self._config.lost_delete_age + or _covariance_trace(track.covariance) >= self._config.lost_covariance_trace_max + or track.outside_volume_age >= self._config.lost_outside_volume_frames + ] for track_id in lost_delete: self._lost.pop(track_id, None) + update_events.append(TrackUpdateEvent(track_id=track_id, action="deleted_lost")) diff --git a/tests/support/actual_test.py b/tests/support/actual_test.py index acd82d0..9de5255 100644 --- a/tests/support/actual_test.py +++ b/tests/support/actual_test.py @@ -1,3 +1,5 @@ +from collections import Counter +from dataclasses import dataclass from pathlib import Path import click @@ -8,12 +10,92 @@ from beartype import beartype from loguru import logger from pose_tracking_exp.common.normalization import infer_bbox_from_keypoints, 
normalize_rtmpose_body20 -from pose_tracking_exp.schema import CameraCalibration, CameraFrame, FrameBundle, PoseDetection, SceneConfig, TrackerConfig +from pose_tracking_exp.schema import ( + CameraCalibration, + CameraFrame, + FrameBundle, + PoseDetection, + SceneConfig, + TrackerConfig, + TrackerDiagnostics, + TrackedFrameResult, +) from pose_tracking_exp.tracking import PoseTracker _NOMINAL_FRAME_PERIOD_NS = 33_333_333 +@dataclass(slots=True) +class ActualTestTrackingSummary: + bundle_count: int + active_frames: int + proposal_frames: int + max_active_tracks: int + max_lost_tracks: int + update_action_counts: dict[str, int] + mean_accepted_views: float + mean_accepted_joints: float + mean_reprojection_error: float + diagnostics: TrackerDiagnostics + + +def _finite_mean(values: list[float]) -> float: + finite = [value for value in values if np.isfinite(value)] + if not finite: + return np.inf + return float(np.mean(np.asarray(finite, dtype=np.float64))) + + +@beartype +def summarize_tracking_results( + results: list[TrackedFrameResult], + diagnostics: TrackerDiagnostics, +) -> ActualTestTrackingSummary: + update_events = [event for result in results for event in result.update_events] + action_counts = Counter(event.action for event in update_events) + accepted_view_samples = [float(event.accepted_view_count) for event in update_events if event.accepted_view_count > 0] + accepted_joint_samples = [float(event.accepted_joint_count) for event in update_events if event.accepted_joint_count > 0] + reprojection_samples = [float(event.mean_reprojection_error) for event in update_events] + return ActualTestTrackingSummary( + bundle_count=len(results), + active_frames=sum(1 for result in results if result.active_tracks), + proposal_frames=sum(1 for result in results if result.proposals), + max_active_tracks=max((len(result.active_tracks) for result in results), default=0), + max_lost_tracks=max((len(result.lost_tracks) for result in results), default=0), + 
update_action_counts=dict(action_counts), + mean_accepted_views=_finite_mean(accepted_view_samples), + mean_accepted_joints=_finite_mean(accepted_joint_samples), + mean_reprojection_error=_finite_mean(reprojection_samples), + diagnostics=diagnostics, + ) + + +@beartype +def format_frame_summary_lines(results: list[TrackedFrameResult]) -> tuple[str, ...]: + lines: list[str] = [] + for result in results: + action_counts = Counter(event.action for event in result.update_events) + finite_reprojection_errors = [ + float(event.mean_reprojection_error) + for event in result.update_events + if np.isfinite(event.mean_reprojection_error) + ] + lines.append( + "bundle={} proposals={} active_ids={} lost_ids={} tentative_ids={} actions={} mean_event_reproj={}".format( + result.bundle_index, + len(result.proposals), + [track.track_id for track in result.active_tracks], + [track.track_id for track in result.lost_tracks], + [track.track_id for track in result.tentative_tracks], + dict(action_counts), + "{:.2f}".format(float(np.mean(np.asarray(finite_reprojection_errors, dtype=np.float64)))) + if finite_reprojection_errors + else "nan", + ) + ) + return tuple(lines) + + @beartype def load_actual_test_scene(root: Path) -> SceneConfig: # ActualTest parquet comes from the ChArUco/OpenCV side, so `rvec` / `tvec` @@ -148,6 +230,7 @@ def load_actual_test_segment_bundles( @click.option("--max-frames", type=click.IntRange(min=1)) @click.option("--min-camera-rows", default=1, type=click.IntRange(min=1), show_default=True) @click.option("--max-active-tracks", default=1, type=click.IntRange(min=1), show_default=True) +@click.option("--verbose-frames/--no-verbose-frames", default=False, show_default=True) def main( root_path: Path, segment_name: str, @@ -156,6 +239,7 @@ def main( max_frames: int | None, min_camera_rows: int, max_active_tracks: int, + verbose_frames: bool, ) -> None: logger.remove() logger.add( @@ -174,12 +258,34 @@ def main( ) tracker = PoseTracker(scene, 
TrackerConfig(max_active_tracks=max_active_tracks)) results = tracker.run(bundles) + summary = summarize_tracking_results(results, tracker.diagnostics_snapshot()) logger.info( - "actual_test bundles={} active_frames={} proposal_frames={}", - len(results), - sum(1 for result in results if result.active_tracks), - sum(1 for result in results if result.proposals), + "actual_test bundles={} active_frames={} proposal_frames={} max_active_tracks={} max_lost_tracks={} " + "mean_accepted_views={} mean_accepted_joints={} mean_reprojection_error={}", + summary.bundle_count, + summary.active_frames, + summary.proposal_frames, + summary.max_active_tracks, + summary.max_lost_tracks, + "{:.2f}".format(summary.mean_accepted_views) if np.isfinite(summary.mean_accepted_views) else "nan", + "{:.2f}".format(summary.mean_accepted_joints) if np.isfinite(summary.mean_accepted_joints) else "nan", + "{:.2f}".format(summary.mean_reprojection_error) if np.isfinite(summary.mean_reprojection_error) else "nan", ) + logger.info( + "actual_test actions={} promotions={} reacquisitions={} predict_only_updates={} proposal_reacquisition_attempts={} " + "proposal_compatible_lost_frames={} nonlinear_refinements={} lm_iterations={}", + summary.update_action_counts, + summary.diagnostics.promotions, + summary.diagnostics.reacquisitions, + summary.diagnostics.predict_only_updates, + summary.diagnostics.proposal_reacquisition_attempts, + summary.diagnostics.proposal_compatible_lost_frames, + summary.diagnostics.nonlinear_refinements, + summary.diagnostics.lm_iterations, + ) + if verbose_frames: + for line in format_frame_summary_lines(results): + logger.info("actual_test_frame {}", line) if __name__ == "__main__": diff --git a/tests/test_actual_test_parquet.py b/tests/test_actual_test_parquet.py index fe276c6..c17fadb 100644 --- a/tests/test_actual_test_parquet.py +++ b/tests/test_actual_test_parquet.py @@ -5,7 +5,13 @@ import pyarrow as pa import pyarrow.parquet as pq from pose_tracking_exp.common.joints 
import BODY20_INDEX_BY_NAME -from tests.support.actual_test import load_actual_test_scene, load_actual_test_segment_bundles +from pose_tracking_exp.schema import TrackUpdateEvent, TrackerDiagnostics, TrackedFrameResult +from tests.support.actual_test import ( + format_frame_summary_lines, + load_actual_test_scene, + load_actual_test_segment_bundles, + summarize_tracking_results, +) def _write_parquet(path: Path, rows: list[dict[str, object]]) -> None: @@ -125,3 +131,58 @@ def test_load_actual_test_keeps_partial_camera_frames(tmp_path: Path) -> None: assert [view.camera_name for view in bundles[1].views] == ["5602", "5603"] assert len(bundles[1].views[0].detections) == 1 assert bundles[1].views[1].detections == () + + +def test_actual_test_summary_reports_event_counts() -> None: + results = [ + TrackedFrameResult( + bundle_index=0, + timestamp_unix_ns=0, + tentative_tracks=(), + active_tracks=(), + lost_tracks=(), + proposals=(), + update_events=( + TrackUpdateEvent( + track_id=1, + action="direct_update", + accepted_view_count=2, + accepted_joint_count=14, + mean_reprojection_error=6.0, + ), + ), + ), + TrackedFrameResult( + bundle_index=1, + timestamp_unix_ns=1, + tentative_tracks=(), + active_tracks=(), + lost_tracks=(), + proposals=(), + update_events=( + TrackUpdateEvent(track_id=1, action="predict_only"), + TrackUpdateEvent( + track_id=1, + action="proposal_compatible", + proposal_view_count=2, + proposal_support_size=3, + mean_reprojection_error=12.0, + ), + ), + ), + ] + + summary = summarize_tracking_results( + results, + TrackerDiagnostics(promotions=1, proposal_compatible_lost_frames=1), + ) + lines = format_frame_summary_lines(results) + + assert summary.bundle_count == 2 + assert summary.update_action_counts["direct_update"] == 1 + assert summary.update_action_counts["proposal_compatible"] == 1 + assert summary.mean_accepted_views == 2.0 + assert summary.mean_accepted_joints == 14.0 + assert summary.mean_reprojection_error == 9.0 + assert len(lines) == 
2 + assert "proposal_compatible" in lines[1] diff --git a/tests/test_camera_conventions.py b/tests/test_camera_conventions.py index 7bd270f..0820bbf 100644 --- a/tests/test_camera_conventions.py +++ b/tests/test_camera_conventions.py @@ -105,6 +105,32 @@ def test_load_scene_file_supports_explicit_rpt_pose(tmp_path: Path) -> None: np.testing.assert_allclose(scene.cameras[0].T, [-1.0, -2.0, -3.0]) +def test_load_scene_file_defaults_imgpaths_payloads_to_rpt_pose(tmp_path: Path) -> None: + scene_path = tmp_path / "scene.json" + payload = { + "imgpaths": ["/tmp/cam0.jpg"], + "room_size": [6.0, 4.0, 3.0], + "room_center": [0.0, 0.0, 1.0], + "cameras": [ + { + "name": "cam0", + "width": 640, + "height": 480, + "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], + "DC": [0.0, 0.0, 0.0, 0.0, 0.0], + "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]], + "T": [[1.0], [2.0], [3.0]], + } + ], + } + scene_path.write_text(json.dumps(payload), encoding="utf-8") + + scene = load_scene_file(scene_path) + + np.testing.assert_allclose(scene.cameras[0].pose_T, [1.0, 2.0, 3.0]) + np.testing.assert_allclose(scene.cameras[0].T, [-1.0, -2.0, -3.0]) + + def test_build_rpt_config_uses_pose_convention(monkeypatch: pytest.MonkeyPatch) -> None: args = _camera_args() camera = CameraCalibration.from_opencv_extrinsics( diff --git a/tests/test_tracker_single_person.py b/tests/test_tracker_single_person.py index 23a52ce..5889c90 100644 --- a/tests/test_tracker_single_person.py +++ b/tests/test_tracker_single_person.py @@ -1,13 +1,25 @@ from pathlib import Path +from types import SimpleNamespace import numpy as np import pytest pytest.importorskip("rpt") +from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME -from pose_tracking_exp.schema import CameraCalibration, CameraFrame, FrameBundle, ProposalCluster, SceneConfig, TrackerConfig -from pose_tracking_exp.tracking import PoseTracker +from 
pose_tracking_exp.schema import ( + ActiveTrackState, + CameraCalibration, + CameraFrame, + FrameBundle, + PoseDetection, + ProposalCluster, + SceneConfig, + TRACK_COVARIANCE_DIMENSION, + TrackerConfig, +) +from pose_tracking_exp.tracking import PoseTracker, seed_state_from_pose3d def _make_scene() -> SceneConfig: @@ -89,6 +101,26 @@ def _make_proposal(root_x: float, *, score: float = 1.0) -> ProposalCluster: source_views=frozenset({"cam0", "cam1"}), support_size=2, mean_score=score, + support_detection_indices={"cam0": (0,), "cam1": (0,)}, + ) + + +def _fake_detection() -> PoseDetection: + return PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=1.0, + keypoints=np.zeros((20, 3), dtype=np.float64), + ) + + +def _detection_from_projection(projected: np.ndarray, *, confidence: float = 1.0) -> PoseDetection: + keypoints = np.zeros((20, 3), dtype=np.float64) + keypoints[:, :2] = projected[:, :2] + keypoints[:, 2] = confidence + return PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=confidence, + keypoints=keypoints, ) @@ -147,6 +179,32 @@ def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None: "_build_proposals", lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], ) + fake_detection = _fake_detection() + monkeypatch.setattr( + tracker, + "_proposal_support_matches", + lambda bundle, track, proposal, seeded_state: {"cam0": fake_detection, "cam1": fake_detection}, + ) + update_result = SimpleNamespace( + state=seed_state_from_pose3d(_make_proposal(0.05, score=0.96).pose3d), + parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, + beta_covariance=np.eye(8, dtype=np.float64) * 0.01, + accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=5.0, + lm_iterations=2, + ) + 
monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) results = tracker.run([_make_bundle(0), _make_bundle(1), _make_bundle(2)]) @@ -154,3 +212,256 @@ def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None: assert [track.track_id for track in results[1].lost_tracks] == [1] assert [track.track_id for track in results[2].active_tracks] == [1] assert tracker.diagnostics_snapshot().reacquisitions >= 1 + + +def test_active_track_is_not_reseeded_from_proposals(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig( + max_active_tracks=1, + tentative_min_age=1, + tentative_hits_required=1, + tentative_promote_score=0.0, + active_miss_to_lost=3, + proposal_min_score=0.5, + ), + ) + proposals_by_bundle = { + 0: (_make_proposal(0.0, score=0.95),), + 1: (_make_proposal(0.8, score=0.99),), + } + + monkeypatch.setattr( + tracker, + "_build_proposals", + lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], + ) + + results = tracker.run([_make_bundle(0), _make_bundle(1)]) + + assert [track.track_id for track in results[1].active_tracks] == [1] + active_track = results[1].active_tracks[0] + assert active_track.last_update_kind == "predict_only" + assert abs(float(active_track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], 0])) < 0.2 + assert not any(event.action == "proposal_reacquire" for event in results[1].update_events) + + +def test_lost_track_deleted_by_covariance_trace() -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_covariance_trace_max=10.0)) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + skeleton=seed_state_from_pose3d(proposal.pose3d), + 
covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * 1_000.0, + ) + + result = tracker.step(_make_bundle(0)) + + assert not result.lost_tracks + assert any(event.action == "deleted_lost" for event in result.update_events) + + +def test_track_beta_freezes_after_grace_update(monkeypatch) -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, beta_grace_frames=1)) + proposal = _make_proposal(0.0, score=0.95) + skeleton = seed_state_from_pose3d(proposal.pose3d) + tracker._active[1] = ActiveTrackState(track_id=1, status="active", skeleton=skeleton, score=1.0) + fake_detection = PoseDetection( + bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), + bbox_confidence=1.0, + keypoints=np.zeros((20, 3), dtype=np.float64), + ) + + monkeypatch.setattr( + tracker, + "_match_existing_tracks", + lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), + ) + + updated_state = seed_state_from_pose3d(proposal.pose3d, beta=np.full((8,), 1.1, dtype=np.float64)) + update_result = SimpleNamespace( + state=updated_state, + parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, + beta_covariance=np.eye(8, dtype=np.float64) * 0.01, + accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=4.0, + lm_iterations=1, + ) + monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: ()) + + result = tracker.step(_make_bundle(0)) + + assert result.active_tracks[0].beta_frozen + np.testing.assert_allclose(result.active_tracks[0].skeleton.beta, 
np.full((8,), 1.1, dtype=np.float64)) + + +def test_active_track_demotes_to_lost_on_score_floor() -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig(max_active_tracks=1, active_miss_to_lost=10, active_score_lost_threshold=0.0), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._active[1] = ActiveTrackState( + track_id=1, + status="active", + score=0.1, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + + result = tracker.step(_make_bundle(0)) + + assert not result.active_tracks + assert [track.track_id for track in result.lost_tracks] == [1] + + +def test_proposal_compatible_lost_track_stays_lost_without_enough_support(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig(max_active_tracks=1, active_miss_to_lost=1, lost_delete_age=10), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) + monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {"cam0": _fake_detection()}) + + result = tracker.step(_make_bundle(0)) + + assert not result.active_tracks + assert [track.track_id for track in result.lost_tracks] == [1] + assert any(event.action == "proposal_compatible" for event in result.update_events) + + +def test_proposal_support_matches_search_all_view_detections() -> None: + scene = _make_scene() + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_min_accepted_core_joints=2)) + proposal = _make_proposal(0.0, score=0.95) + track = ActiveTrackState(track_id=1, status="lost", skeleton=seed_state_from_pose3d(proposal.pose3d)) + seeded_state = 
seed_state_from_pose3d(proposal.pose3d) + projected_cam0 = project_pose(scene.cameras[0], seeded_state.pose3d) + projected_cam1 = project_pose(scene.cameras[1], seeded_state.pose3d) + good_cam0 = _detection_from_projection(projected_cam0) + good_cam1 = _detection_from_projection(projected_cam1) + bad_detection = _fake_detection() + bundle = FrameBundle( + bundle_index=0, + timestamp_unix_ns=0, + views=( + CameraFrame( + camera_name="cam0", + frame_index=0, + timestamp_unix_ns=0, + detections=(bad_detection, good_cam0), + source_size=(640, 480), + ), + CameraFrame( + camera_name="cam1", + frame_index=0, + timestamp_unix_ns=0, + detections=(bad_detection, good_cam1), + source_size=(640, 480), + ), + ), + ) + + matched = tracker._proposal_support_matches(bundle, track, proposal, seeded_state) + + assert matched["cam0"] is good_cam0 + assert matched["cam1"] is good_cam1 + + +def test_covariance_grows_on_predict_only_and_shrinks_on_update(monkeypatch) -> None: + tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, active_miss_to_lost=10)) + proposal = _make_proposal(0.0, score=0.95) + tracker._active[1] = ActiveTrackState( + track_id=1, + status="active", + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + no_detection_bundle = _make_bundle(0) + predict_only_result = tracker.step(no_detection_bundle) + predict_only_cov_trace = float(np.trace(predict_only_result.active_tracks[0].covariance)) + + fake_detection = _fake_detection() + monkeypatch.setattr( + tracker, + "_match_existing_tracks", + lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), + ) + update_result = SimpleNamespace( + state=seed_state_from_pose3d(proposal.pose3d, beta=np.ones((8,), dtype=np.float64)), + parameter_covariance=np.eye(31, dtype=np.float64) * 0.01, + beta_covariance=np.eye(8, dtype=np.float64) * 0.001, + accepted_joint_masks={"cam0": 
np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, + accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, + accepted_joint_count=20, + accepted_view_count=2, + mean_reprojection_error=3.0, + lm_iterations=1, + ) + monkeypatch.setattr( + tracker, + "_refine_track_state", + lambda track, predicted_state, matched: ( + update_result, + np.full((20,), 9.0, dtype=np.float64), + {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, + ), + ) + update_result_frame = tracker.step(_make_bundle(1)) + updated_cov_trace = float(np.trace(update_result_frame.active_tracks[0].covariance)) + + assert predict_only_cov_trace > float(TRACK_COVARIANCE_DIMENSION) + assert updated_cov_trace < predict_only_cov_trace + + +def test_proposal_compatible_lost_track_gets_score_relief(monkeypatch) -> None: + tracker = PoseTracker( + _make_scene(), + TrackerConfig( + max_active_tracks=1, + active_miss_to_lost=1, + lost_delete_age=10, + lost_score_decay=1.0, + lost_score_miss_penalty=0.5, + proposal_compatible_score_relief=0.4, + ), + ) + proposal = _make_proposal(0.0, score=0.95) + tracker._lost[1] = ActiveTrackState( + track_id=1, + status="lost", + lost_age=1, + score=1.0, + skeleton=seed_state_from_pose3d(proposal.pose3d), + covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), + ) + monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) + monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {}) + + result = tracker.step(_make_bundle(0)) + + assert result.lost_tracks[0].score > 0.4