Files
crosstyan 2c0d51ab31 feat!: reorganize detection and tracking pipeline
Refactor the package into common, schema, detection, and tracking namespaces and move dataset-specific ActualTest utilities into tests/support.

Add a pluggable detection stack with typed protocols, pydantic-settings config, loguru-based runner logging, cvmmap and headless video sources, NATS and parquet sinks, and a structured coco-wholebody133 payload path.

Teach tracking replay loading to consume parquet detection directories directly, preserve empty frames, and keep the video-to-parquet-to-tracking workflow usable for offline E2E runs.

Vendor the local mmcv and xtcocotools wheels under Git LFS, update uv sources/lock state, and refresh the mmcv build so mmcv.ops loads successfully with the current torch+cu130 environment.
2026-03-26 16:24:27 +08:00

68 lines
2.3 KiB
Python

import json
from pathlib import Path
import numpy as np
import pytest
# Skip this whole test module when the optional `rpt` package is not
# installed (pytest.importorskip raises Skipped at collection time).
pytest.importorskip("rpt")
from pose_tracking_exp.schema import CameraFrame, FrameBundle, PoseDetection, TrackerConfig
from pose_tracking_exp.tracking import PoseTracker
from pose_tracking_exp.tracking.replay_io import load_scene_file
# Local checkout of the RapidPoseTriangulation repo; supplies the scene
# calibration (data/p1/sample.json) and 2D pose fixtures (tests/poses_p1.json).
# NOTE(review): machine-specific absolute path — only valid on the author's host.
RPT_ROOT = Path("/home/crosstyan/Code/RapidPoseTriangulation")
def test_tracker_promotes_rpt_sample_person():
    """End-to-end tracker check on the RPT ``p1`` sample scene.

    Replays the same set of multi-view 2D poses for three consecutive
    frame bundles (only timestamps/frame indices advance) and asserts
    that the tracker first emits proposals and then promotes at least
    one tentative track to an active track.

    Skips (rather than erroring) when the RapidPoseTriangulation
    checkout is not present at ``RPT_ROOT``, so the suite stays
    runnable on hosts without the fixture data.
    """
    if not RPT_ROOT.exists():
        pytest.skip(f"RPT fixture checkout not found at {RPT_ROOT}")

    scene = load_scene_file(RPT_ROOT / "data/p1/sample.json")
    pose_payload = json.loads((RPT_ROOT / "tests/poses_p1.json").read_text(encoding="utf-8"))
    # One entry per camera view; each entry is a list of per-person keypoint arrays.
    view_poses = pose_payload["2D"]

    tracker = PoseTracker(
        scene,
        TrackerConfig(
            tentative_min_age=2,
            tentative_hits_required=2,
            tentative_promote_score=1.2,
            proposal_min_score=0.5,
        ),
    )

    bundles: list[FrameBundle] = []
    for bundle_index in range(3):
        views: list[CameraFrame] = []
        # strict=True guards against a camera/pose-view count mismatch in the fixture.
        for camera, detections in zip(scene.cameras, view_poses, strict=True):
            pose_array = np.asarray(detections, dtype=np.float64)
            frame_detections: list[PoseDetection] = []
            for person_pose in pose_array:
                # Derive an axis-aligned bbox from the keypoint x/y extremes.
                mins = person_pose[:, :2].min(axis=0)
                maxs = person_pose[:, :2].max(axis=0)
                frame_detections.append(
                    PoseDetection(
                        bbox=np.asarray([mins[0], mins[1], maxs[0], maxs[1]], dtype=np.float64),
                        bbox_confidence=1.0,
                        keypoints=person_pose,
                    )
                )
            views.append(
                CameraFrame(
                    camera_name=camera.name,
                    frame_index=bundle_index,
                    # ~30 fps spacing (33 ms) starting at 1 s.
                    timestamp_unix_ns=1_000_000_000 + bundle_index * 33_000_000,
                    detections=tuple(frame_detections),
                    source_size=(camera.width, camera.height),
                )
            )
        bundles.append(
            FrameBundle(
                bundle_index=bundle_index,
                timestamp_unix_ns=views[0].timestamp_unix_ns,
                views=tuple(views),
            )
        )

    results = tracker.run(bundles)
    # Proposals can appear from the first frame, but promotion requires
    # >=2 hits / age 2, so active tracks are only expected from frame 1 on.
    assert any(result.proposals for result in results)
    assert any(result.active_tracks for result in results[1:])
    assert len(results[-1].active_tracks) >= 1