Files
OpenGait/tests/opengait_studio/test_window.py
T
crosstyan d4e2a59ad2 fix(demo): pace gait windows before buffering
Make the OpenGait-studio demo drop unpaced frames before they grow the silhouette window. Separate source-frame gap tracking from paced-frame stride tracking so runtime scheduling matches the documented demo-window-and-stride behavior.

Add regressions for paced window growth and schedule-frame stride semantics.
2026-03-14 11:31:44 +08:00

414 lines
15 KiB
Python

"""Unit tests for SilhouetteWindow and select_person functions."""
from typing import Any
from unittest.mock import MagicMock
import numpy as np
import pytest
import torch
from numpy.typing import NDArray
from opengait_studio.window import SilhouetteWindow, select_person
class TestSilhouetteWindow:
"""Tests for SilhouetteWindow class."""
def test_window_fill_and_ready_behavior(self) -> None:
"""Window should be ready only when filled to window_size."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Not ready with fewer frames
for i in range(4):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert window.fill_level == (i + 1) / 5
# Ready at exactly window_size
window.push(sil, frame_idx=4, track_id=1)
assert window.is_ready()
assert window.fill_level == 1.0
def test_underfilled_not_ready(self) -> None:
"""Underfilled window should never report ready."""
window = SilhouetteWindow(window_size=10, stride=1, gap_threshold=5)
sil = np.ones((64, 44), dtype=np.float32)
# Push 9 frames (underfilled)
for i in range(9):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert window.fill_level == 0.9
# get_tensor should raise when not ready
with pytest.raises(ValueError, match="Window not ready"):
window.get_tensor()
def test_track_id_change_resets_buffer(self) -> None:
"""Changing track ID should reset the buffer."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window with track_id=1
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
assert window.current_track_id == 1
assert window.fill_level == 1.0
# Push with different track_id should reset
window.push(sil, frame_idx=5, track_id=2)
assert not window.is_ready()
assert window.fill_level == 0.2
assert window.current_track_id == 2
def test_frame_gap_reset_behavior(self) -> None:
"""Frame gap exceeding threshold should reset buffer."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=3)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window with consecutive frames
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
assert window.fill_level == 1.0
# Small gap (within threshold) - no reset
window.push(sil, frame_idx=6, track_id=1) # gap = 1
assert window.is_ready()
assert window.fill_level == 1.0 # deque maintains max
# Reset and fill again
window.reset()
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
# Large gap (exceeds threshold) - should reset
window.push(sil, frame_idx=10, track_id=1) # gap = 5 > 3
assert not window.is_ready()
assert window.fill_level == 0.2
def test_get_tensor_shape(self) -> None:
"""get_tensor should return tensor of shape [1, 1, window_size, 64, 44]."""
window_size = 7
window = SilhouetteWindow(window_size=window_size, stride=1, gap_threshold=10)
# Push unique frames to verify ordering
for i in range(window_size):
sil = np.full((64, 44), fill_value=float(i), dtype=np.float32)
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
tensor = window.get_tensor(device="cpu")
assert isinstance(tensor, torch.Tensor)
assert tensor.shape == (1, 1, window_size, 64, 44)
assert tensor.dtype == torch.float32
# Verify content ordering: first frame should have value 0.0
assert tensor[0, 0, 0, 0, 0].item() == 0.0
# Last frame should have value window_size-1
assert tensor[0, 0, window_size - 1, 0, 0].item() == window_size - 1
def test_should_classify_stride_behavior(self) -> None:
"""should_classify should respect stride setting."""
window = SilhouetteWindow(window_size=5, stride=3, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
# First classification should always trigger
assert window.should_classify()
# Mark as classified at frame 4
window.mark_classified()
# Not ready to classify yet (stride=3, only 0 frames passed)
window.push(sil, frame_idx=5, track_id=1)
assert not window.should_classify()
# Still not ready (only 1 frame passed)
window.push(sil, frame_idx=6, track_id=1)
assert not window.should_classify()
# Now ready (3 frames passed since last classification)
window.push(sil, frame_idx=7, track_id=1)
assert window.should_classify()
def test_should_classify_uses_schedule_frame_idx(self) -> None:
"""Stride should be measured in paced/scheduled frames, not source frames."""
window = SilhouetteWindow(window_size=2, stride=2, gap_threshold=50)
sil = np.ones((64, 44), dtype=np.float32)
window.push(sil, frame_idx=0, track_id=1, schedule_frame_idx=0)
window.push(sil, frame_idx=10, track_id=1, schedule_frame_idx=1)
assert window.should_classify()
window.mark_classified()
window.push(sil, frame_idx=20, track_id=1, schedule_frame_idx=2)
assert not window.should_classify()
def test_should_classify_not_ready(self) -> None:
"""should_classify should return False when window not ready."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Push only 3 frames (not ready)
for i in range(3):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert not window.should_classify()
def test_reset_clears_all_state(self) -> None:
"""reset should clear all internal state."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill and classify
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
window.mark_classified()
assert window.is_ready()
assert window.current_track_id == 1
assert window.frame_count == 5
# Reset
window.reset()
assert not window.is_ready()
assert window.fill_level == 0.0
assert window.current_track_id is None
assert window.frame_count == 0
assert not window.should_classify()
def test_push_invalid_shape_raises(self) -> None:
"""push should raise ValueError for invalid silhouette shape."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
# Wrong shape
sil_wrong = np.ones((32, 32), dtype=np.float32)
with pytest.raises(ValueError, match="Expected silhouette shape"):
window.push(sil_wrong, frame_idx=0, track_id=1)
def test_push_wrong_dtype_converts(self) -> None:
"""push should convert dtype to float32."""
window = SilhouetteWindow(window_size=1, stride=1, gap_threshold=10)
# uint8 input
sil_uint8 = np.ones((64, 44), dtype=np.uint8) * 255
window.push(sil_uint8, frame_idx=0, track_id=1)
tensor = window.get_tensor()
assert tensor.dtype == torch.float32
class TestSelectPerson:
"""Tests for select_person function."""
def _create_mock_results(
self,
boxes_xyxy: NDArray[np.float32] | torch.Tensor,
masks_data: NDArray[np.float32] | torch.Tensor,
track_ids: NDArray[np.int64] | torch.Tensor | None,
) -> Any:
"""Create a mock detection results object."""
mock_boxes = MagicMock()
mock_boxes.xyxy = boxes_xyxy
mock_boxes.id = track_ids
mock_masks = MagicMock()
mock_masks.data = masks_data
mock_results = MagicMock()
mock_results.boxes = mock_boxes
mock_results.masks = mock_masks
return mock_results
def test_select_person_single_detection(self) -> None:
"""Single detection should return that person's data."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32) # area = 3200
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([42], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, _, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
def test_select_person_multi_detection_selects_largest(self) -> None:
"""Multiple detections should select the one with largest bbox area."""
# Two boxes: second one is larger
boxes = np.array(
[
[0.0, 0.0, 10.0, 10.0], # area = 100
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
[0.0, 0.0, 20.0, 20.0], # area = 400
],
dtype=np.float32,
)
masks = np.random.rand(3, 100, 100).astype(np.float32)
track_ids = np.array([1, 2, 3], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, _, tid = result
assert bbox == (0, 0, 30, 30) # Largest box
assert tid == 2 # Corresponding track ID
def test_select_person_no_detections_returns_none(self) -> None:
"""No detections should return None."""
boxes = np.array([], dtype=np.float32).reshape(0, 4)
masks = np.array([], dtype=np.float32).reshape(0, 100, 100)
track_ids = np.array([], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is None
def test_select_person_no_track_ids_returns_none(self) -> None:
"""Detections without track IDs should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(1, 100, 100).astype(np.float32)
results = self._create_mock_results(boxes, masks, track_ids=None)
result = select_person(results)
assert result is None
def test_select_person_empty_track_ids_returns_none(self) -> None:
"""Empty track IDs array should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is None
def test_select_person_missing_boxes_returns_none(self) -> None:
"""Missing boxes attribute should return None."""
mock_results = MagicMock()
mock_results.boxes = None
result = select_person(mock_results)
assert result is None
def test_select_person_missing_masks_returns_none(self) -> None:
"""Missing masks attribute should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
track_ids = np.array([1], dtype=np.int64)
mock_boxes = MagicMock()
mock_boxes.xyxy = boxes
mock_boxes.id = track_ids
mock_results = MagicMock()
mock_results.boxes = mock_boxes
mock_results.masks = None
result = select_person(mock_results)
assert result is None
def test_select_person_1d_bbox_handling(self) -> None:
"""1D bbox array should be reshaped to 2D."""
boxes = np.array([10.0, 10.0, 50.0, 90.0], dtype=np.float32) # 1D
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([1], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
_, bbox, _, tid = result
assert bbox == (10, 10, 50, 90)
assert tid == 1
def test_select_person_2d_mask_handling(self) -> None:
"""2D mask should be expanded to 3D."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(100, 100).astype(np.float32) # 2D
track_ids = np.array([1], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, _, _, _ = result
# Should be 2D (extracted from expanded 3D)
assert mask.shape == (100, 100)
def test_select_person_tensor_cpu_inputs(self) -> None:
"""Tensor-backed inputs (CPU) should work correctly."""
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32)
masks = torch.rand(1, 100, 100, dtype=torch.float32)
track_ids = torch.tensor([42], dtype=torch.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, _, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_select_person_tensor_cuda_inputs(self) -> None:
"""Tensor-backed inputs (CUDA) should work correctly."""
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32).cuda()
masks = torch.rand(1, 100, 100, dtype=torch.float32).cuda()
track_ids = torch.tensor([42], dtype=torch.int64).cuda()
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, _, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
def test_select_person_tensor_multi_detection(self) -> None:
"""Multiple tensor detections should select largest bbox."""
boxes = torch.tensor(
[
[0.0, 0.0, 10.0, 10.0], # area = 100
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
[0.0, 0.0, 20.0, 20.0], # area = 400
],
dtype=torch.float32,
)
masks = torch.rand(3, 100, 100, dtype=torch.float32)
track_ids = torch.tensor([1, 2, 3], dtype=torch.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
_, bbox, _, tid = result
assert bbox == (0, 0, 30, 30) # Largest box
assert tid == 2 # Corresponding track ID