CVTH3PE/tests/test_affinity.py
crosstyan da4c51d04f feat: Implement AffinityResult class and optimize camera affinity matrix calculation
- Added a new `AffinityResult` class that encapsulates the results of affinity computations: the affinity matrix, the trackings, and their respective indices.
- Introduced a vectorized `calculate_camera_affinity_matrix_jax` that replaces the previous double-for-loop approach with JAX array operations for better performance (a sketch of the idea follows below).
- Updated `tests/test_affinity.py` with parameterized benchmarks that check the vectorized method against the naive implementation for both correctness and speed.
2025-04-28 19:08:16 +08:00
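Neither `AffinityResult` nor the vectorized implementation is part of the test file below. A minimal sketch of the idea, assuming hypothetical field names and a simple exponential keypoint-distance affinity (the project's actual formula is not shown here), might look like:

from typing import NamedTuple, Sequence
import jax
import jax.numpy as jnp

class AffinityResult(NamedTuple):
    # Field names are illustrative assumptions; the real class lives in the
    # implementation module, not in this test file.
    affinity: jnp.ndarray            # (T, D) affinity matrix
    trackings: Sequence              # trackings corresponding to the rows
    tracking_indices: jnp.ndarray    # row index -> tracking index
    detection_indices: jnp.ndarray   # column index -> detection index

def _pair_affinity(track_kps2d, det_kps2d, alpha_2d):
    # Mean 2D keypoint distance mapped through a decaying exponential.
    dist = jnp.linalg.norm(track_kps2d - det_kps2d, axis=-1).mean()
    return jnp.exp(-alpha_2d * dist)

def affinity_matrix_sketch(track_kps2d, det_kps2d, alpha_2d=1.0):
    # track_kps2d: (T, J, 2) projected tracking keypoints
    # det_kps2d:   (D, J, 2) detected keypoints
    # Two nested vmaps replace the double for-loop over (tracking, detection) pairs.
    per_detection = jax.vmap(_pair_affinity, in_axes=(None, 0, None))
    per_tracking = jax.vmap(per_detection, in_axes=(0, None, None))
    return per_tracking(track_kps2d, det_kps2d, alpha_2d)  # shape (T, D)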


from collections import OrderedDict
from datetime import datetime, timedelta
import time
import jax.numpy as jnp
import numpy as np
import pytest
from hypothesis import given, settings, HealthCheck
from hypothesis import strategies as st
from app.camera import Camera, CameraParams
from playground import (
Detection,
Tracking,
calculate_affinity_matrix,
calculate_camera_affinity_matrix,
)


# ----------------------------------------------------------------------------
# Helper functions to generate synthetic cameras / trackings / detections
# ----------------------------------------------------------------------------
def _make_dummy_camera(cam_id: str, rng: np.random.Generator) -> Camera:
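    # Fixed identity intrinsics/extrinsics give a deterministic synthetic
    # pinhole camera (rng is currently unused here).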
K = jnp.eye(3)
Rt = jnp.eye(4)
dist = jnp.zeros(5)
image_size = jnp.array([1000, 1000])
params = CameraParams(K=K, Rt=Rt, dist_coeffs=dist, image_size=image_size)
    return Camera(id=cam_id, params=params)


def _random_keypoints_3d(rng: np.random.Generator, J: int):
return jnp.asarray(rng.uniform(-1.0, 1.0, size=(J, 3)).astype(np.float32))
def _random_keypoints_2d(rng: np.random.Generator, J: int):
    return jnp.asarray(rng.uniform(0.0, 1000.0, size=(J, 2)).astype(np.float32))


def _make_trackings(rng: np.random.Generator, camera: Camera, T: int, J: int):
now = datetime.now()
trackings = []
for i in range(T):
kps3d = _random_keypoints_3d(rng, J)
trk = Tracking(
id=i + 1,
keypoints=kps3d,
last_active_timestamp=now
- timedelta(milliseconds=int(rng.integers(20, 50))),
)
trackings.append(trk)
    return trackings


def _make_detections(rng: np.random.Generator, camera: Camera, D: int, J: int):
now = datetime.now()
detections = []
for _ in range(D):
kps2d = _random_keypoints_2d(rng, J)
det = Detection(
keypoints=kps2d,
confidences=jnp.ones(J, dtype=jnp.float32),
camera=camera,
timestamp=now,
)
detections.append(det)
    return detections


# ----------------------------------------------------------------------------
# Property-based test: per-camera vs naive slice should match
# ----------------------------------------------------------------------------
@settings(max_examples=3, deadline=None, suppress_health_check=[HealthCheck.too_slow])
@given(
T=st.integers(min_value=1, max_value=4),
D=st.integers(min_value=1, max_value=4),
J=st.integers(min_value=5, max_value=15),
seed=st.integers(min_value=0, max_value=10000),
)
def test_per_camera_matches_naive(T, D, J, seed):
rng = np.random.default_rng(seed)
cam = _make_dummy_camera("C0", rng)
trackings = _make_trackings(rng, cam, T, J)
detections = _make_detections(rng, cam, D, J)
# Parameters
W_2D = 1.0
ALPHA_2D = 1.0
LAMBDA_A = 0.1
W_3D = 1.0
ALPHA_3D = 1.0
# Compute per-camera affinity (fast)
A_fast = calculate_camera_affinity_matrix(
trackings,
detections,
w_2d=W_2D,
alpha_2d=ALPHA_2D,
w_3d=W_3D,
alpha_3d=ALPHA_3D,
lambda_a=LAMBDA_A,
)
# Compute naive multi-camera affinity and slice out this camera
det_dict = OrderedDict({"C0": detections})
A_naive, _ = calculate_affinity_matrix(
trackings,
det_dict,
w_2d=W_2D,
alpha_2d=ALPHA_2D,
w_3d=W_3D,
alpha_3d=ALPHA_3D,
lambda_a=LAMBDA_A,
)
    # Note: with the identity-camera synthetic setup both the fast and the naive
    # implementations can produce NaN entries; real-world calibration data would
    # be needed for a stricter check (see the note after the assertion below).
    # The fast and naive results should agree element-wise.
np.testing.assert_allclose(A_fast, np.asarray(A_naive), rtol=1e-5, atol=1e-5)
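    # Because assert_allclose treats matching NaNs as equal, the comparison
    # above may pass trivially when both matrices are all-NaN. A stricter
    # variant (an assumption, not part of the original test) could also
    # require finite values:
    #     if not np.all(np.isnan(np.asarray(A_naive))):
    #         assert np.all(np.isfinite(np.asarray(A_fast)))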
@pytest.mark.parametrize("T,D,J", [(2, 3, 10), (4, 4, 15), (6, 8, 20)])
def test_benchmark_affinity_matrix(T, D, J):
"""Compare performance between naive and fast affinity matrix calculation."""
seed = 42
rng = np.random.default_rng(seed)
cam = _make_dummy_camera("C0", rng)
trackings = _make_trackings(rng, cam, T, J)
detections = _make_detections(rng, cam, D, J)
# Parameters
w_2d = 1.0
alpha_2d = 1.0
w_3d = 1.0
alpha_3d = 1.0
lambda_a = 0.1
    # Set up the input for the naive multi-camera implementation
det_dict = OrderedDict({"C0": detections})
    # Warm-up run so JAX compilation/tracing does not skew the timings below
A_fast = calculate_camera_affinity_matrix(
trackings,
detections,
w_2d=w_2d,
alpha_2d=alpha_2d,
w_3d=w_3d,
alpha_3d=alpha_3d,
lambda_a=lambda_a,
)
A_naive, _ = calculate_affinity_matrix(
trackings,
det_dict,
w_2d=w_2d,
alpha_2d=alpha_2d,
w_3d=w_3d,
alpha_3d=alpha_3d,
lambda_a=lambda_a,
)
# Assert they match before timing
np.testing.assert_allclose(A_fast, np.asarray(A_naive), rtol=1e-5, atol=1e-5)
# Timing
num_runs = 3
# Time the vectorized version
start = time.perf_counter()
for _ in range(num_runs):
calculate_camera_affinity_matrix(
trackings,
detections,
w_2d=w_2d,
alpha_2d=alpha_2d,
w_3d=w_3d,
alpha_3d=alpha_3d,
lambda_a=lambda_a,
)
end = time.perf_counter()
vectorized_time = (end - start) / num_runs
# Time the naive version
start = time.perf_counter()
for _ in range(num_runs):
calculate_affinity_matrix(
trackings,
det_dict,
w_2d=w_2d,
alpha_2d=alpha_2d,
w_3d=w_3d,
alpha_3d=alpha_3d,
lambda_a=lambda_a,
)
end = time.perf_counter()
naive_time = (end - start) / num_runs
speedup = naive_time / vectorized_time
print(f"\nBenchmark T={T}, D={D}, J={J}:")
print(f" Vectorized: {vectorized_time*1000:.2f}ms per run")
print(f" Naive: {naive_time*1000:.2f}ms per run")
print(f" Speedup: {speedup:.2f}x")
    # Sanity check: the vectorized implementation should be faster.
    assert speedup > 1.0, f"Expected the vectorized implementation to be faster, got {speedup:.2f}x"


if __name__ == "__main__":
    # Run only the benchmark tests: python -m pytest -xvs -k test_benchmark
    pytest.main(["-xvs", __file__, "-k", "test_benchmark"])