f501119d43
Add preprocess-only silhouette export and configurable result exporters so demo runs can be persisted for offline analysis and reproducible evaluation. Include optional parquet support and CLI visualization dumps while updating tests and tracking notes for the verified pipeline/debug workflow.
400 lines
14 KiB
Python
400 lines
14 KiB
Python
"""Unit tests for SilhouetteWindow and select_person functions."""
|
|
|
|
from typing import Any
|
|
from unittest.mock import MagicMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import torch
|
|
from numpy.typing import NDArray
|
|
|
|
from opengait.demo.window import SilhouetteWindow, select_person
|
|
|
|
|
|
class TestSilhouetteWindow:
|
|
"""Tests for SilhouetteWindow class."""
|
|
|
|
def test_window_fill_and_ready_behavior(self) -> None:
|
|
"""Window should be ready only when filled to window_size."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Not ready with fewer frames
|
|
for i in range(4):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
assert not window.is_ready()
|
|
assert window.fill_level == (i + 1) / 5
|
|
|
|
# Ready at exactly window_size
|
|
window.push(sil, frame_idx=4, track_id=1)
|
|
assert window.is_ready()
|
|
assert window.fill_level == 1.0
|
|
|
|
def test_underfilled_not_ready(self) -> None:
|
|
"""Underfilled window should never report ready."""
|
|
window = SilhouetteWindow(window_size=10, stride=1, gap_threshold=5)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Push 9 frames (underfilled)
|
|
for i in range(9):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert not window.is_ready()
|
|
assert window.fill_level == 0.9
|
|
|
|
# get_tensor should raise when not ready
|
|
with pytest.raises(ValueError, match="Window not ready"):
|
|
window.get_tensor()
|
|
|
|
def test_track_id_change_resets_buffer(self) -> None:
|
|
"""Changing track ID should reset the buffer."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Fill window with track_id=1
|
|
for i in range(5):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert window.is_ready()
|
|
assert window.current_track_id == 1
|
|
assert window.fill_level == 1.0
|
|
|
|
# Push with different track_id should reset
|
|
window.push(sil, frame_idx=5, track_id=2)
|
|
assert not window.is_ready()
|
|
assert window.fill_level == 0.2
|
|
assert window.current_track_id == 2
|
|
|
|
def test_frame_gap_reset_behavior(self) -> None:
|
|
"""Frame gap exceeding threshold should reset buffer."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=3)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Fill window with consecutive frames
|
|
for i in range(5):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert window.is_ready()
|
|
assert window.fill_level == 1.0
|
|
|
|
# Small gap (within threshold) - no reset
|
|
window.push(sil, frame_idx=6, track_id=1) # gap = 1
|
|
assert window.is_ready()
|
|
assert window.fill_level == 1.0 # deque maintains max
|
|
|
|
# Reset and fill again
|
|
window.reset()
|
|
for i in range(5):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
# Large gap (exceeds threshold) - should reset
|
|
window.push(sil, frame_idx=10, track_id=1) # gap = 5 > 3
|
|
assert not window.is_ready()
|
|
assert window.fill_level == 0.2
|
|
|
|
def test_get_tensor_shape(self) -> None:
|
|
"""get_tensor should return tensor of shape [1, 1, window_size, 64, 44]."""
|
|
window_size = 7
|
|
window = SilhouetteWindow(window_size=window_size, stride=1, gap_threshold=10)
|
|
|
|
# Push unique frames to verify ordering
|
|
for i in range(window_size):
|
|
sil = np.full((64, 44), fill_value=float(i), dtype=np.float32)
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert window.is_ready()
|
|
|
|
tensor = window.get_tensor(device="cpu")
|
|
|
|
assert isinstance(tensor, torch.Tensor)
|
|
assert tensor.shape == (1, 1, window_size, 64, 44)
|
|
assert tensor.dtype == torch.float32
|
|
|
|
# Verify content ordering: first frame should have value 0.0
|
|
assert tensor[0, 0, 0, 0, 0].item() == 0.0
|
|
# Last frame should have value window_size-1
|
|
assert tensor[0, 0, window_size - 1, 0, 0].item() == window_size - 1
|
|
|
|
def test_should_classify_stride_behavior(self) -> None:
|
|
"""should_classify should respect stride setting."""
|
|
window = SilhouetteWindow(window_size=5, stride=3, gap_threshold=10)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Fill window
|
|
for i in range(5):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert window.is_ready()
|
|
|
|
# First classification should always trigger
|
|
assert window.should_classify()
|
|
|
|
# Mark as classified at frame 4
|
|
window.mark_classified()
|
|
|
|
# Not ready to classify yet (stride=3, only 0 frames passed)
|
|
window.push(sil, frame_idx=5, track_id=1)
|
|
assert not window.should_classify()
|
|
|
|
# Still not ready (only 1 frame passed)
|
|
window.push(sil, frame_idx=6, track_id=1)
|
|
assert not window.should_classify()
|
|
|
|
# Now ready (3 frames passed since last classification)
|
|
window.push(sil, frame_idx=7, track_id=1)
|
|
assert window.should_classify()
|
|
|
|
def test_should_classify_not_ready(self) -> None:
|
|
"""should_classify should return False when window not ready."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Push only 3 frames (not ready)
|
|
for i in range(3):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
|
|
assert not window.is_ready()
|
|
assert not window.should_classify()
|
|
|
|
def test_reset_clears_all_state(self) -> None:
|
|
"""reset should clear all internal state."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
|
|
sil = np.ones((64, 44), dtype=np.float32)
|
|
|
|
# Fill and classify
|
|
for i in range(5):
|
|
window.push(sil, frame_idx=i, track_id=1)
|
|
window.mark_classified()
|
|
|
|
assert window.is_ready()
|
|
assert window.current_track_id == 1
|
|
assert window.frame_count == 5
|
|
|
|
# Reset
|
|
window.reset()
|
|
|
|
assert not window.is_ready()
|
|
assert window.fill_level == 0.0
|
|
assert window.current_track_id is None
|
|
assert window.frame_count == 0
|
|
assert not window.should_classify()
|
|
|
|
def test_push_invalid_shape_raises(self) -> None:
|
|
"""push should raise ValueError for invalid silhouette shape."""
|
|
window = SilhouetteWindow(window_size=5, stride=1, gap_threshold=10)
|
|
|
|
# Wrong shape
|
|
sil_wrong = np.ones((32, 32), dtype=np.float32)
|
|
|
|
with pytest.raises(ValueError, match="Expected silhouette shape"):
|
|
window.push(sil_wrong, frame_idx=0, track_id=1)
|
|
|
|
def test_push_wrong_dtype_converts(self) -> None:
|
|
"""push should convert dtype to float32."""
|
|
window = SilhouetteWindow(window_size=1, stride=1, gap_threshold=10)
|
|
|
|
# uint8 input
|
|
sil_uint8 = np.ones((64, 44), dtype=np.uint8) * 255
|
|
window.push(sil_uint8, frame_idx=0, track_id=1)
|
|
|
|
tensor = window.get_tensor()
|
|
assert tensor.dtype == torch.float32
|
|
|
|
|
|
class TestSelectPerson:
|
|
"""Tests for select_person function."""
|
|
|
|
def _create_mock_results(
|
|
self,
|
|
boxes_xyxy: NDArray[np.float32] | torch.Tensor,
|
|
masks_data: NDArray[np.float32] | torch.Tensor,
|
|
track_ids: NDArray[np.int64] | torch.Tensor | None,
|
|
) -> Any:
|
|
"""Create a mock detection results object."""
|
|
mock_boxes = MagicMock()
|
|
mock_boxes.xyxy = boxes_xyxy
|
|
mock_boxes.id = track_ids
|
|
|
|
mock_masks = MagicMock()
|
|
mock_masks.data = masks_data
|
|
|
|
mock_results = MagicMock()
|
|
mock_results.boxes = mock_boxes
|
|
mock_results.masks = mock_masks
|
|
|
|
return mock_results
|
|
|
|
def test_select_person_single_detection(self) -> None:
|
|
"""Single detection should return that person's data."""
|
|
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32) # area = 3200
|
|
masks = np.random.rand(1, 100, 100).astype(np.float32)
|
|
track_ids = np.array([42], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
mask, bbox, tid = result
|
|
assert mask.shape == (100, 100)
|
|
assert bbox == (10, 10, 50, 90)
|
|
assert tid == 42
|
|
|
|
def test_select_person_multi_detection_selects_largest(self) -> None:
|
|
"""Multiple detections should select the one with largest bbox area."""
|
|
# Two boxes: second one is larger
|
|
boxes = np.array(
|
|
[
|
|
[0.0, 0.0, 10.0, 10.0], # area = 100
|
|
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
|
|
[0.0, 0.0, 20.0, 20.0], # area = 400
|
|
],
|
|
dtype=np.float32,
|
|
)
|
|
masks = np.random.rand(3, 100, 100).astype(np.float32)
|
|
track_ids = np.array([1, 2, 3], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
mask, bbox, tid = result
|
|
assert bbox == (0, 0, 30, 30) # Largest box
|
|
assert tid == 2 # Corresponding track ID
|
|
|
|
def test_select_person_no_detections_returns_none(self) -> None:
|
|
"""No detections should return None."""
|
|
boxes = np.array([], dtype=np.float32).reshape(0, 4)
|
|
masks = np.array([], dtype=np.float32).reshape(0, 100, 100)
|
|
track_ids = np.array([], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is None
|
|
|
|
def test_select_person_no_track_ids_returns_none(self) -> None:
|
|
"""Detections without track IDs should return None."""
|
|
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
|
|
masks = np.random.rand(1, 100, 100).astype(np.float32)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids=None)
|
|
result = select_person(results)
|
|
|
|
assert result is None
|
|
|
|
def test_select_person_empty_track_ids_returns_none(self) -> None:
|
|
"""Empty track IDs array should return None."""
|
|
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
|
|
masks = np.random.rand(1, 100, 100).astype(np.float32)
|
|
track_ids = np.array([], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is None
|
|
|
|
def test_select_person_missing_boxes_returns_none(self) -> None:
|
|
"""Missing boxes attribute should return None."""
|
|
mock_results = MagicMock()
|
|
mock_results.boxes = None
|
|
|
|
result = select_person(mock_results)
|
|
assert result is None
|
|
|
|
def test_select_person_missing_masks_returns_none(self) -> None:
|
|
"""Missing masks attribute should return None."""
|
|
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
|
|
track_ids = np.array([1], dtype=np.int64)
|
|
|
|
mock_boxes = MagicMock()
|
|
mock_boxes.xyxy = boxes
|
|
mock_boxes.id = track_ids
|
|
|
|
mock_results = MagicMock()
|
|
mock_results.boxes = mock_boxes
|
|
mock_results.masks = None
|
|
|
|
result = select_person(mock_results)
|
|
assert result is None
|
|
|
|
def test_select_person_1d_bbox_handling(self) -> None:
|
|
"""1D bbox array should be reshaped to 2D."""
|
|
boxes = np.array([10.0, 10.0, 50.0, 90.0], dtype=np.float32) # 1D
|
|
masks = np.random.rand(1, 100, 100).astype(np.float32)
|
|
track_ids = np.array([1], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
_, bbox, tid = result
|
|
assert bbox == (10, 10, 50, 90)
|
|
assert tid == 1
|
|
|
|
def test_select_person_2d_mask_handling(self) -> None:
|
|
"""2D mask should be expanded to 3D."""
|
|
boxes = np.array([[10.0, 10.0, 50.0, 90.0]], dtype=np.float32)
|
|
masks = np.random.rand(100, 100).astype(np.float32) # 2D
|
|
track_ids = np.array([1], dtype=np.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
mask, _, _ = result
|
|
# Should be 2D (extracted from expanded 3D)
|
|
assert mask.shape == (100, 100)
|
|
|
|
|
|
def test_select_person_tensor_cpu_inputs(self) -> None:
|
|
"""Tensor-backed inputs (CPU) should work correctly."""
|
|
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32)
|
|
masks = torch.rand(1, 100, 100, dtype=torch.float32)
|
|
track_ids = torch.tensor([42], dtype=torch.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
mask, bbox, tid = result
|
|
assert mask.shape == (100, 100)
|
|
assert bbox == (10, 10, 50, 90)
|
|
assert tid == 42
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
def test_select_person_tensor_cuda_inputs(self) -> None:
|
|
"""Tensor-backed inputs (CUDA) should work correctly."""
|
|
boxes = torch.tensor([[10.0, 10.0, 50.0, 90.0]], dtype=torch.float32).cuda()
|
|
masks = torch.rand(1, 100, 100, dtype=torch.float32).cuda()
|
|
track_ids = torch.tensor([42], dtype=torch.int64).cuda()
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
mask, bbox, tid = result
|
|
assert mask.shape == (100, 100)
|
|
assert bbox == (10, 10, 50, 90)
|
|
assert tid == 42
|
|
|
|
def test_select_person_tensor_multi_detection(self) -> None:
|
|
"""Multiple tensor detections should select largest bbox."""
|
|
boxes = torch.tensor(
|
|
[
|
|
[0.0, 0.0, 10.0, 10.0], # area = 100
|
|
[0.0, 0.0, 30.0, 30.0], # area = 900 (largest)
|
|
[0.0, 0.0, 20.0, 20.0], # area = 400
|
|
],
|
|
dtype=torch.float32,
|
|
)
|
|
masks = torch.rand(3, 100, 100, dtype=torch.float32)
|
|
track_ids = torch.tensor([1, 2, 3], dtype=torch.int64)
|
|
|
|
results = self._create_mock_results(boxes, masks, track_ids)
|
|
result = select_person(results)
|
|
|
|
assert result is not None
|
|
_, bbox, tid = result
|
|
assert bbox == (0, 0, 30, 30) # Largest box
|
|
assert tid == 2 # Corresponding track ID
|