forked from HQU-gxy/CVTH3PE
This commit is contained in:
lmd
2025-07-11 15:52:46 +08:00
parent b3da8ef7b2
commit dc015b6c65
2 changed files with 157 additions and 147 deletions

View File

@@ -2,3 +2,4 @@
```bash
jupytext --to py:percent <script>.ipynb
```
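For the reverse direction (a usage note; jupytext also accepts `notebook`/`ipynb` as target formats):
```bash
jupytext --to notebook <script>.py
```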

View File

@@ -72,19 +72,19 @@ from app.visualize.whole_body import visualize_whole_body
NDArray: TypeAlias = np.ndarray
# %%
DATASET_PATH = Path("samples") / "04_02"
AK_CAMERA_DATASET: ak.Array = ak.from_parquet(DATASET_PATH / "camera_params.parquet") # type: ignore
DELTA_T_MIN = timedelta(milliseconds=1)
display(AK_CAMERA_DATASET)
DATASET_PATH = Path("samples") / "04_02" #定义数据集路径
AK_CAMERA_DATASET: ak.Array = ak.from_parquet(DATASET_PATH / "camera_params.parquet") # 从parquet文件中读取相机参数数据集
DELTA_T_MIN = timedelta(milliseconds=1) #定义最小时间间隔为1毫秒
display(AK_CAMERA_DATASET) #显示相机参数
# %%
class Resolution(TypedDict):
class Resolution(TypedDict): # the Resolution type describes an image resolution
width: int
height: int
class Intrinsic(TypedDict):
class Intrinsic(TypedDict): # the Intrinsic type holds the camera intrinsic parameters
camera_matrix: Num[Array, "3 3"]
"""
K
@@ -95,12 +95,12 @@ class Intrinsic(TypedDict):
"""
class Extrinsic(TypedDict):
class Extrinsic(TypedDict): # camera extrinsics
rvec: Num[NDArray, "3"]
tvec: Num[NDArray, "3"]
class ExternalCameraParams(TypedDict):
class ExternalCameraParams(TypedDict): # external camera parameters
name: str
port: int
intrinsic: Intrinsic
@@ -109,93 +109,93 @@ class ExternalCameraParams(TypedDict):
# %%
def read_dataset_by_port(port: int) -> ak.Array:
P = DATASET_PATH / f"{port}.parquet"
return ak.from_parquet(P)
def read_dataset_by_port(port: int) -> ak.Array: # read the dataset for a given camera port
P = DATASET_PATH / f"{port}.parquet" # build the dataset file path
return ak.from_parquet(P) # read the dataset from the Parquet file
KEYPOINT_DATASET = {
KEYPOINT_DATASET = { # keypoint dataset dict: keys are port numbers, values are the corresponding datasets
int(p): read_dataset_by_port(p) for p in ak.to_numpy(AK_CAMERA_DATASET["port"])
}
# %%
class KeypointDataset(TypedDict):
frame_index: int
boxes: Num[NDArray, "N 4"]
kps: Num[NDArray, "N J 2"]
kps_scores: Num[NDArray, "N J"]
class KeypointDataset(TypedDict): # per-frame keypoint detection record
frame_index: int # frame index
boxes: Num[NDArray, "N 4"] # bounding boxes: N boxes with 4 coordinates each
kps: Num[NDArray, "N J 2"] # keypoints: N subjects, J keypoints each, 2D coordinates
kps_scores: Num[NDArray, "N J"] # keypoint scores: N subjects, J scores each
@jaxtyped(typechecker=beartype)
def to_transformation_matrix(
rvec: Num[NDArray, "3"], tvec: Num[NDArray, "3"]
@jaxtyped(typechecker=beartype) # check at runtime that arguments and return value match the annotated shapes
def to_transformation_matrix( # combine a rotation vector and a translation vector into a 4x4 transformation matrix
rvec: Num[NDArray, "3"], tvec: Num[NDArray, "3"] # input parameters
) -> Num[NDArray, "4 4"]:
res = np.eye(4)
res[:3, :3] = R.from_rotvec(rvec).as_matrix()
res[:3, 3] = tvec
res = np.eye(4) # start from a 4x4 identity matrix
res[:3, :3] = R.from_rotvec(rvec).as_matrix() # rotation vector -> rotation matrix, placed in the top-left 3x3 block
res[:3, 3] = tvec # translation vector in the first three entries of the last column
return res
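# A minimal sanity sketch (hypothetical values): a 90-degree rotation about z
# plus a translation should land in the expected blocks of the 4x4 matrix.
_rvec = np.array([0.0, 0.0, np.pi / 2])
_tvec = np.array([1.0, 2.0, 3.0])
_T = to_transformation_matrix(_rvec, _tvec)
assert np.allclose(_T[:3, 3], _tvec) and np.allclose(_T[3], [0.0, 0.0, 0.0, 1.0])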
@jaxtyped(typechecker=beartype)
def undistort_points(
points: Num[NDArray, "M 2"],
camera_matrix: Num[NDArray, "3 3"],
dist_coeffs: Num[NDArray, "N"],
) -> Num[NDArray, "M 2"]:
K = camera_matrix
def undistort_points( # undistort image points
points: Num[NDArray, "M 2"], # M points with 2D coordinates (x, y)
camera_matrix: Num[NDArray, "3 3"], # 3x3 camera intrinsic matrix
dist_coeffs: Num[NDArray, "N"], # N distortion coefficients
) -> Num[NDArray, "M 2"]: # returns the M undistorted point coordinates
K = camera_matrix # short aliases for the parameters
dist = dist_coeffs
res = undistortPoints(points, K, dist, P=K) # type: ignore
return res.reshape(-1, 2)
res = undistortPoints(points, K, dist, P=K) # type: ignore # OpenCV routine that removes lens distortion from image points
return res.reshape(-1, 2) # reshape the output to an M x 2 array
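# A quick sanity check (hypothetical values; assumes this OpenCV build accepts
# M x 2 input, as the wrapper above does): zero distortion must be a no-op.
_K = np.array([[1000.0, 0.0, 640.0], [0.0, 1000.0, 360.0], [0.0, 0.0, 1.0]])
_pts = np.array([[100.0, 200.0], [640.0, 360.0]])
assert np.allclose(undistort_points(_pts, _K, np.zeros(5)), _pts, atol=1e-3)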
def from_camera_params(camera: ExternalCameraParams) -> Camera:
def from_camera_params(camera: ExternalCameraParams) -> Camera: # convert external camera parameters into an internal Camera object
rt = jnp.array(
to_transformation_matrix(
ak.to_numpy(camera["extrinsic"]["rvec"]),
to_transformation_matrix( # combine the rotation and translation vectors into a homogeneous transformation matrix
ak.to_numpy(camera["extrinsic"]["rvec"]), # convert the data to NumPy arrays
ak.to_numpy(camera["extrinsic"]["tvec"]),
)
)
K = jnp.array(camera["intrinsic"]["camera_matrix"]).reshape(3, 3)
dist_coeffs = jnp.array(camera["intrinsic"]["distortion_coefficients"])
image_size = jnp.array(
K = jnp.array(camera["intrinsic"]["camera_matrix"]).reshape(3, 3) #从外部参数中提取相机内参矩阵,重塑为 3×3 矩阵
dist_coeffs = jnp.array(camera["intrinsic"]["distortion_coefficients"]) #提取相机的畸变系数
image_size = jnp.array( #提取图像的宽度和高度,存储为 JAX 数组
(camera["resolution"]["width"], camera["resolution"]["height"])
)
return Camera(
id=camera["name"],
params=CameraParams(
K=K,
Rt=rt,
dist_coeffs=dist_coeffs,
image_size=image_size,
params=CameraParams( # bundle all camera parameters
K=K, # camera intrinsic matrix
Rt=rt, # camera extrinsic matrix (homogeneous transformation)
dist_coeffs=dist_coeffs, # distortion coefficients
image_size=image_size, # image resolution
),
)
def preprocess_keypoint_dataset(
dataset: Sequence[KeypointDataset],
camera: Camera,
fps: float,
start_timestamp: datetime,
) -> Generator[Detection, None, None]:
frame_interval_s = 1 / fps
def preprocess_keypoint_dataset( # turn a sequence of KeypointDataset records into a stream of Detection objects
dataset: Sequence[KeypointDataset], # input: sequence of keypoint records
camera: Camera, # camera parameters
fps: float, # frame rate (frames per second)
start_timestamp: datetime, # starting timestamp
) -> Generator[Detection, None, None]: # output: generator of Detection objects
frame_interval_s = 1 / fps # duration of one frame in seconds
for el in dataset:
frame_index = el["frame_index"]
frame_index = el["frame_index"] # 获取当前帧索引
timestamp = start_timestamp + timedelta(seconds=frame_index * frame_interval_s)
for kp, kp_score in zip(el["kps"], el["kps_scores"]):
yield Detection(
keypoints=jnp.array(kp),
confidences=jnp.array(kp_score),
camera=camera,
timestamp=timestamp,
keypoints=jnp.array(kp), # keypoint coordinates
confidences=jnp.array(kp_score), # keypoint confidences
camera=camera, # camera parameters
timestamp=timestamp, # timestamp
)
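# Worked example: at fps = 24, frame_index = 48 yields
# start_timestamp + timedelta(seconds=48 / 24), i.e. two seconds in.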
# %%
DetectionGenerator: TypeAlias = Generator[Detection, None, None]
DetectionGenerator: TypeAlias = Generator[Detection, None, None] # type alias for a detection stream
def sync_batch_gen(gens: Sequence[DetectionGenerator], diff: timedelta):
# synchronize several asynchronous detection streams by timestamp, yielding temporally aligned batches
def sync_batch_gen(gens: Sequence[DetectionGenerator], diff: timedelta): # gens: list of detection generators; diff: maximum timestamp difference for detections to share a batch
"""
given a list of detection generators, return a generator that yields a batch of detections
@@ -203,13 +203,13 @@ def sync_batch_gen(gens: Sequence[DetectionGenerator], diff: timedelta):
gens: list of detection generators
diff: maximum timestamp difference between detections to consider them part of the same batch
"""
N = len(gens)
last_batch_timestamp: Optional[datetime] = None
next_batch_timestamp: Optional[datetime] = None
current_batch: list[Detection] = []
next_batch: list[Detection] = []
paused: list[bool] = [False] * N
finished: list[bool] = [False] * N
N = len(gens) # number of generators
last_batch_timestamp: Optional[datetime] = None # timestamp of the current batch
next_batch_timestamp: Optional[datetime] = None # timestamp of the next batch
current_batch: list[Detection] = [] # detections of the current batch
next_batch: list[Detection] = [] # detections of the next batch
paused: list[bool] = [False] * N # whether each generator is paused
finished: list[bool] = [False] * N # whether each generator is exhausted
def reset_paused():
"""
@@ -223,56 +223,56 @@ def sync_batch_gen(gens: Sequence[DetectionGenerator], diff: timedelta):
EPS = 1e-6
# a small epsilon to avoid floating point precision issues
diff_esp = diff - timedelta(seconds=EPS)
diff_esp = diff - timedelta(seconds=EPS) # guards against floating-point precision issues that could misclassify tiny time differences
while True:
for i, gen in enumerate(gens):
try:
if finished[i] or paused[i]:
continue
val = next(gen)
if last_batch_timestamp is None:
val = next(gen) # pull the next detection
if last_batch_timestamp is None: # ... timestamp comparison and batch assignment ...
last_batch_timestamp = val.timestamp
current_batch.append(val)
current_batch.append(val) # seed the first batch
else:
if abs(val.timestamp - last_batch_timestamp) >= diff_esp:
next_batch.append(val)
next_batch.append(val) # time difference exceeds the threshold: defer to the next batch
if next_batch_timestamp is None:
next_batch_timestamp = val.timestamp
paused[i] = True
paused[i] = True # pause this generator until the batch rolls over
if all(paused):
yield current_batch
yield current_batch # all generators are paused: emit the current batch
current_batch = next_batch
next_batch = []
last_batch_timestamp = next_batch_timestamp
next_batch_timestamp = None
reset_paused()
reset_paused() # clear the paused flags
else:
current_batch.append(val)
current_batch.append(val) # within the threshold: add to the current batch
except StopIteration:
finished[i] = True
paused[i] = True
if all(finished):
if len(current_batch) > 0:
# All generators exhausted, flush remaining batch and exit
yield current_batch
yield current_batch # emit the final batch
break
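# A behavioral sketch (assumption: sync_batch_gen only reads `.timestamp`, so a
# SimpleNamespace can stand in for Detection here). Two streams at 24 fps should
# come out as batches of two detections with matching timestamps.
from types import SimpleNamespace
_t0 = datetime(2024, 4, 2, 12, 0, 0)
_gen_a = (SimpleNamespace(timestamp=_t0 + timedelta(seconds=i / 24)) for i in range(3))
_gen_b = (SimpleNamespace(timestamp=_t0 + timedelta(seconds=i / 24)) for i in range(3))
for _batch in sync_batch_gen([_gen_a, _gen_b], timedelta(seconds=1 / 24)):
    print([str(d.timestamp) for d in _batch])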
# %%
@overload
def to_projection_matrix(
def to_projection_matrix( # combine a 4x4 transformation matrix and a 3x3 camera intrinsic matrix into a 3x4 projection matrix
transformation_matrix: Num[NDArray, "4 4"], camera_matrix: Num[NDArray, "3 3"]
) -> Num[NDArray, "3 4"]: ...
@overload
def to_projection_matrix(
def to_projection_matrix( # combine a 4x4 transformation matrix and a 3x3 camera intrinsic matrix into a 3x4 projection matrix
transformation_matrix: Num[Array, "4 4"], camera_matrix: Num[Array, "3 3"]
) -> Num[Array, "3 4"]: ...
@jaxtyped(typechecker=beartype)
def to_projection_matrix(
def to_projection_matrix( # compute the projection matrix; jax.jit is applied below for performance
transformation_matrix: Num[Any, "4 4"],
camera_matrix: Num[Any, "3 3"],
) -> Num[Any, "3 4"]:
@@ -283,28 +283,29 @@ to_projection_matrix_jit = jax.jit(to_projection_matrix)
@jaxtyped(typechecker=beartype)
def dlt(
H1: Num[NDArray, "3 4"],
H2: Num[NDArray, "3 4"],
p1: Num[NDArray, "2"],
p2: Num[NDArray, "2"],
) -> Num[NDArray, "3"]:
def dlt( # DLT algorithm
H1: Num[NDArray, "3 4"], # projection matrix of the first camera (3x4)
H2: Num[NDArray, "3 4"], # projection matrix of the second camera (3x4)
p1: Num[NDArray, "2"], # projection of the 3D point onto the first image (u1, v1)
p2: Num[NDArray, "2"], # projection of the 3D point onto the second image (u2, v2)
) -> Num[NDArray, "3"]: # output: 3D point coordinates (X, Y, Z)
"""
Direct Linear Transformation
"""
A = [
p1[1] * H1[2, :] - H1[1, :],
H1[0, :] - p1[0] * H1[2, :],
p2[1] * H2[2, :] - H2[1, :],
H2[0, :] - p2[0] * H2[2, :],
A = [ # build matrix A
p1[1] * H1[2, :] - H1[1, :], # row 1: v1*H1[2,:] - H1[1,:]
H1[0, :] - p1[0] * H1[2, :], # row 2: H1[0,:] - u1*H1[2,:]
p2[1] * H2[2, :] - H2[1, :], # row 3: v2*H2[2,:] - H2[1,:]
H2[0, :] - p2[0] * H2[2, :], # row 4: H2[0,:] - u2*H2[2,:]
]
A = np.array(A).reshape((4, 4))
A = np.array(A).reshape((4, 4)) # stack into a 4x4 matrix
B = A.transpose() @ A
# solve the overdetermined system
B = A.transpose() @ A # compute A^T A (4x4)
from scipy import linalg
U, s, Vh = linalg.svd(B, full_matrices=False)
return Vh[3, 0:3] / Vh[3, 3]
U, s, Vh = linalg.svd(B, full_matrices=False) # SVD decomposition
return Vh[3, 0:3] / Vh[3, 3] # take the null-space vector and dehomogenize
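# A tiny sanity check (hypothetical cameras): identity intrinsics, camera 1 at
# the origin and camera 2 shifted one unit along x; the world point (0, 0, 5)
# projects to (0, 0) and (-0.2, 0) respectively and should be recovered.
_H1 = np.hstack([np.eye(3), np.zeros((3, 1))])
_H2 = np.hstack([np.eye(3), np.array([[-1.0], [0.0], [0.0]])])
_X = dlt(_H1, _H2, np.array([0.0, 0.0]), np.array([-0.2, 0.0]))
assert np.allclose(_X, [0.0, 0.0, 5.0], atol=1e-6)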
@overload
@@ -316,7 +317,7 @@ def homogeneous_to_euclidean(points: Num[Array, "N 4"]) -> Num[Array, "N 3"]: ..
@jaxtyped(typechecker=beartype)
def homogeneous_to_euclidean(
def homogeneous_to_euclidean( # convert homogeneous coordinates to Euclidean coordinates
points: Num[Any, "N 4"],
) -> Num[Any, "N 3"]:
"""
@@ -331,25 +332,31 @@ def homogeneous_to_euclidean(
return points[..., :-1] / points[..., -1:]
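# e.g. homogeneous_to_euclidean(jnp.array([[2.0, 4.0, 6.0, 2.0]])) -> [[1.0, 2.0, 3.0]]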
# %%
FPS = 24
# %% # build keypoint detection generators for the three cameras and synchronize them into time-aligned batches with sync_batch_gen
FPS = 24 # frame rate: 24 frames per second
# detection generators for the three cameras (ports 5600, 5601 and 5602)
image_gen_5600 = preprocess_keypoint_dataset(KEYPOINT_DATASET[5600], from_camera_params(AK_CAMERA_DATASET[AK_CAMERA_DATASET["port"] == 5600][0]), FPS, datetime(2024, 4, 2, 12, 0, 0)) # type: ignore
image_gen_5601 = preprocess_keypoint_dataset(KEYPOINT_DATASET[5601], from_camera_params(AK_CAMERA_DATASET[AK_CAMERA_DATASET["port"] == 5601][0]), FPS, datetime(2024, 4, 2, 12, 0, 0)) # type: ignore
image_gen_5602 = preprocess_keypoint_dataset(KEYPOINT_DATASET[5602], from_camera_params(AK_CAMERA_DATASET[AK_CAMERA_DATASET["port"] == 5602][0]), FPS, datetime(2024, 4, 2, 12, 0, 0)) # type: ignore
display(1 / FPS)
display(1 / FPS) # per-frame interval, about 0.0417 s
# synchronize the three generators with a window of 1/FPS seconds, i.e. detections within a batch differ by less than one frame
sync_gen = sync_batch_gen(
[image_gen_5600, image_gen_5601, image_gen_5602], timedelta(seconds=1 / FPS)
)
# %%
# %% compute the affinity between detections from different cameras based on the epipolar constraint, returning the sorted detections and the affinity matrix
# input: next(sync_gen) - one batch from the synchronized generator (detections from several cameras at nearby times)
#        alpha_2d=2000 - weight of the 2D distance term, balancing the epipolar constraint against other cues (appearance, motion)
# output: sorted_detections - the detections in sorted order
#         affinity_matrix - matrix[i][j] measures how strongly detections i and j are associated (larger means more likely the same target)
sorted_detections, affinity_matrix = calculate_affinity_matrix_by_epipolar_constraint(
next(sync_gen), alpha_2d=2000
)
display(sorted_detections)
# %%
display(
# %% # visualize the key quantities of multi-camera tracking: detection timestamps and the affinity matrix
display( # show the sorted detections as a list of dicts with timestamp and camera ID
list(
map(
lambda x: {"timestamp": str(x.timestamp), "camera": x.camera.id},
@@ -357,12 +364,12 @@ display(
)
)
)
with jnp.printoptions(precision=3, suppress=True):
with jnp.printoptions(precision=3, suppress=True): # print the affinity matrix with 3-digit precision and scientific notation suppressed
display(affinity_matrix)
# %%
def clusters_to_detections(
# %% # cluster the affinity matrix so that detections likely to belong to the same target are grouped together
def clusters_to_detections( # map index clusters back to Detection objects
clusters: Sequence[Sequence[int]], sorted_detections: Sequence[Detection]
) -> list[list[Detection]]:
"""
@@ -379,17 +386,17 @@ def clusters_to_detections(
return [[sorted_detections[i] for i in cluster] for cluster in clusters]
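# e.g. clusters = [[0, 2], [1]] groups sorted_detections[0] and sorted_detections[2]
# as one target and leaves sorted_detections[1] as its own target.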
solver = GLPKSolver()
aff_np = np.asarray(affinity_matrix).astype(np.float64)
clusters, sol_matrix = solver.solve(aff_np)
solver = GLPKSolver() # initialize the GLPK linear-programming solver
aff_np = np.asarray(affinity_matrix).astype(np.float64) # convert the affinity matrix to a NumPy array
clusters, sol_matrix = solver.solve(aff_np) # solve the clustering problem
display(clusters)
display(sol_matrix)
# %%
# %% # two helpers for flattening nested data structures
T = TypeVar("T")
def flatten_values(
def flatten_values( # flatten all sequence values of a mapping into a single flat list
d: Mapping[Any, Sequence[T]],
) -> list[T]:
"""
@@ -398,7 +405,7 @@ def flatten_values(
return [v for vs in d.values() for v in vs]
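# e.g. flatten_values({"a": [1, 2], "b": [3]}) == [1, 2, 3]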
def flatten_values_len(
def flatten_values_len( # count the total number of elements across all sequence values of a mapping
d: Mapping[Any, Sequence[T]],
) -> int:
"""
@@ -408,19 +415,22 @@ def flatten_values_len(
return val
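# e.g. flatten_values_len({"a": [1, 2], "b": [3]}) == 3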
# %%
# %% # project the keypoints of one target as seen by different cameras onto a single image, to visually check the multi-camera association
WIDTH = 2560
HEIGHT = 1440
# convert the cluster index lists into lists of Detection objects
clusters_detections = clusters_to_detections(clusters, sorted_detections)
# create a blank (black) image
im = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
# draw every detection of the first cluster (the same target seen by different cameras)
for el in clusters_detections[0]:
im = visualize_whole_body(np.asarray(el.keypoints), im)
# show the resulting image
p = plt.imshow(im)
display(p)
# %%
# %% # same as the previous cell for the second cluster, i.e. the second person in the scene
im_prime = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
for el in clusters_detections[1]:
im_prime = visualize_whole_body(np.asarray(el.keypoints), im_prime)
@@ -429,9 +439,9 @@ p_prime = plt.imshow(im_prime)
display(p_prime)
# %%
# %% # triangulation of 3D points from multi-view image points
@jaxtyped(typechecker=beartype)
def triangulate_one_point_from_multiple_views_linear(
def triangulate_one_point_from_multiple_views_linear( # triangulate a single point
proj_matrices: Float[Array, "N 3 4"],
points: Num[Array, "N 2"],
confidences: Optional[Float[Array, "N"]] = None,
@@ -479,7 +489,7 @@ def triangulate_one_point_from_multiple_views_linear(
@jaxtyped(typechecker=beartype)
def triangulate_points_from_multiple_views_linear(
def triangulate_points_from_multiple_views_linear( # batched triangulation
proj_matrices: Float[Array, "N 3 4"],
points: Num[Array, "N P 2"],
confidences: Optional[Float[Array, "N P"]] = None,
@@ -511,9 +521,9 @@ def triangulate_points_from_multiple_views_linear(
return vmap_triangulate(proj_matrices, points, conf)
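# A hedged usage sketch reusing the two hypothetical cameras from the DLT check
# above; one point seen from two views should triangulate back to (0, 0, 5).
_proj = jnp.stack([
    jnp.asarray(np.hstack([np.eye(3), np.zeros((3, 1))])),
    jnp.asarray(np.hstack([np.eye(3), np.array([[-1.0], [0.0], [0.0]])])),
])  # (N=2 views, 3, 4)
_pts2d = jnp.array([[[0.0, 0.0]], [[-0.2, 0.0]]])  # (N=2 views, P=1 point, 2)
display(triangulate_points_from_multiple_views_linear(_proj, _pts2d))  # expected ~ [[0., 0., 5.]]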
# %%
# %% # time-weighted variants of the linear multi-view triangulation
@jaxtyped(typechecker=beartype)
def triangulate_one_point_from_multiple_views_linear_time_weighted(
def triangulate_one_point_from_multiple_views_linear_time_weighted( # single-point triangulation
proj_matrices: Float[Array, "N 3 4"],
points: Num[Array, "N 2"],
delta_t: Num[Array, "N"],
@@ -589,7 +599,7 @@ def triangulate_one_point_from_multiple_views_linear_time_weighted(
@jaxtyped(typechecker=beartype)
def triangulate_points_from_multiple_views_linear_time_weighted(
def triangulate_points_from_multiple_views_linear_time_weighted( # batched triangulation
proj_matrices: Float[Array, "N 3 4"],
points: Num[Array, "N P 2"],
delta_t: Num[Array, "N"],
@@ -647,9 +657,7 @@ def triangulate_points_from_multiple_views_linear_time_weighted(
)
# %%
# %% # triangulate a 3D point from one cluster's detections and return the cluster's latest timestamp
@jaxtyped(typechecker=beartype)
def triangle_from_cluster(
cluster: Sequence[Detection],
@@ -666,8 +674,8 @@ def triangle_from_cluster(
)
# %%
def group_by_cluster_by_camera(
# %% # core logic of the multi-target tracker: create and manage the global tracking state from clustered detections
def group_by_cluster_by_camera( # group a cluster's detections by camera
cluster: Sequence[Detection],
) -> PMap[CameraID, Detection]:
"""
@@ -682,7 +690,7 @@ def group_by_cluster_by_camera(
return pmap(r)
class GlobalTrackingState:
class GlobalTrackingState: # global tracking state
_last_id: int
_trackings: dict[int, Tracking]
@@ -699,7 +707,7 @@ class GlobalTrackingState:
def trackings(self) -> dict[int, Tracking]:
return shallow_copy(self._trackings)
def add_tracking(self, cluster: Sequence[Detection]) -> Tracking:
def add_tracking(self, cluster: Sequence[Detection]) -> Tracking: # create a new tracking record from a cluster
if len(cluster) < 2:
raise ValueError(
"cluster must contain at least 2 detections to form a tracking"
@@ -726,14 +734,14 @@ for cluster in clusters_detections:
global_tracking_state.add_tracking(cluster)
display(global_tracking_state)
# %%
next_group = next(sync_gen)
display(next_group)
# %% # pull the next batch of time-aligned detections from sync_gen and display it
next_group = next(sync_gen) # next batch from the synchronized generator
display(next_group) # show the batch in the Jupyter environment
# %%
# %% # core affinity computations for associating trackings with detections across cameras
@jaxtyped(typechecker=beartype)
def calculate_distance_2d(
def calculate_distance_2d( # normalized 2D distance
left: Num[Array, "J 2"],
right: Num[Array, "J 2"],
image_size: tuple[int, int] = (1, 1),
@@ -762,7 +770,7 @@ def calculate_distance_2d(
@jaxtyped(typechecker=beartype)
def calculate_affinity_2d(
def calculate_affinity_2d( # 2D affinity score
distance_2d: Float[Array, "J"],
delta_t: timedelta,
w_2d: float,
@@ -795,7 +803,7 @@ def calculate_affinity_2d(
@jaxtyped(typechecker=beartype)
def perpendicular_distance_point_to_line_two_points(
def perpendicular_distance_point_to_line_two_points( # perpendicular distance from a point to a ray
point: Num[Array, "3"], line: tuple[Num[Array, "3"], Num[Array, "3"]]
) -> Float[Array, ""]:
"""
@@ -818,6 +826,7 @@ def perpendicular_distance_point_to_line_two_points(
@jaxtyped(typechecker=beartype)
# ray-distance computation for multi-camera 3D reconstruction: measures how well 2D detection points match a 3D tracking point
def perpendicular_distance_camera_2d_points_to_tracking_raycasting(
detection: Detection,
tracking: Tracking,
@@ -861,7 +870,7 @@ def perpendicular_distance_camera_2d_points_to_tracking_raycasting(
@jaxtyped(typechecker=beartype)
def calculate_affinity_3d(
def calculate_affinity_3d( # 3D affinity score
distances: Float[Array, "J"],
delta_t: timedelta,
w_3d: float,
@@ -892,7 +901,7 @@ def calculate_affinity_3d(
@beartype
def calculate_tracking_detection_affinity(
def calculate_tracking_detection_affinity( # combined affinity pipeline
tracking: Tracking,
detection: Detection,
w_2d: float,
@@ -954,9 +963,9 @@ def calculate_tracking_detection_affinity(
return jnp.sum(total_affinity).item()
# %%
# %% # efficient affinity-matrix computation linking Tracking trajectories with new Detection results
@beartype
def calculate_camera_affinity_matrix_jax(
def calculate_camera_affinity_matrix_jax( # per-camera affinity matrix
trackings: Sequence[Tracking],
camera_detections: Sequence[Detection],
w_2d: float,
@@ -1118,7 +1127,7 @@ def calculate_camera_affinity_matrix_jax(
@beartype
def calculate_affinity_matrix(
def calculate_affinity_matrix( # multi-camera affinity matrix
trackings: Sequence[Tracking],
detections: Sequence[Detection] | Mapping[CameraID, list[Detection]],
w_2d: float,
@@ -1170,7 +1179,7 @@ def calculate_affinity_matrix(
return res
# %%
# %% # the cross-view association step
# let's do cross-view association
W_2D = 1.0
ALPHA_2D = 1.0
@@ -1194,8 +1203,8 @@ affinities = calculate_affinity_matrix(
display(affinities)
# %%
def affinity_result_by_tracking(
# %% # two functions implementing association-result aggregation and tracking updates
def affinity_result_by_tracking( # aggregate association results per tracking
results: Iterable[AffinityResult],
min_affinity: float = 0.0,
) -> dict[TrackingID, list[Detection]]:
@@ -1217,7 +1226,7 @@ def affinity_result_by_tracking(
return res
def update_tracking(
def update_tracking( # apply matched detections to a tracking
tracking: Tracking,
detections: Sequence[Detection],
max_delta_t: timedelta = timedelta(milliseconds=100),
@@ -1268,9 +1277,9 @@ def update_tracking(
tracking.state = new_state
# %%
affinity_results_by_tracking = affinity_result_by_tracking(affinities.values())
for tracking_id, detections in affinity_results_by_tracking.items():
# %% # the tracking-update step of the multi-target pipeline
affinity_results_by_tracking = affinity_result_by_tracking(affinities.values()) # 1. aggregate the matched detections of every camera per tracking ID
for tracking_id, detections in affinity_results_by_tracking.items(): # 2. update each tracking with its matched detections
update_tracking(global_tracking_state.trackings[tracking_id], detections)
# %%