forked from HQU-gxy/CVTH3PE
- Added a new `AffinityResult` class to encapsulate the results of affinity computations, including the affinity matrix, trackings, and their respective indices. - Introduced a vectorized implementation of `calculate_camera_affinity_matrix_jax` to enhance performance by leveraging JAX's capabilities, replacing the previous double-for-loop approach. - Updated tests in `test_affinity.py` to include parameterized benchmarks for comparing the performance of the new vectorized method against the naive implementation, ensuring accuracy and efficiency.
225 lines
6.2 KiB
Python
225 lines
6.2 KiB
Python
from datetime import datetime, timedelta
|
|
import time
|
|
|
|
import jax.numpy as jnp
|
|
import numpy as np
|
|
import pytest
|
|
from hypothesis import given, settings, HealthCheck
|
|
from hypothesis import strategies as st
|
|
|
|
from app.camera import Camera, CameraParams
|
|
from playground import (
|
|
Detection,
|
|
Tracking,
|
|
calculate_affinity_matrix,
|
|
calculate_camera_affinity_matrix,
|
|
)
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Helper functions to generate synthetic cameras / trackings / detections
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
def _make_dummy_camera(cam_id: str, rng: np.random.Generator) -> Camera:
|
|
K = jnp.eye(3)
|
|
Rt = jnp.eye(4)
|
|
dist = jnp.zeros(5)
|
|
image_size = jnp.array([1000, 1000])
|
|
params = CameraParams(K=K, Rt=Rt, dist_coeffs=dist, image_size=image_size)
|
|
return Camera(id=cam_id, params=params)
|
|
|
|
|
|
def _random_keypoints_3d(rng: np.random.Generator, J: int):
|
|
return jnp.asarray(rng.uniform(-1.0, 1.0, size=(J, 3)).astype(np.float32))
|
|
|
|
|
|
def _random_keypoints_2d(rng: np.random.Generator, J: int):
|
|
return jnp.asarray(rng.uniform(0.0, 1000.0, size=(J, 2)).astype(np.float32))
|
|
|
|
|
|
def _make_trackings(rng: np.random.Generator, camera: Camera, T: int, J: int):
|
|
now = datetime.now()
|
|
trackings = []
|
|
for i in range(T):
|
|
kps3d = _random_keypoints_3d(rng, J)
|
|
trk = Tracking(
|
|
id=i + 1,
|
|
keypoints=kps3d,
|
|
last_active_timestamp=now
|
|
- timedelta(milliseconds=int(rng.integers(20, 50))),
|
|
)
|
|
trackings.append(trk)
|
|
return trackings
|
|
|
|
|
|
def _make_detections(rng: np.random.Generator, camera: Camera, D: int, J: int):
|
|
now = datetime.now()
|
|
detections = []
|
|
for _ in range(D):
|
|
kps2d = _random_keypoints_2d(rng, J)
|
|
det = Detection(
|
|
keypoints=kps2d,
|
|
confidences=jnp.ones(J, dtype=jnp.float32),
|
|
camera=camera,
|
|
timestamp=now,
|
|
)
|
|
detections.append(det)
|
|
return detections
|
|
|
|
|
|
# ----------------------------------------------------------------------------
|
|
# Property-based test: per-camera vs naive slice should match
|
|
# ----------------------------------------------------------------------------
|
|
|
|
|
|
@settings(max_examples=3, deadline=None, suppress_health_check=[HealthCheck.too_slow])
|
|
@given(
|
|
T=st.integers(min_value=1, max_value=4),
|
|
D=st.integers(min_value=1, max_value=4),
|
|
J=st.integers(min_value=5, max_value=15),
|
|
seed=st.integers(min_value=0, max_value=10000),
|
|
)
|
|
def test_per_camera_matches_naive(T, D, J, seed):
|
|
rng = np.random.default_rng(seed)
|
|
|
|
cam = _make_dummy_camera("C0", rng)
|
|
|
|
trackings = _make_trackings(rng, cam, T, J)
|
|
detections = _make_detections(rng, cam, D, J)
|
|
|
|
# Parameters
|
|
W_2D = 1.0
|
|
ALPHA_2D = 1.0
|
|
LAMBDA_A = 0.1
|
|
W_3D = 1.0
|
|
ALPHA_3D = 1.0
|
|
|
|
# Compute per-camera affinity (fast)
|
|
A_fast = calculate_camera_affinity_matrix(
|
|
trackings,
|
|
detections,
|
|
w_2d=W_2D,
|
|
alpha_2d=ALPHA_2D,
|
|
w_3d=W_3D,
|
|
alpha_3d=ALPHA_3D,
|
|
lambda_a=LAMBDA_A,
|
|
)
|
|
|
|
# Compute naive multi-camera affinity and slice out this camera
|
|
from collections import OrderedDict
|
|
|
|
det_dict = OrderedDict({"C0": detections})
|
|
A_naive, _ = calculate_affinity_matrix(
|
|
trackings,
|
|
det_dict,
|
|
w_2d=W_2D,
|
|
alpha_2d=ALPHA_2D,
|
|
w_3d=W_3D,
|
|
alpha_3d=ALPHA_3D,
|
|
lambda_a=LAMBDA_A,
|
|
)
|
|
# both fast and naive implementation gives NaN
|
|
# we need to inject real-world data
|
|
|
|
# print("A_fast")
|
|
# print(A_fast)
|
|
# print("A_naive")
|
|
# print(A_naive)
|
|
|
|
# They should be close
|
|
np.testing.assert_allclose(A_fast, np.asarray(A_naive), rtol=1e-5, atol=1e-5)
|
|
|
|
|
|
@pytest.mark.parametrize("T,D,J", [(2, 3, 10), (4, 4, 15), (6, 8, 20)])
|
|
def test_benchmark_affinity_matrix(T, D, J):
|
|
"""Compare performance between naive and fast affinity matrix calculation."""
|
|
seed = 42
|
|
rng = np.random.default_rng(seed)
|
|
cam = _make_dummy_camera("C0", rng)
|
|
|
|
trackings = _make_trackings(rng, cam, T, J)
|
|
detections = _make_detections(rng, cam, D, J)
|
|
|
|
# Parameters
|
|
w_2d = 1.0
|
|
alpha_2d = 1.0
|
|
w_3d = 1.0
|
|
alpha_3d = 1.0
|
|
lambda_a = 0.1
|
|
|
|
# Setup for naive
|
|
from collections import OrderedDict
|
|
|
|
det_dict = OrderedDict({"C0": detections})
|
|
|
|
# First run to compile
|
|
A_fast = calculate_camera_affinity_matrix(
|
|
trackings,
|
|
detections,
|
|
w_2d=w_2d,
|
|
alpha_2d=alpha_2d,
|
|
w_3d=w_3d,
|
|
alpha_3d=alpha_3d,
|
|
lambda_a=lambda_a,
|
|
)
|
|
A_naive, _ = calculate_affinity_matrix(
|
|
trackings,
|
|
det_dict,
|
|
w_2d=w_2d,
|
|
alpha_2d=alpha_2d,
|
|
w_3d=w_3d,
|
|
alpha_3d=alpha_3d,
|
|
lambda_a=lambda_a,
|
|
)
|
|
|
|
# Assert they match before timing
|
|
np.testing.assert_allclose(A_fast, np.asarray(A_naive), rtol=1e-5, atol=1e-5)
|
|
|
|
# Timing
|
|
num_runs = 3
|
|
|
|
# Time the vectorized version
|
|
start = time.perf_counter()
|
|
for _ in range(num_runs):
|
|
calculate_camera_affinity_matrix(
|
|
trackings,
|
|
detections,
|
|
w_2d=w_2d,
|
|
alpha_2d=alpha_2d,
|
|
w_3d=w_3d,
|
|
alpha_3d=alpha_3d,
|
|
lambda_a=lambda_a,
|
|
)
|
|
end = time.perf_counter()
|
|
vectorized_time = (end - start) / num_runs
|
|
|
|
# Time the naive version
|
|
start = time.perf_counter()
|
|
for _ in range(num_runs):
|
|
calculate_affinity_matrix(
|
|
trackings,
|
|
det_dict,
|
|
w_2d=w_2d,
|
|
alpha_2d=alpha_2d,
|
|
w_3d=w_3d,
|
|
alpha_3d=alpha_3d,
|
|
lambda_a=lambda_a,
|
|
)
|
|
end = time.perf_counter()
|
|
naive_time = (end - start) / num_runs
|
|
|
|
speedup = naive_time / vectorized_time
|
|
print(f"\nBenchmark T={T}, D={D}, J={J}:")
|
|
print(f" Vectorized: {vectorized_time*1000:.2f}ms per run")
|
|
print(f" Naive: {naive_time*1000:.2f}ms per run")
|
|
print(f" Speedup: {speedup:.2f}x")
|
|
|
|
# Sanity check - vectorized should be faster!
|
|
assert speedup > 1.0, "Vectorized implementation should be faster"
|
|
|
|
|
|
if __name__ == "__main__" and pytest is not None:
|
|
# python -m pytest -xvs -k test_benchmark
|
|
# pytest.main([__file__])
|
|
pytest.main(["-xvs", __file__, "-k", "test_benchmark"])
|