Files
pose_tracking_exp/tests/test_video_alignment.py
T
crosstyan 2c877dc53c feat(detection): add aligned video preparation helpers
Add a reusable video alignment module for offline multiview workflows. The helper scans per-frame timestamps, builds nearest-timestamp bundle matches under a configurable skew threshold, and rewrites synchronized per-camera videos for downstream detection and tracking runs.

The detection package now exports the alignment primitives, and a test-support CLI is included so dataset-specific experiments can generate aligned clips without expanding the public application surface.

Regression tests cover both bundle matching and frame selection during rewritten video generation.
2026-03-27 12:02:34 +08:00

98 lines
3.1 KiB
Python

from pathlib import Path
import cv2
import numpy as np
from pose_tracking_exp.detection.video_alignment import (
align_timestamp_sequences,
write_aligned_videos,
VideoScanResult,
)
def test_align_timestamp_sequences_matches_full_common_window() -> None:
    """Align three synthetic scans against ``cam0`` and check the bundles.

    With a 12 ms skew limit and ``min_views=2`` the test expects one bundle
    per ``cam0`` frame (four in total). The first bundle is expected to hold
    only ``cam0``/``cam1``; the last is expected to also include ``cam2``
    frame 1.
    """
    frame_size = (8, 6)
    cam0_scan = VideoScanResult(
        source_name="cam0",
        path=Path("/tmp/cam0.mp4"),
        fps=30.0,
        frame_size=frame_size,
        timestamps_unix_ns=(0, 33_000_000, 66_000_000, 99_000_000),
    )
    # Same cadence as cam0, shifted by 1 ms on every frame.
    cam1_scan = VideoScanResult(
        source_name="cam1",
        path=Path("/tmp/cam1.mp4"),
        fps=29.97,
        frame_size=frame_size,
        timestamps_unix_ns=(1_000_000, 34_000_000, 67_000_000, 100_000_000),
    )
    # Sparser stream: 20 ms, 90 ms, 160 ms. Only the 90 ms frame lies within
    # 12 ms of a cam0 timestamp (99 ms).
    cam2_scan = VideoScanResult(
        source_name="cam2",
        path=Path("/tmp/cam2.mp4"),
        fps=29.5,
        frame_size=frame_size,
        timestamps_unix_ns=(20_000_000, 90_000_000, 160_000_000),
    )

    bundles = align_timestamp_sequences(
        (cam0_scan, cam1_scan, cam2_scan),
        reference_name="cam0",
        max_skew_ns=12_000_000,
        min_views=2,
    )

    assert len(bundles) == 4
    first_bundle = bundles[0]
    last_bundle = bundles[-1]
    assert first_bundle.frame_indices_by_source == {"cam0": 0, "cam1": 0}
    assert last_bundle.frame_indices_by_source == {"cam0": 3, "cam1": 3, "cam2": 1}
def _write_colored_video(
    path: Path,
    frame_values: list[int],
    *,
    fps: float = 10.0,
    frame_size: tuple[int, int] = (8, 6),
) -> None:
    """Write a tiny video in which every frame is filled with a single value.

    Args:
        path: Destination file; written with the ``mp4v`` fourcc.
        frame_values: One fill value per frame, applied to all three
            channels. Frames are written in list order.
        fps: Frame rate handed to the video writer.
        frame_size: ``(width, height)`` handed to the video writer.

    Raises:
        RuntimeError: If the video writer could not be opened for ``path``.
    """
    width, height = frame_size
    writer = cv2.VideoWriter(
        str(path),
        cv2.VideoWriter.fourcc(*"mp4v"),
        fps,
        (width, height),
    )
    # The open check lives inside the try so the writer is released on every
    # exit path, including a failed open.
    try:
        if not writer.isOpened():
            raise RuntimeError(f"Could not create {path}")
        for value in frame_values:
            # Image arrays are (rows, cols, channels): height comes first,
            # the reverse of the (width, height) order the writer takes.
            writer.write(np.full((height, width, 3), value, dtype=np.uint8))
    finally:
        writer.release()
def test_write_aligned_videos_selects_requested_frames(tmp_path: Path) -> None:
    """Rewrite two four-frame clips keeping only bundles 1 and 3.

    Each source clip uses a distinct uniform fill value per frame, so the
    mean pixel value of a decoded output frame identifies which source frame
    it came from.
    """
    timestamps = (0, 100_000_000, 200_000_000, 300_000_000)
    cam0_path = tmp_path / "cam0.mp4"
    cam1_path = tmp_path / "cam1.mp4"
    _write_colored_video(cam0_path, [10, 20, 30, 40])
    _write_colored_video(cam1_path, [11, 21, 31, 41])
    scans = (
        VideoScanResult("cam0", cam0_path, 10.0, (8, 6), timestamps),
        VideoScanResult("cam1", cam1_path, 10.0, (8, 6), timestamps),
    )

    # Choose original frame indices 1 and 3 from both sources.
    all_bundles = align_timestamp_sequences(scans, max_skew_ns=1_000_000, min_views=2)
    bundles = tuple(
        candidate for candidate in all_bundles if candidate.bundle_index in {1, 3}
    )

    outputs = write_aligned_videos(
        scans,
        bundles,
        output_dir=tmp_path / "aligned",
        output_fps=10.0,
    )

    expected_by_source = (("cam0", [20, 40]), ("cam1", [21, 41]))
    for source_name, expected_values in expected_by_source:
        capture = cv2.VideoCapture(str(outputs[source_name]))
        frames: list[int] = []
        try:
            success, frame = capture.read()
            while success and frame is not None:
                frames.append(int(round(float(frame.mean()))))
                success, frame = capture.read()
        finally:
            capture.release()

        assert len(frames) == 2
        # A tolerance of 5 is allowed on the mean value; presumably this
        # absorbs lossy-codec error (not confirmed from this file).
        first_mean, second_mean = frames
        assert abs(first_mean - expected_values[0]) <= 5
        assert abs(second_mean - expected_values[1]) <= 5