from pathlib import Path from types import SimpleNamespace import numpy as np import pytest pytest.importorskip("rpt") from pose_tracking_exp.common.camera_math import project_pose from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME from pose_tracking_exp.schema import ( ActiveTrackState, CameraCalibration, CameraFrame, FrameBundle, PoseDetection, ProposalCluster, SceneConfig, TRACK_COVARIANCE_DIMENSION, TrackerConfig, ) from pose_tracking_exp.tracking import PoseTracker, seed_state_from_pose3d def _make_scene() -> SceneConfig: cameras = ( CameraCalibration( name="cam0", width=640, height=480, K=np.asarray([[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64), DC=np.zeros(5, dtype=np.float64), R=np.eye(3, dtype=np.float64), T=np.zeros(3, dtype=np.float64), ), CameraCalibration( name="cam1", width=640, height=480, K=np.asarray([[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64), DC=np.zeros(5, dtype=np.float64), R=np.eye(3, dtype=np.float64), T=np.asarray([1.0, 0.0, 0.0], dtype=np.float64), ), ) return SceneConfig( room_size=np.asarray([6.0, 4.0, 3.0], dtype=np.float64), room_center=np.asarray([0.0, 0.0, 1.0], dtype=np.float64), cameras=cameras, ) def _make_bundle(bundle_index: int) -> FrameBundle: views = tuple( CameraFrame( camera_name=camera_name, frame_index=bundle_index, timestamp_unix_ns=1_000_000_000 + bundle_index * 33_000_000, detections=(), source_size=(640, 480), ) for camera_name in ("cam0", "cam1") ) return FrameBundle( bundle_index=bundle_index, timestamp_unix_ns=views[0].timestamp_unix_ns, views=views, ) def _make_proposal(root_x: float, *, score: float = 1.0) -> ProposalCluster: pose = np.zeros((20, 4), dtype=np.float64) joint_positions = { "hip_middle": [root_x, 1.0, 3.0], "hip_left": [root_x + 0.12, 1.0, 3.0], "hip_right": [root_x - 0.12, 1.0, 3.0], "shoulder_middle": [root_x, 1.52, 3.0], "shoulder_left": [root_x + 0.18, 1.52, 3.0], "shoulder_right": [root_x - 0.18, 1.52, 3.0], "elbow_left": [root_x + 0.42, 1.48, 3.02], "elbow_right": [root_x - 0.42, 1.48, 3.02], "wrist_left": [root_x + 0.64, 1.45, 3.04], "wrist_right": [root_x - 0.64, 1.45, 3.04], "knee_left": [root_x + 0.1, 0.58, 3.0], "knee_right": [root_x - 0.1, 0.58, 3.0], "ankle_left": [root_x + 0.1, 0.15, 3.02], "ankle_right": [root_x - 0.1, 0.15, 3.02], "head": [root_x, 1.82, 3.02], "nose": [root_x, 1.8, 3.06], "eye_left": [root_x + 0.03, 1.81, 3.05], "eye_right": [root_x - 0.03, 1.81, 3.05], "ear_left": [root_x + 0.06, 1.81, 3.02], "ear_right": [root_x - 0.06, 1.81, 3.02], } for name, position in joint_positions.items(): pose[BODY20_INDEX_BY_NAME[name], :3] = position pose[BODY20_INDEX_BY_NAME[name], 3] = score return ProposalCluster( pose3d=pose, root=np.asarray([root_x, 1.0, 3.0], dtype=np.float64), source_views=frozenset({"cam0", "cam1"}), support_size=2, mean_score=score, support_detection_indices={"cam0": (0,), "cam1": (0,)}, ) def _fake_detection() -> PoseDetection: return PoseDetection( bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), bbox_confidence=1.0, keypoints=np.zeros((20, 3), dtype=np.float64), ) def _detection_from_projection(projected: np.ndarray, *, confidence: float = 1.0) -> PoseDetection: keypoints = np.zeros((20, 3), dtype=np.float64) keypoints[:, :2] = projected[:, :2] keypoints[:, 2] = confidence return PoseDetection( bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), bbox_confidence=confidence, keypoints=keypoints, ) def test_single_person_mode_caps_active_tracks(monkeypatch) -> None: tracker = PoseTracker( _make_scene(), TrackerConfig( max_active_tracks=1, tentative_min_age=1, tentative_hits_required=1, tentative_promote_score=0.0, active_miss_to_lost=5, proposal_min_score=0.5, ), ) proposals_by_bundle = { 0: (_make_proposal(0.0, score=0.95), _make_proposal(0.9, score=0.7)), 1: (_make_proposal(0.05, score=0.96), _make_proposal(0.85, score=0.75)), } monkeypatch.setattr( tracker, "_build_proposals", lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], ) results = tracker.run([_make_bundle(0), _make_bundle(1)]) assert len(results[0].active_tracks) == 1 assert len(results[1].active_tracks) == 1 assert not results[1].tentative_tracks assert [track.track_id for track in results[1].active_tracks] == [1] def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None: tracker = PoseTracker( _make_scene(), TrackerConfig( max_active_tracks=1, tentative_min_age=1, tentative_hits_required=1, tentative_promote_score=0.0, active_miss_to_lost=1, lost_delete_age=10, proposal_min_score=0.5, ), ) proposals_by_bundle = { 0: (_make_proposal(0.0, score=0.95),), 1: (), 2: (_make_proposal(0.05, score=0.96),), } monkeypatch.setattr( tracker, "_build_proposals", lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], ) fake_detection = _fake_detection() monkeypatch.setattr( tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {"cam0": fake_detection, "cam1": fake_detection}, ) update_result = SimpleNamespace( state=seed_state_from_pose3d(_make_proposal(0.05, score=0.96).pose3d), parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, beta_covariance=np.eye(8, dtype=np.float64) * 0.01, accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, accepted_joint_count=20, accepted_view_count=2, mean_reprojection_error=5.0, lm_iterations=2, ) monkeypatch.setattr( tracker, "_refine_track_state", lambda track, predicted_state, matched: ( update_result, np.full((20,), 9.0, dtype=np.float64), {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, ), ) results = tracker.run([_make_bundle(0), _make_bundle(1), _make_bundle(2)]) assert [track.track_id for track in results[0].active_tracks] == [1] assert [track.track_id for track in results[1].lost_tracks] == [1] assert [track.track_id for track in results[2].active_tracks] == [1] assert tracker.diagnostics_snapshot().reacquisitions >= 1 def test_active_track_is_not_reseeded_from_proposals(monkeypatch) -> None: tracker = PoseTracker( _make_scene(), TrackerConfig( max_active_tracks=1, tentative_min_age=1, tentative_hits_required=1, tentative_promote_score=0.0, active_miss_to_lost=3, proposal_min_score=0.5, ), ) proposals_by_bundle = { 0: (_make_proposal(0.0, score=0.95),), 1: (_make_proposal(0.8, score=0.99),), } monkeypatch.setattr( tracker, "_build_proposals", lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index], ) results = tracker.run([_make_bundle(0), _make_bundle(1)]) assert [track.track_id for track in results[1].active_tracks] == [1] active_track = results[1].active_tracks[0] assert active_track.last_update_kind == "predict_only" assert abs(float(active_track.skeleton.pose3d[BODY20_INDEX_BY_NAME["hip_middle"], 0])) < 0.2 assert not any(event.action == "proposal_reacquire" for event in results[1].update_events) def test_lost_track_deleted_by_covariance_trace() -> None: tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_covariance_trace_max=10.0)) proposal = _make_proposal(0.0, score=0.95) tracker._lost[1] = ActiveTrackState( track_id=1, status="lost", lost_age=1, skeleton=seed_state_from_pose3d(proposal.pose3d), covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64) * 1_000.0, ) result = tracker.step(_make_bundle(0)) assert not result.lost_tracks assert any(event.action == "deleted_lost" for event in result.update_events) def test_track_beta_freezes_after_grace_update(monkeypatch) -> None: tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, beta_grace_frames=1)) proposal = _make_proposal(0.0, score=0.95) skeleton = seed_state_from_pose3d(proposal.pose3d) tracker._active[1] = ActiveTrackState(track_id=1, status="active", skeleton=skeleton, score=1.0) fake_detection = PoseDetection( bbox=np.asarray([0.0, 0.0, 1.0, 1.0], dtype=np.float64), bbox_confidence=1.0, keypoints=np.zeros((20, 3), dtype=np.float64), ) monkeypatch.setattr( tracker, "_match_existing_tracks", lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), ) updated_state = seed_state_from_pose3d(proposal.pose3d, beta=np.full((8,), 1.1, dtype=np.float64)) update_result = SimpleNamespace( state=updated_state, parameter_covariance=np.eye(31, dtype=np.float64) * 0.1, beta_covariance=np.eye(8, dtype=np.float64) * 0.01, accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, accepted_joint_count=20, accepted_view_count=2, mean_reprojection_error=4.0, lm_iterations=1, ) monkeypatch.setattr( tracker, "_refine_track_state", lambda track, predicted_state, matched: ( update_result, np.full((20,), 9.0, dtype=np.float64), {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, ), ) monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: ()) result = tracker.step(_make_bundle(0)) assert result.active_tracks[0].beta_frozen np.testing.assert_allclose(result.active_tracks[0].skeleton.beta, np.full((8,), 1.1, dtype=np.float64)) def test_active_track_demotes_to_lost_on_score_floor() -> None: tracker = PoseTracker( _make_scene(), TrackerConfig(max_active_tracks=1, active_miss_to_lost=10, active_score_lost_threshold=0.0), ) proposal = _make_proposal(0.0, score=0.95) tracker._active[1] = ActiveTrackState( track_id=1, status="active", score=0.1, skeleton=seed_state_from_pose3d(proposal.pose3d), covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), ) result = tracker.step(_make_bundle(0)) assert not result.active_tracks assert [track.track_id for track in result.lost_tracks] == [1] def test_proposal_compatible_lost_track_stays_lost_without_enough_support(monkeypatch) -> None: tracker = PoseTracker( _make_scene(), TrackerConfig(max_active_tracks=1, active_miss_to_lost=1, lost_delete_age=10), ) proposal = _make_proposal(0.0, score=0.95) tracker._lost[1] = ActiveTrackState( track_id=1, status="lost", lost_age=1, score=1.0, skeleton=seed_state_from_pose3d(proposal.pose3d), covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), ) monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {"cam0": _fake_detection()}) result = tracker.step(_make_bundle(0)) assert not result.active_tracks assert [track.track_id for track in result.lost_tracks] == [1] assert any(event.action == "proposal_compatible" for event in result.update_events) def test_proposal_support_matches_search_all_view_detections() -> None: scene = _make_scene() tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, lost_min_accepted_core_joints=2)) proposal = _make_proposal(0.0, score=0.95) track = ActiveTrackState(track_id=1, status="lost", skeleton=seed_state_from_pose3d(proposal.pose3d)) seeded_state = seed_state_from_pose3d(proposal.pose3d) projected_cam0 = project_pose(scene.cameras[0], seeded_state.pose3d) projected_cam1 = project_pose(scene.cameras[1], seeded_state.pose3d) good_cam0 = _detection_from_projection(projected_cam0) good_cam1 = _detection_from_projection(projected_cam1) bad_detection = _fake_detection() bundle = FrameBundle( bundle_index=0, timestamp_unix_ns=0, views=( CameraFrame( camera_name="cam0", frame_index=0, timestamp_unix_ns=0, detections=(bad_detection, good_cam0), source_size=(640, 480), ), CameraFrame( camera_name="cam1", frame_index=0, timestamp_unix_ns=0, detections=(bad_detection, good_cam1), source_size=(640, 480), ), ), ) matched = tracker._proposal_support_matches(bundle, track, proposal, seeded_state) assert matched["cam0"] is good_cam0 assert matched["cam1"] is good_cam1 def test_covariance_grows_on_predict_only_and_shrinks_on_update(monkeypatch) -> None: tracker = PoseTracker(_make_scene(), TrackerConfig(max_active_tracks=1, active_miss_to_lost=10)) proposal = _make_proposal(0.0, score=0.95) tracker._active[1] = ActiveTrackState( track_id=1, status="active", score=1.0, skeleton=seed_state_from_pose3d(proposal.pose3d), covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), ) no_detection_bundle = _make_bundle(0) predict_only_result = tracker.step(no_detection_bundle) predict_only_cov_trace = float(np.trace(predict_only_result.active_tracks[0].covariance)) fake_detection = _fake_detection() monkeypatch.setattr( tracker, "_match_existing_tracks", lambda bundle, predicted: ({1: {"cam0": fake_detection, "cam1": fake_detection}}, {"cam0": [], "cam1": []}), ) update_result = SimpleNamespace( state=seed_state_from_pose3d(proposal.pose3d, beta=np.ones((8,), dtype=np.float64)), parameter_covariance=np.eye(31, dtype=np.float64) * 0.01, beta_covariance=np.eye(8, dtype=np.float64) * 0.001, accepted_joint_masks={"cam0": np.ones((20,), dtype=bool), "cam1": np.ones((20,), dtype=bool)}, accepted_joint_counts_by_view={"cam0": 20, "cam1": 20}, accepted_joint_count=20, accepted_view_count=2, mean_reprojection_error=3.0, lm_iterations=1, ) monkeypatch.setattr( tracker, "_refine_track_state", lambda track, predicted_state, matched: ( update_result, np.full((20,), 9.0, dtype=np.float64), {"cam0": np.full((20,), 9.0, dtype=np.float64), "cam1": np.full((20,), 9.0, dtype=np.float64)}, ), ) update_result_frame = tracker.step(_make_bundle(1)) updated_cov_trace = float(np.trace(update_result_frame.active_tracks[0].covariance)) assert predict_only_cov_trace > float(TRACK_COVARIANCE_DIMENSION) assert updated_cov_trace < predict_only_cov_trace def test_proposal_compatible_lost_track_gets_score_relief(monkeypatch) -> None: tracker = PoseTracker( _make_scene(), TrackerConfig( max_active_tracks=1, active_miss_to_lost=1, lost_delete_age=10, lost_score_decay=1.0, lost_score_miss_penalty=0.5, proposal_compatible_score_relief=0.4, ), ) proposal = _make_proposal(0.0, score=0.95) tracker._lost[1] = ActiveTrackState( track_id=1, status="lost", lost_age=1, score=1.0, skeleton=seed_state_from_pose3d(proposal.pose3d), covariance=np.eye(TRACK_COVARIANCE_DIMENSION, dtype=np.float64), ) monkeypatch.setattr(tracker, "_build_proposals", lambda bundle, unmatched: (proposal,)) monkeypatch.setattr(tracker, "_proposal_support_matches", lambda bundle, track, proposal, seeded_state: {}) result = tracker.step(_make_bundle(0)) assert result.lost_tracks[0].score > 0.4