Files
OpenGait/tests/demo/test_window.py
T
crosstyan f501119d43 feat(demo): add export and silhouette visualization outputs
Add preprocess-only silhouette export and configurable result exporters so demo runs can be persisted for offline analysis and reproducible evaluation. Include optional parquet support and CLI visualization dumps while updating tests and tracking notes for the verified pipeline/debug workflow.
2026-02-27 17:16:20 +08:00

400 lines
14 KiB
Python

"""Unit tests for SilhouetteWindow and select_person functions."""
from typing import Any
from unittest.mock import MagicMock
import numpy as np
import pytest
import torch
from numpy.typing import NDArray
from opengait.demo.window import SilhouetteWindow, select_person
class TestSilhouetteWindow:
"""Tests for SilhouetteWindow class."""
def test_window_fill_and_ready_behavior(self) -> None:
"""Window should be ready only when filled to window_size."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Not ready with fewer frames
for i in range(4):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert window.fill_level == (i + 1) / 5
# Ready at exactly window_size
window.push(sil, frame_idx=4, track_id=1)
assert window.is_ready()
assert window.fill_level == 1.0
def test_underfilled_not_ready(self) -> None:
"""Underfilled window should never report ready."""
window = SilhouetteWindow(window_size=10, stride=1, gap_threshold=5)
sil = np.ones((64, 44), dtype=np.float32)
# Push 9 frames (underfilled)
for i in range(9):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert window.fill_level == 0.9
# get_tensor should raise when not ready
with pytest.raises(ValueError, match="Window not ready"):
window.get_tensor()
def test_track_id_change_resets_buffer(self) -> None:
"""Changing track ID should reset the buffer."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window with track_id=1
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
assert window.current_track_id == 1
assert window.fill_level == 1.0
# Push with different track_id should reset
window.push(sil, frame_idx=5, track_id=2)
assert not window.is_ready()
assert window.fill_level == 0.2
assert window.current_track_id == 2
def test_frame_gap_reset_behavior(self) -> None:
"""Frame gap exceeding threshold should reset buffer."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=3)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window with consecutive frames
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
assert window.fill_level == 1.0
# Small gap (within threshold) - no reset
window.push(sil, frame_idx=6, track_id=1) # gap = 1
assert window.is_ready()
assert window.fill_level == 1.0 # deque maintains max
# Reset and fill again
window.reset()
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
# Large gap (exceeds threshold) - should reset
window.push(sil, frame_idx=10, track_id=1) # gap = 5 > 3
assert not window.is_ready()
assert window.fill_level == 0.2
def test_get_tensor_shape(self) -> None:
"""get_tensor should return tensor of shape [1, 1, window_size, 64, 44]."""
window_size = 7
window = SilhouetteWindow(window_size=window_size, stride=1, gap_threshold=10)
# Push unique frames to verify ordering
for i in range(window_size):
sil = np.full((64, 44), fill_value=float(i), dtype=np.float32)
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
tensor = window.get_tensor(device="cpu")
assert isinstance(tensor, torch.Tensor)
assert tensor.shape == (1, 1, window_size, 64, 44)
assert tensor.dtype == torch.float32
# Verify content ordering: first frame should have value 0.0
assert tensor[0, 0, 0, 0, 0].item() == 0.0
# Last frame should have value window_size-1
assert tensor[0, 0, window_size - 1, 0, 0].item() == window_size - 1
def test_should_classify_stride_behavior(self) -> None:
"""should_classify should respect stride setting."""
window = SilhouetteWindow(window_size=5, stride=3, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill window
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
assert window.is_ready()
# First classification should always trigger
assert window.should_classify()
# Mark as classified at frame 4
window.mark_classified()
# Not ready to classify yet (stride=3, only 0 frames passed)
window.push(sil, frame_idx=5, track_id=1)
assert not window.should_classify()
# Still not ready (only 1 frame passed)
window.push(sil, frame_idx=6, track_id=1)
assert not window.should_classify()
# Now ready (3 frames passed since last classification)
window.push(sil, frame_idx=7, track_id=1)
assert window.should_classify()
def test_should_classify_not_ready(self) -> None:
"""should_classify should return False when window not ready."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Push only 3 frames (not ready)
for i in range(3):
window.push(sil, frame_idx=i, track_id=1)
assert not window.is_ready()
assert not window.should_classify()
def test_reset_clears_all_state(self) -> None:
"""reset should clear all internal state."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
sil = np.ones((64, 44), dtype=np.float32)
# Fill and classify
for i in range(5):
window.push(sil, frame_idx=i, track_id=1)
window.mark_classified()
assert window.is_ready()
assert window.current_track_id == 1
assert window.frame_count == 5
# Reset
window.reset()
assert not window.is_ready()
assert window.fill_level == 0.0
assert window.current_track_id is None
assert window.frame_count == 0
assert not window.should_classify()
def test_push_invalid_shape_raises(self) -> None:
"""push should raise ValueError for invalid silhouette shape."""
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
# Wrong shape
sil_wrong = np.ones((32, 32), dtype=np.float32)
with pytest.raises(ValueError, match="Expected silhouette shape"):
window.push(sil_wrong, frame_idx=0, track_id=1)
def test_push_wrong_dtype_converts(self) -> None:
"""push should convert dtype to float32."""
window = SilhouetteWindow(window_size=1, stride=1, gap_threshold=10)
# uint8 input
sil_uint8 = np.ones((64, 44), dtype=np.uint8) * 255
window.push(sil_uint8, frame_idx=0, track_id=1)
tensor = window.get_tensor()
assert tensor.dtype == torch.float32
class TestSelectPerson:
"""Tests for select_person function."""
def _create_mock_results(
self,
boxes_xyxy: NDArray[np.float32] | torch.Tensor,
masks_data: NDArray[np.float32] | torch.Tensor,
track_ids: NDArray[np.int64] | torch.Tensor | None,
) -> Any:
"""Create a mock detection results object."""
mock_boxes = MagicMock()
mock_boxes.xyxy = boxes_xyxy
mock_boxes.id = track_ids
mock_masks = MagicMock()
mock_masks.data = masks_data
mock_results = MagicMock()
mock_results.boxes = mock_boxes
mock_results.masks = mock_masks
return mock_results
def test_select_person_single_detection(self) -> None:
"""Single detection should return that person's data."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32) # area = 3200
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([42], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
def test_select_person_multi_detection_selects_largest(self) -> None:
"""Multiple detections should select the one with largest bbox area."""
# Two boxes: second one is larger
boxes = np.array(
[
[0.0, 0.0, 10.0, 10.0], # area = 100
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
[0.0, 0.0, 20.0, 20.0], # area = 400
],
dtype=np.float32,
)
masks = np.random.rand(3, 100, 100).astype(np.float32)
track_ids = np.array([1, 2, 3], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, tid = result
assert bbox == (0, 0, 30, 30) # Largest box
assert tid == 2 # Corresponding track ID
def test_select_person_no_detections_returns_none(self) -> None:
"""No detections should return None."""
boxes = np.array([], dtype=np.float32).reshape(0, 4)
masks = np.array([], dtype=np.float32).reshape(0, 100, 100)
track_ids = np.array([], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is None
def test_select_person_no_track_ids_returns_none(self) -> None:
"""Detections without track IDs should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(1, 100, 100).astype(np.float32)
results = self._create_mock_results(boxes, masks, track_ids=None)
result = select_person(results)
assert result is None
def test_select_person_empty_track_ids_returns_none(self) -> None:
"""Empty track IDs array should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is None
def test_select_person_missing_boxes_returns_none(self) -> None:
"""Missing boxes attribute should return None."""
mock_results = MagicMock()
mock_results.boxes = None
result = select_person(mock_results)
assert result is None
def test_select_person_missing_masks_returns_none(self) -> None:
"""Missing masks attribute should return None."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
track_ids = np.array([1], dtype=np.int64)
mock_boxes = MagicMock()
mock_boxes.xyxy = boxes
mock_boxes.id = track_ids
mock_results = MagicMock()
mock_results.boxes = mock_boxes
mock_results.masks = None
result = select_person(mock_results)
assert result is None
def test_select_person_1d_bbox_handling(self) -> None:
"""1D bbox array should be reshaped to 2D."""
boxes = np.array([10.0, 10.0, 50.0, 90.0], dtype=np.float32) # 1D
masks = np.random.rand(1, 100, 100).astype(np.float32)
track_ids = np.array([1], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
_, bbox, tid = result
assert bbox == (10, 10, 50, 90)
assert tid == 1
def test_select_person_2d_mask_handling(self) -> None:
"""2D mask should be expanded to 3D."""
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
masks = np.random.rand(100, 100).astype(np.float32) # 2D
track_ids = np.array([1], dtype=np.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, _, _ = result
# Should be 2D (extracted from expanded 3D)
assert mask.shape == (100, 100)
def test_select_person_tensor_cpu_inputs(self) -> None:
"""Tensor-backed inputs (CPU) should work correctly."""
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32)
masks = torch.rand(1, 100, 100, dtype=torch.float32)
track_ids = torch.tensor([42], dtype=torch.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
def test_select_person_tensor_cuda_inputs(self) -> None:
"""Tensor-backed inputs (CUDA) should work correctly."""
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32).cuda()
masks = torch.rand(1, 100, 100, dtype=torch.float32).cuda()
track_ids = torch.tensor([42], dtype=torch.int64).cuda()
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
mask, bbox, tid = result
assert mask.shape == (100, 100)
assert bbox == (10, 10, 50, 90)
assert tid == 42
def test_select_person_tensor_multi_detection(self) -> None:
"""Multiple tensor detections should select largest bbox."""
boxes = torch.tensor(
[
[0.0, 0.0, 10.0, 10.0], # area = 100
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
[0.0, 0.0, 20.0, 20.0], # area = 400
],
dtype=torch.float32,
)
masks = torch.rand(3, 100, 100, dtype=torch.float32)
track_ids = torch.tensor([1, 2, 3], dtype=torch.int64)
results = self._create_mock_results(boxes, masks, track_ids)
result = select_person(results)
assert result is not None
_, bbox, tid = result
assert bbox == (0, 0, 30, 30) # Largest box
assert tid == 2 # Corresponding track ID