Files
pose_tracking_exp/tests/test_detection_video_parquet.py
crosstyan 2c0d51ab31 feat!: reorganize detection and tracking pipeline
Refactor the package into common, schema, detection, and tracking namespaces and move dataset-specific ActualTest utilities into tests/support.

Add a pluggable detection stack with typed protocols, pydantic-settings config, loguru-based runner logging, cvmmap and headless video sources, NATS and parquet sinks, and a structured coco-wholebody133 payload path.

Teach tracking replay loading to consume parquet detection directories directly, preserve empty frames, and keep the video-to-parquet-to-tracking workflow usable for offline E2E runs.

Vendor the local mmcv and xtcocotools wheels under Git LFS, update uv sources/lock state, and refresh the mmcv build so mmcv.ops loads successfully with the current torch+cu130 environment.
2026-03-26 16:24:27 +08:00

138 lines
4.8 KiB
Python

import json
from pathlib import Path
import anyio
import cv2
import numpy as np
import pyarrow.parquet as pq
from pose_tracking_exp.common.joints import BODY20_INDEX_BY_NAME
from pose_tracking_exp.detection.sinks import ParquetPoseSink
from pose_tracking_exp.detection.sources import VideoFrameSource
from pose_tracking_exp.schema.detection import PoseDetections
from pose_tracking_exp.tracking import load_replay_file
def _write_synthetic_video(path: Path) -> None:
    """Write a tiny three-frame MJPG video to ``path``.

    Each frame is a uniformly filled ``uint8`` array of shape ``(6, 8, 3)``
    whose fill value is 0, 32 and 64 for the first, second and third frame.

    Raises:
        RuntimeError: If the OpenCV writer reports that it is not opened.
    """
    width, height = 8, 6
    fourcc = cv2.VideoWriter.fourcc(*"MJPG")
    # OpenCV takes the frame size as (width, height); the arrays written
    # below are laid out as (height, width, channels).
    video = cv2.VideoWriter(str(path), fourcc, 10.0, (width, height))
    if not video.isOpened():
        raise RuntimeError("Could not open synthetic video writer.")
    try:
        for level in (0, 32, 64):
            video.write(np.full((height, width, 3), level, dtype=np.uint8))
    finally:
        # Release even when a write fails so the output file gets closed.
        video.release()
def _sample_wholebody_detection(*, source_name: str, frame_index: int) -> PoseDetections:
    """Build a single-person ``PoseDetections`` fixture with 133 keypoints.

    Only five keypoints carry a position and a score of 1.0; every other
    keypoint stays at the origin with score 0.0.

    Args:
        source_name: Value stored in the detection's ``source_name`` field.
        frame_index: Value stored in ``frame_index``; also scales the
            timestamp (``frame_index * 100_000_000``).

    Returns:
        A ``PoseDetections`` with one box, one box score and one keypoint set,
        tagged with the ``"coco_wholebody133"`` keypoint schema.
    """
    # keypoint index -> (x, y).
    # NOTE(review): in COCO ordering these would be nose (0), shoulders (5, 6)
    # and hips (11, 12) - confirm against the schema definition.
    populated = {
        5: (10.0, 20.0),
        6: (30.0, 20.0),
        11: (12.0, 60.0),
        12: (28.0, 60.0),
        0: (20.0, 8.0),
    }
    xy = np.zeros((1, 133, 2), dtype=np.float32)
    scores = np.zeros((1, 133), dtype=np.float32)
    for joint, point in populated.items():
        xy[0, joint] = point
        scores[0, joint] = 1.0

    box = np.asarray([[8.0, 4.0, 32.0, 64.0]], dtype=np.float32)
    box_score = np.asarray([0.9], dtype=np.float32)
    return PoseDetections(
        source_name=source_name,
        frame_index=frame_index,
        source_size=(640, 480),
        boxes_xyxy=box,
        box_scores=box_score,
        keypoints_xy=xy,
        keypoint_scores=scores,
        timestamp_unix_ns=frame_index * 100_000_000,
        keypoint_schema="coco_wholebody133",
    )
