Files
pose_tracking_exp/tests/support/actual_test.py
T
crosstyan 2c0d51ab31 feat!: reorganize detection and tracking pipeline
Refactor the package into common, schema, detection, and tracking namespaces and move dataset-specific ActualTest utilities into tests/support.

Add a pluggable detection stack with typed protocols, pydantic-settings config, loguru-based runner logging, cvmmap and headless video sources, NATS and parquet sinks, and a structured coco-wholebody133 payload path.

Teach tracking replay loading to consume parquet detection directories directly, preserve empty frames, and keep the video-to-parquet-to-tracking workflow usable for offline E2E runs.

Vendor the local mmcv and xtcocotools wheels under Git LFS, update uv sources/lock state, and refresh the mmcv build so mmcv.ops loads successfully with the current torch+cu130 environment.
2026-03-26 16:24:27 +08:00

187 lines
7.4 KiB
Python

from pathlib import Path
import click
import cv2
import numpy as np
import pyarrow.parquet as pq
from beartype import beartype
from loguru import logger
from pose_tracking_exp.common.normalization import infer_bbox_from_keypoints, normalize_rtmpose_body20
from pose_tracking_exp.schema import CameraCalibration, CameraFrame, FrameBundle, PoseDetection, SceneConfig, TrackerConfig
from pose_tracking_exp.tracking import PoseTracker
# Nominal inter-frame interval in nanoseconds, i.e. 1e9 / 30 ≈ 33_333_333,
# used to synthesize monotonically increasing bundle timestamps for parquet
# rows that carry no wall-clock time (presumably ~30 fps capture — TODO confirm).
_NOMINAL_FRAME_PERIOD_NS = 33_333_333
@beartype
def load_actual_test_scene(root: Path) -> SceneConfig:
    """Build a :class:`SceneConfig` from the ActualTest camera-params parquet.

    ActualTest parquet comes from the ChArUco/OpenCV side, so ``rvec`` /
    ``tvec`` are world->camera extrinsics. The RPT-facing camera pose is
    derived later from this canonical OpenCV form.
    """
    table = pq.read_table(root / "camera_params" / "camera_params.parquet")
    calibrations: list[CameraCalibration] = []
    for row in table.to_pylist():
        extrinsic = row["extrinsic"]
        intrinsic = row["intrinsic"]
        rvec = np.asarray(extrinsic["rvec"], dtype=np.float64)
        # Rodrigues converts the 3-vector axis-angle into a 3x3 rotation matrix.
        rotation_matrix, _ = cv2.Rodrigues(rvec.reshape(3, 1))
        calibrations.append(
            CameraCalibration.from_opencv_extrinsics(
                name=str(row["port"]),
                width=int(row["resolution"]["width"]),
                height=int(row["resolution"]["height"]),
                K=np.asarray(intrinsic["camera_matrix"], dtype=np.float64),
                DC=np.asarray(intrinsic["distortion_coefficients"], dtype=np.float64).reshape(-1),
                R=np.asarray(rotation_matrix, dtype=np.float64),
                T=np.asarray(extrinsic["tvec"], dtype=np.float64).reshape(3),
                rvec=rvec.reshape(3),
            )
        )
    # Deterministic camera ordering by name, independent of parquet row order.
    calibrations.sort(key=lambda calibration: calibration.name)
    return SceneConfig(
        room_size=np.asarray([20.0, 20.0, 8.0], dtype=np.float64),
        room_center=np.asarray([0.0, 0.0, 2.0], dtype=np.float64),
        cameras=tuple(calibrations),
    )
