2c877dc53c
Add a reusable video alignment module for offline multiview workflows. The helper scans per-frame timestamps, builds nearest-timestamp bundle matches under a configurable skew threshold, and rewrites synchronized per-camera videos for downstream detection and tracking runs. The detection package now exports the alignment primitives, and a test-support CLI is included so dataset-specific experiments can generate aligned clips without expanding the public application surface. Regression tests cover both bundle matching and frame selection during rewritten video generation.
98 lines
3.1 KiB
Python
98 lines
3.1 KiB
Python
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from pose_tracking_exp.detection.video_alignment import (
|
|
align_timestamp_sequences,
|
|
write_aligned_videos,
|
|
VideoScanResult,
|
|
)
|
|
|
|
|
|
def test_align_timestamp_sequences_matches_full_common_window() -> None:
|
|
scans = (
|
|
VideoScanResult(
|
|
source_name="cam0",
|
|
path=Path("/tmp/cam0.mp4"),
|
|
fps=30.0,
|
|
frame_size=(8, 6),
|
|
timestamps_unix_ns=(0, 33_000_000, 66_000_000, 99_000_000),
|
|
),
|
|
VideoScanResult(
|
|
source_name="cam1",
|
|
path=Path("/tmp/cam1.mp4"),
|
|
fps=29.97,
|
|
frame_size=(8, 6),
|
|
timestamps_unix_ns=(1_000_000, 34_000_000, 67_000_000, 100_000_000),
|
|
),
|
|
VideoScanResult(
|
|
source_name="cam2",
|
|
path=Path("/tmp/cam2.mp4"),
|
|
fps=29.5,
|
|
frame_size=(8, 6),
|
|
timestamps_unix_ns=(20_000_000, 90_000_000, 160_000_000),
|
|
),
|
|
)
|
|
|
|
bundles = align_timestamp_sequences(
|
|
scans,
|
|
reference_name="cam0",
|
|
max_skew_ns=12_000_000,
|
|
min_views=2,
|
|
)
|
|
|
|
assert len(bundles) == 4
|
|
assert bundles[0].frame_indices_by_source == {"cam0": 0, "cam1": 0}
|
|
assert bundles[-1].frame_indices_by_source == {"cam0": 3, "cam1": 3, "cam2": 1}
|
|
|
|
|
|
def _write_colored_video(path: Path, frame_values: list[int]) -> None:
|
|
writer = cv2.VideoWriter(str(path), cv2.VideoWriter.fourcc(*"mp4v"), 10.0, (8, 6))
|
|
if not writer.isOpened():
|
|
raise RuntimeError(f"Could not create {path}")
|
|
try:
|
|
for value in frame_values:
|
|
writer.write(np.full((6, 8, 3), value, dtype=np.uint8))
|
|
finally:
|
|
writer.release()
|
|
|
|
|
|
def test_write_aligned_videos_selects_requested_frames(tmp_path: Path) -> None:
|
|
source0 = tmp_path / "cam0.mp4"
|
|
source1 = tmp_path / "cam1.mp4"
|
|
_write_colored_video(source0, [10, 20, 30, 40])
|
|
_write_colored_video(source1, [11, 21, 31, 41])
|
|
|
|
scans = (
|
|
VideoScanResult("cam0", source0, 10.0, (8, 6), (0, 100_000_000, 200_000_000, 300_000_000)),
|
|
VideoScanResult("cam1", source1, 10.0, (8, 6), (0, 100_000_000, 200_000_000, 300_000_000)),
|
|
)
|
|
bundles = (
|
|
# choose original frame indices 1 and 3 from both sources
|
|
*(
|
|
bundle
|
|
for bundle in (
|
|
align_timestamp_sequences(scans, max_skew_ns=1_000_000, min_views=2)
|
|
)
|
|
if bundle.bundle_index in {1, 3}
|
|
),
|
|
)
|
|
|
|
outputs = write_aligned_videos(scans, bundles, output_dir=tmp_path / "aligned", output_fps=10.0)
|
|
|
|
for source_name, expected_values in (("cam0", [20, 40]), ("cam1", [21, 41])):
|
|
capture = cv2.VideoCapture(str(outputs[source_name]))
|
|
frames: list[int] = []
|
|
try:
|
|
while True:
|
|
success, frame = capture.read()
|
|
if not success or frame is None:
|
|
break
|
|
frames.append(int(round(float(frame.mean()))))
|
|
finally:
|
|
capture.release()
|
|
assert len(frames) == 2
|
|
assert abs(frames[0] - expected_values[0]) <= 5
|
|
assert abs(frames[1] - expected_values[1]) <= 5
|