def test_video_frame_source_reads_frames(tmp_path: Path) -> None:
    """Check that ``VideoFrameSource`` yields every frame of a synthetic video.

    The test writes a three-frame video, drains ``source.frames()`` and
    verifies the source name, the frame indices, the image shape and that
    timestamps never decrease from one frame to the next.
    """
    video_path = tmp_path / "cam0.avi"
    _write_synthetic_video(video_path)
    source = VideoFrameSource(video_path, source_name="cam0")

    async def collect() -> list[tuple[str, int, int, tuple[int, int, int]]]:
        # Keep only the fields under test so the assertions stay simple.
        return [
            (
                item.source_name,
                item.frame_index,
                item.timestamp_unix_ns,
                item.image_bgr.shape,
            )
            async for item in source.frames()
        ]

    records = anyio.run(collect)

    names = [name for name, _, _, _ in records]
    indices = [index for _, index, _, _ in records]
    shapes = [shape for _, _, _, shape in records]
    stamps = [stamp for _, _, stamp, _ in records]

    assert names == ["cam0", "cam0", "cam0"]
    assert indices == [0, 1, 2]
    assert shapes == [(6, 8, 3), (6, 8, 3), (6, 8, 3)]
    # Timestamps must be monotonically non-decreasing across the frames.
    assert all(earlier <= later for earlier, later in zip(stamps, stamps[1:]))
def test_parquet_sink_round_trips_into_tracking_replay(tmp_path: Path) -> None:
    """Write detections through ``ParquetPoseSink`` and reload them as a replay.

    Two frames are published for camera ``cam0``: one with a single sample
    detection and one with no detections at all. The test then checks that
    the parquet file holds two rows and that ``load_replay_file`` returns both
    frames, keeping the empty one and exposing the expected ``hip_middle``
    position for the populated one.
    """
    output_dir = tmp_path / "detections"
    sink = ParquetPoseSink(output_dir, flush_rows=1)

    async def write_rows() -> None:
        populated = _sample_wholebody_detection(source_name="cam0", frame_index=0)
        await sink.publish_pose(populated)
        # A frame with zero people: every per-person array has length 0.
        empty = PoseDetections(
            source_name="cam0",
            frame_index=1,
            source_size=(640, 480),
            boxes_xyxy=np.empty((0, 4), dtype=np.float32),
            box_scores=np.empty((0,), dtype=np.float32),
            keypoints_xy=np.empty((0, 133, 2), dtype=np.float32),
            keypoint_scores=np.empty((0, 133), dtype=np.float32),
            timestamp_unix_ns=100_000_000,
            keypoint_schema="coco_wholebody133",
        )
        await sink.publish_pose(empty)
        await sink.aclose()

    anyio.run(write_rows)

    parquet_path = output_dir / "cam0_detected.parquet"
    assert parquet_path.exists()
    table = pq.read_table(parquet_path)
    assert table.num_rows == 2

    # Minimal single-camera scene description consumed by load_replay_file.
    camera = {
        "name": "cam0",
        "width": 640,
        "height": 480,
        "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
        "DC": [0.0, 0.0, 0.0, 0.0, 0.0],
        "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]],
        "T": [[0.0], [0.0], [0.0]],
    }
    scene = {
        "room_size": [6.0, 4.0, 3.0],
        "room_center": [0.0, 0.0, 1.0],
        "cameras": [camera],
    }
    scene_path = tmp_path / "scene.json"
    scene_path.write_text(json.dumps(scene), encoding="utf-8")

    replay = load_replay_file(scene_path, output_dir)
    cam_frames = replay.frames_by_camera["cam0"]
    assert [entry.frame_index for entry in cam_frames] == [0, 1]
    # The frame without people must survive the round trip as an empty tuple.
    assert cam_frames[1].detections == ()

    # NOTE(review): (20, 60) is the midpoint of sample keypoints 11 and 12 -
    # presumably how the loader derives hip_middle; confirm in the loader.
    first_person = cam_frames[0].detections[0]
    hip_xy = first_person.keypoints[BODY20_INDEX_BY_NAME["hip_middle"], :2]
    np.testing.assert_allclose(hip_xy, [20.0, 60.0])