@beartype
def _load_camera_detections(
    segment_root: Path,
    *,
    frame_start: int,
    frame_stop: int | None,
    min_visible_joints: int,
) -> dict[str, dict[int, tuple[PoseDetection, ...]]]:
    """Read every ``*_detected.parquet`` in *segment_root* into per-camera frame maps.

    Returns ``{camera_name: {frame_index: (PoseDetection, ...)}}``. Frames inside
    the ``[frame_start, frame_stop)`` window are kept even when every pose was
    filtered out (empty tuple), preserving "empty frame" information.

    Raises:
        ValueError: if a row's box/keypoint/score arrays disagree in length.
    """
    by_camera: dict[str, dict[int, tuple[PoseDetection, ...]]] = {}
    for parquet_path in sorted(segment_root.glob("*_detected.parquet")):
        camera_name = parquet_path.name.removesuffix("_detected.parquet")
        frames: dict[int, tuple[PoseDetection, ...]] = {}
        for row in pq.read_table(parquet_path).to_pylist():
            frame_index = int(row["frame_index"])
            if frame_index < frame_start:
                continue
            if frame_stop is not None and frame_index >= frame_stop:
                continue
            boxes = row["boxes"]
            keypoints_batch = row["kps"]
            confidence_batch = row["kps_scores"]
            if not (len(boxes) == len(keypoints_batch) == len(confidence_batch)):
                raise ValueError(
                    f"Mismatched detection arrays for camera {camera_name} frame {frame_index}: "
                    f"{len(boxes)=}, {len(keypoints_batch)=}, {len(confidence_batch)=}."
                )
            detections: list[PoseDetection] = []
            for box, keypoints_xy, confidences in zip(boxes, keypoints_batch, confidence_batch, strict=True):
                pose = normalize_rtmpose_body20(
                    np.asarray(keypoints_xy, dtype=np.float64),
                    np.asarray(confidences, dtype=np.float64),
                )
                # Drop poses with too few confidently-visible joints (score > 0.15).
                if np.count_nonzero(pose[:, 2] > 0.15) < min_visible_joints:
                    continue
                # Fall back to a keypoint-derived bbox when the stored box is malformed.
                bbox = (
                    np.asarray(box, dtype=np.float64)
                    if len(box) == 4
                    else infer_bbox_from_keypoints(pose)
                )
                visible_confidences = pose[pose[:, 2] > 0.0, 2]
                detections.append(
                    PoseDetection(
                        bbox=bbox,
                        bbox_confidence=float(np.mean(visible_confidences)) if visible_confidences.size else 0.0,
                        keypoints=pose,
                    )
                )
            frames[frame_index] = tuple(detections)
        by_camera[camera_name] = frames
    return by_camera


@beartype
def _select_candidate_frames(
    by_camera: dict[str, dict[int, tuple[PoseDetection, ...]]],
    *,
    min_cameras_with_rows: int,
    max_frames: int | None,
) -> list[int]:
    """Pick the sorted frame indices to bundle, applying coverage and count limits."""
    candidate_frames = sorted(set().union(*(set(frames) for frames in by_camera.values())))
    if min_cameras_with_rows > 1:
        # Keep only frames present in at least `min_cameras_with_rows` cameras.
        candidate_frames = [
            frame_index
            for frame_index in candidate_frames
            if sum(frame_index in frames for frames in by_camera.values()) >= min_cameras_with_rows
        ]
    if max_frames is not None:
        candidate_frames = candidate_frames[:max_frames]
    return candidate_frames


@beartype
def _build_bundles(
    scene: SceneConfig,
    by_camera: dict[str, dict[int, tuple[PoseDetection, ...]]],
    candidate_frames: list[int],
) -> list[FrameBundle]:
    """Assemble one FrameBundle per candidate frame, one view per scene camera."""
    camera_by_name = {camera.name: camera for camera in scene.cameras}
    ordered_camera_names = [camera.name for camera in scene.cameras]
    bundles: list[FrameBundle] = []
    for bundle_index, frame_index in enumerate(candidate_frames):
        # Parquet rows carry no wall-clock time; synthesize nominal-rate timestamps.
        timestamp_unix_ns = bundle_index * _NOMINAL_FRAME_PERIOD_NS
        views: list[CameraFrame] = []
        for camera_name in ordered_camera_names:
            camera = camera_by_name[camera_name]
            views.append(
                CameraFrame(
                    camera_name=camera_name,
                    frame_index=frame_index,
                    # Cameras without a row for this frame get an empty detection tuple.
                    timestamp_unix_ns=timestamp_unix_ns,
                    detections=by_camera.get(camera_name, {}).get(frame_index, ()),
                    source_size=(camera.width, camera.height),
                )
            )
        bundles.append(
            FrameBundle(
                bundle_index=bundle_index,
                timestamp_unix_ns=timestamp_unix_ns,
                views=tuple(views),
            )
        )
    return bundles


