From ebcd38eb52352c583f8fec3ed279ad2073352228 Mon Sep 17 00:00:00 2001
From: crosstyan
Date: Mon, 28 Apr 2025 16:11:57 +0800
Subject: [PATCH] feat: Add camera affinity calculations and iterative processing in playground.py

- Introduced `calculate_camera_affinity_matrix` to compute affinity between
  trackings and detections for individual cameras, enhancing modularity.
- Added `process_detections_iteratively` to handle detections camera by
  camera, improving efficiency and scalability.
- Enhanced type hints and documentation for new functions, clarifying
  parameters and return values.
- Refactored existing affinity calculation logic to integrate new
  functionalities, ensuring better organization and readability.
---
 playground.py | 186 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 184 insertions(+), 2 deletions(-)

diff --git a/playground.py b/playground.py
index e8f17f3..eebcf9d 100644
--- a/playground.py
+++ b/playground.py
@@ -12,6 +12,8 @@
 # name: python3
 # ---
 
+from collections import OrderedDict
+
 # %%
 from copy import copy as shallow_copy
 from copy import deepcopy
@@ -22,10 +24,12 @@ from pathlib import Path
 from typing import (
     Any,
     Generator,
+    Mapping,
     Optional,
     Sequence,
     TypeAlias,
     TypedDict,
+    TypeVar,
     cast,
     overload,
 )
@@ -41,8 +45,8 @@ from IPython.display import display
 from jaxtyping import Array, Float, Num, jaxtyped
 from matplotlib import pyplot as plt
 from numpy.typing import ArrayLike
+from scipy.optimize import linear_sum_assignment
 from scipy.spatial.transform import Rotation as R
-from collections import OrderedDict
 
 from app.camera import (
     Camera,
@@ -787,6 +791,7 @@ def calculate_tracking_detection_affinity(
     return jnp.sum(total_affinity).item()
 
 
+# %%
 @beartype
 def calculate_affinity_matrix(
     trackings: Sequence[Tracking],
@@ -847,7 +852,7 @@
 
     for i, tracking in enumerate(trackings):
         j = 0
-        for c, camera_detections in detection_by_camera.items():
+        for _, camera_detections in detection_by_camera.items():
             for det in camera_detections:
                 affinity_value = calculate_tracking_detection_affinity(
                     tracking,
@@ -864,6 +869,155 @@
     return affinity, detection_by_camera
 
 
+@beartype
+def calculate_camera_affinity_matrix(
+    trackings: Sequence[Tracking],
+    camera_detections: Sequence[Detection],
+    w_2d: float,
+    alpha_2d: float,
+    w_3d: float,
+    alpha_3d: float,
+    lambda_a: float,
+) -> Float[Array, "T D"]:
+    """
+    Calculate an affinity matrix between trackings and detections from a single camera.
+
+    This follows the iterative camera-by-camera approach from the paper
+    "Cross-View Tracking for Multi-Human 3D Pose Estimation at over 100 FPS".
+    Instead of creating one large matrix for all cameras, this creates
+    a separate matrix for each camera, which can be processed independently.
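+
+    For example (illustrative only; `trackings` and `camera_detections` here
+    are placeholder names, not objects defined in this file), scoring 2
+    trackings against 3 detections from one camera yields a (2, 3) matrix:
+
+    ```
+    A = calculate_camera_affinity_matrix(
+        trackings[:2],
+        camera_detections[:3],
+        w_2d=1.0, alpha_2d=1.0, w_3d=1.0, alpha_3d=1.0, lambda_a=0.1,
+    )
+    assert A.shape == (2, 3)
+    ```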
+
+    Args:
+        trackings: Sequence of tracking objects
+        camera_detections: Sequence of detection objects, all from the same camera
+        w_2d: Weight for 2D affinity
+        alpha_2d: Normalization factor for 2D distance
+        w_3d: Weight for 3D affinity
+        alpha_3d: Normalization factor for 3D distance
+        lambda_a: Decay rate for time difference
+
+    Returns:
+        Affinity matrix of shape (T, D), where T is the number of trackings
+        (rows) and D is the number of detections from this specific camera
+        (columns).
+
+    Matrix Layout:
+        The matrix is organized as follows:
+
+        ```
+                 | Detections from Camera c |
+                 |  d1    d2    d3   ...    |
+        ---------+--------------------------+
+        Track 1  |  a11   a12   a13  ...    |
+        Track 2  |  a21   a22   a23  ...    |
+        ...      |  ...   ...   ...  ...    |
+        Track t  |  at1   at2   at3  ...    |
+        ```
+
+        Each cell aij represents the affinity between tracking i and detection j,
+        computed using both 2D and 3D geometric correspondences.
+    """
+
+    def verify_all_detections_from_same_camera(detections: Sequence[Detection]) -> bool:
+        if not detections:
+            return True
+        camera_id = next(iter(detections)).camera.id
+        return all(map(lambda d: d.camera.id == camera_id, detections))
+
+    if not verify_all_detections_from_same_camera(camera_detections):
+        raise ValueError("All detections must be from the same camera")
+
+    affinity = jnp.zeros((len(trackings), len(camera_detections)))
+
+    for i, tracking in enumerate(trackings):
+        for j, det in enumerate(camera_detections):
+            affinity_value = calculate_tracking_detection_affinity(
+                tracking,
+                det,
+                w_2d=w_2d,
+                alpha_2d=alpha_2d,
+                w_3d=w_3d,
+                alpha_3d=alpha_3d,
+                lambda_a=lambda_a,
+            )
+            affinity = affinity.at[i, j].set(affinity_value)
+
+    return affinity
+
+
+@beartype
+def process_detections_iteratively(
+    trackings: Sequence[Tracking],
+    detections: Sequence[Detection],
+    w_2d: float = 1.0,
+    alpha_2d: float = 1.0,
+    w_3d: float = 1.0,
+    alpha_3d: float = 1.0,
+    lambda_a: float = 0.1,
+) -> list[tuple[int, Detection]]:
+    """
+    Process detections iteratively camera by camera, matching them to trackings.
+
+    This implements the paper's approach where each camera is processed
+    independently and the affinity matrix is calculated for one camera at a
+    time. This approach has several advantages:
+    1. Computational cost scales linearly with the number of cameras
+    2. Can handle non-synchronized camera frames
+    3. Avoids building one large (T x total detections) matrix, which keeps
+       each assignment problem small for large-scale camera systems
+
+    Args:
+        trackings: Sequence of tracking objects
+        detections: Sequence of detection objects
+        w_2d: Weight for 2D affinity
+        alpha_2d: Normalization factor for 2D distance
+        w_3d: Weight for 3D affinity
+        alpha_3d: Normalization factor for 3D distance
+        lambda_a: Decay rate for time difference
+
+    Returns:
+        List of (tracking_index, detection) pairs representing matches. Since
+        each camera is assigned independently, a tracking index may appear in
+        several pairs, at most once per camera.
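+
+    Example (a sketch; assumes `trackings` and `detections` were built in
+    earlier cells of this notebook):
+
+    ```
+    matches = process_detections_iteratively(trackings, detections)
+    for t_idx, det in matches:
+        print(t_idx, det.camera.id)
+    ```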
+    """
+    # Group detections by camera
+    detection_by_camera = classify_by_camera(detections)
+
+    # Store matches between trackings and detections
+    matches: list[tuple[int, Detection]] = []
+
+    # Process each camera one by one
+    for _, camera_detections in detection_by_camera.items():
+        # Calculate affinity matrix for this camera only
+        camera_affinity = calculate_camera_affinity_matrix(
+            trackings,
+            camera_detections,
+            w_2d=w_2d,
+            alpha_2d=alpha_2d,
+            w_3d=w_3d,
+            alpha_3d=alpha_3d,
+            lambda_a=lambda_a,
+        )
+
+        # Apply the Hungarian algorithm for this camera only
+        tracking_indices, detection_indices = linear_sum_assignment(
+            camera_affinity, maximize=True
+        )
+        tracking_indices = cast(Sequence[int], tracking_indices)
+        detection_indices = cast(Sequence[int], detection_indices)
+
+        # Add matches to result
+        for t_idx, d_idx in zip(tracking_indices, detection_indices):
+            # Skip matches with zero or negative affinity
+            if camera_affinity[t_idx, d_idx] <= 0:
+                continue
+
+            matches.append((t_idx, camera_detections[d_idx]))
+
+    return matches
+
+
 # %%
 # let's do cross-view association
 W_2D = 1.0
@@ -885,3 +1039,31 @@ affinity, detection_by_camera = calculate_affinity_matrix(
     lambda_a=LAMBDA_A,
 )
 display(affinity)
+
+
+# %%
+T = TypeVar("T")
+
+
+def flatten_values(
+    d: Mapping[Any, Sequence[T]],
+) -> list[T]:
+    """
+    Flatten a dictionary of sequences into a single list of values.
+    """
+    return [v for vs in d.values() for v in vs]
+
+
+detections_flat = flatten_values(detection_by_camera)
+display(detections_flat)
+display(detection_by_camera)
+
+# %%
+# Perform the Hungarian algorithm on the global affinity matrix
+# (all cameras at once; see process_detections_iteratively for the
+# per-camera variant)
+indices_T, indices_D = linear_sum_assignment(affinity, maximize=True)
+indices_T = cast(Sequence[int], indices_T)
+indices_D = cast(Sequence[int], indices_D)
+display(indices_T)
+display(indices_D)
+
+# %%
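+# Sanity-check the new per-camera path (a sketch: `trackings` and
+# `detections` are assumed to be the sequences passed to
+# calculate_affinity_matrix above, and ALPHA_2D/W_3D/ALPHA_3D are assumed
+# to be defined alongside W_2D and LAMBDA_A; adjust names if earlier
+# cells differ).
+matches = process_detections_iteratively(
+    trackings,
+    detections,
+    w_2d=W_2D,
+    alpha_2d=ALPHA_2D,
+    w_3d=W_3D,
+    alpha_3d=ALPHA_3D,
+    lambda_a=LAMBDA_A,
+)
+# each tracking index can appear at most once per camera
+display(matches)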