@beartype
def load_actual_test_segment_bundles(
    root: Path,
    segment_name: str,
    *,
    frame_start: int = 690,
    frame_stop: int | None = None,
    max_frames: int | None = None,
    min_cameras_with_rows: int = 1,
    min_visible_joints: int = 6,
) -> list[FrameBundle]:
    """Load an ActualTest segment's parquet detections as tracking FrameBundles.

    Args:
        root: Dataset root containing ``camera_params/`` and segment directories.
        segment_name: Segment directory under *root* with ``*_detected.parquet`` files.
        frame_start: First frame index (inclusive) to keep.
        frame_stop: Frame index (exclusive) to stop at; ``None`` means no limit.
        max_frames: Cap on the number of returned bundles; ``None`` means no cap.
        min_cameras_with_rows: Require a frame to appear in at least this many cameras.
        min_visible_joints: Minimum joints with score > 0.15 for a pose to survive.

    Returns:
        Bundles ordered by frame index, with synthetic nominal-rate timestamps.
    """
    by_camera = _load_camera_detections(
        root / segment_name,
        frame_start=frame_start,
        frame_stop=frame_stop,
        min_visible_joints=min_visible_joints,
    )
    if not by_camera:
        return []
    candidate_frames = _select_candidate_frames(
        by_camera,
        min_cameras_with_rows=min_cameras_with_rows,
        max_frames=max_frames,
    )
    return _build_bundles(load_actual_test_scene(root), by_camera, candidate_frames)
@click.command()
@click.argument("root_path", type=click.Path(path_type=Path, exists=True, file_okay=False))
@click.option("--segment", "segment_name", default="Segment_1", show_default=True)
@click.option("--frame-start", default=690, type=int, show_default=True)
@click.option("--frame-stop", type=int)
@click.option("--max-frames", type=click.IntRange(min=1))
@click.option("--min-camera-rows", default=1, type=click.IntRange(min=1), show_default=True)
@click.option("--max-active-tracks", default=1, type=click.IntRange(min=1), show_default=True)
def main(
    root_path: Path,
    segment_name: str,
    frame_start: int,
    frame_stop: int | None,
    max_frames: int | None,
    min_camera_rows: int,
    max_active_tracks: int,
) -> None:
    """Run the offline tracker over one ActualTest segment and log summary counts."""
    # Replace loguru's default sink with a compact stderr sink.
    logger.remove()
    logger.add(
        click.get_text_stream("stderr"),
        level="INFO",
        format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
    )
    scene = load_actual_test_scene(root_path)
    bundles = load_actual_test_segment_bundles(
        root_path,
        segment_name,
        frame_start=frame_start,
        frame_stop=frame_stop,
        max_frames=max_frames,
        min_cameras_with_rows=min_camera_rows,
    )
    tracker = PoseTracker(scene, TrackerConfig(max_active_tracks=max_active_tracks))
    results = tracker.run(bundles)
    active_frame_count = sum(1 for result in results if result.active_tracks)
    proposal_frame_count = sum(1 for result in results if result.proposals)
    logger.info(
        "actual_test bundles={} active_frames={} proposal_frames={}",
        len(results),
        active_frame_count,
        proposal_frame_count,
    )
# Allow running this support module directly as a CLI entry point.
if __name__ == "__main__":
    main()