forked from HQU-gxy/CVTH3PE

Single-person detection and DLT, but there is no complete tracking

2025-07-08 14:06:10 +08:00
parent 492b4fba04
commit 835367cd6d
9 changed files with 1738 additions and 169 deletions


@@ -6,6 +6,7 @@ import awkward as ak
from typing import (
Any,
Generator,
Iterable,
Optional,
Sequence,
TypeAlias,
@@ -121,7 +122,7 @@ def get_camera_detect(
for element_port in ak.to_numpy(camera_dataset["port"]):
if element_port in camera_port:
keypoint_data[int(element_port)] = ak.from_parquet(
detect_path / f"{element_port}.parquet"
detect_path / f"{element_port}_detected.parquet"
)
return keypoint_data
@@ -258,7 +259,13 @@ def sync_batch_gen(
for i, gen in enumerate(gens):
try:
if finished[i] or paused[i]:
continue
if all(finished):
if len(current_batch) > 0:
# All generators exhausted, flush remaining batch and exit
yield current_batch
return
else:
continue
val = next(gen)
if last_batch_timestamp is None:
last_batch_timestamp = val.timestamp
@@ -280,13 +287,7 @@ def sync_batch_gen(
else:
current_batch.append(val)
except StopIteration:
finished[i] = True
paused[i] = True
if all(finished):
if len(current_batch) > 0:
# All generators exhausted, flush remaining batch and exit
yield current_batch
break
return
def get_batch_detect(
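The change above moves the flush of the final partial batch from the StopIteration handler to the top of the polling loop, so the generator exits cleanly once every source is done. A minimal, self-contained sketch of this flush-on-exhaustion pattern (illustrative names, not the repo's API):

from typing import Iterator

def round_robin_batches(sources: list[Iterator[int]], batch_size: int = 4):
    # Pull values round-robin from several iterators; once every source is
    # exhausted, flush whatever is left in the current batch and exit.
    finished = [False] * len(sources)
    batch: list[int] = []
    while not all(finished):
        for i, it in enumerate(sources):
            if finished[i]:
                continue
            try:
                batch.append(next(it))
            except StopIteration:
                finished[i] = True
            if len(batch) >= batch_size:
                yield batch
                batch = []
    if batch:  # all sources exhausted: flush the remainder
        yield batch

assert list(round_robin_batches([iter([1, 2]), iter([3])], 2)) == [[1, 3], [2]]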
@@ -475,38 +476,36 @@ def triangulate_one_point_from_multiple_views_linear(
proj_matrices: Float[Array, "N 3 4"],
points: Num[Array, "N 2"],
confidences: Optional[Float[Array, "N"]] = None,
conf_threshold: float = 0.2,
) -> Float[Array, "3"]:
"""
Args:
proj_matrices: projection matrices, shape (N, 3, 4)
points: 2D point coordinates, shape (N, 2)
confidences: per-view confidence scores in the range [0.0, 1.0], shape (N,)
conf_threshold: confidence threshold; observations below it are excluded from the DLT
Returns:
point_3d: the triangulated 3D point, shape (3,)
"""
assert len(proj_matrices) == len(points)
N = len(proj_matrices)
confi: Float[Array, "N"]
if confidences is None:
confi = jnp.ones(N, dtype=np.float32)
weights = jnp.ones(N, dtype=jnp.float32)
else:
# Use square root of confidences for weighting - more balanced approach
confi = jnp.sqrt(jnp.clip(confidences, 0, 1))
# Points whose confidence is below the threshold get weight 0; the rest get sqrt(conf)
valid_mask = confidences >= conf_threshold
weights = jnp.where(valid_mask, jnp.sqrt(jnp.clip(confidences, 0, 1)), 0.0)
# Normalize the weights so that no single view dominates
sum_weights = jnp.sum(weights)
weights = jnp.where(sum_weights > 0, weights / sum_weights, weights)
# Set the confidence of points with confidence below 0.1 to 0
# valid_mask = confidences >= 0.1
# confi = jnp.sqrt(jnp.clip(confidences * valid_mask, 0.0, 1.0))
A = jnp.zeros((N * 2, 4), dtype=np.float32)
A = jnp.zeros((N * 2, 4), dtype=jnp.float32)
for i in range(N):
x, y = points[i]
A = A.at[2 * i].set(proj_matrices[i, 2] * x - proj_matrices[i, 0])
A = A.at[2 * i + 1].set(proj_matrices[i, 2] * y - proj_matrices[i, 1])
A = A.at[2 * i].mul(confi[i])
A = A.at[2 * i + 1].mul(confi[i])
A = A.at[2 * i].mul(weights[i])
A = A.at[2 * i + 1].mul(weights[i])
# https://docs.jax.dev/en/latest/_autosummary/jax.numpy.linalg.svd.html
_, _, vh = jnp.linalg.svd(A, full_matrices=False)
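A quick self-contained check of the DLT construction above, using synthetic cameras (not the repo's data): project a known 3D point through two projection matrices and recover it from the smallest right singular vector.

import jax.numpy as jnp

P1 = jnp.array([[1.0, 0.0, 0.0, 0.0],
                [0.0, 1.0, 0.0, 0.0],
                [0.0, 0.0, 1.0, 0.0]])
P2 = jnp.array([[1.0, 0.0, 0.0, -1.0],  # second camera shifted along x
                [0.0, 1.0, 0.0, 0.0],
                [0.0, 0.0, 1.0, 0.0]])
X = jnp.array([0.3, -0.2, 2.0, 1.0])  # ground-truth homogeneous point

def project(P, X):
    x = P @ X
    return x[:2] / x[2]

rows = []
for P, (x, y) in zip([P1, P2], [project(P1, X), project(P2, X)]):
    rows.append(P[2] * x - P[0])
    rows.append(P[2] * y - P[1])
A = jnp.stack(rows)
_, _, vh = jnp.linalg.svd(A, full_matrices=False)
X_hat = vh[-1]
print(X_hat[:3] / X_hat[3])  # ≈ [0.3, -0.2, 2.0]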
@@ -896,23 +895,68 @@ def update_tracking(
tracking.state = new_state
# Apply sliding-window smoothing to each tracked 3D target
def smooth_3d_keypoints(
all_3d_kps: dict[str, list], window_size: int = 5
) -> dict[str, list]:
# window_size = 5
kernel = np.ones(window_size) / window_size
smoothed_points = dict()
for item_object_index in all_3d_kps.keys():
item_object = np.array(all_3d_kps[item_object_index])
if item_object.shape[0] < window_size:
# If there are fewer frames than the window size, return the original data unchanged
smoothed_points[item_object_index] = item_object.tolist()
continue
# Apply a moving average to each coordinate axis of each keypoint
item_smoothed = np.zeros_like(item_object)
# Iterate over all 133 keypoints
for kp_idx in range(item_object.shape[1]):
# Iterate over the three spatial axes of each keypoint
for axis in range(3):
# Smoothing at frame i: smoothed[i] = (point[i-2] + point[i-1] + point[i] + point[i+1] + point[i+2]) / 5
item_smoothed[:, kp_idx, axis] = np.convolve(
item_object[:, kp_idx, axis], kernel, mode="same"
)
smoothed_points[item_object_index] = item_smoothed.tolist()
return smoothed_points
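One caveat about the smoothing above: np.convolve(..., mode="same") zero-pads at the boundaries, so the first and last window_size // 2 frames are damped toward zero rather than averaged over fewer frames. A quick check on a constant signal:

import numpy as np

sig = np.ones(8)
kernel = np.ones(5) / 5
print(np.convolve(sig, kernel, mode="same"))
# [0.6 0.8 1.  1.  1.  1.  0.8 0.6]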
# Filter 2D detections by their average confidence score
def filter_keypoints_by_scores(detections: Iterable[Detection], threshold: float = 0.5):
"""
Filter detections based on the average confidence score of their keypoints.
Only keep detections with an average score above the threshold.
"""
def filter_detection(detection: Detection) -> bool:
avg_score = np.mean(detection.confidences)
return float(avg_score) >= threshold
return filter(filter_detection, detections)
# Collect the unique camera ports present in the detections
def filter_camera_port(detections: list[Detection]):
camera_port = set()
for detection in detections:
camera_port.add(detection.camera.id)
return list(camera_port)
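A small usage sketch of the two helpers above. The stand-in Detection/camera classes are hypothetical (the helpers only rely on .confidences and .camera.id, so duck typing suffices):

from dataclasses import dataclass

import numpy as np

@dataclass
class FakeCamera:
    id: int

@dataclass
class FakeDetection:
    camera: FakeCamera
    confidences: np.ndarray

dets = [
    FakeDetection(FakeCamera(5602), np.array([0.9, 0.8])),
    FakeDetection(FakeCamera(5603), np.array([0.2, 0.1])),  # dropped by the score filter
]
kept = list(filter_keypoints_by_scores(dets, threshold=0.5))
print(filter_camera_port(kept))  # [5602]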
# Paths to the camera intrinsic and extrinsic parameters
CAMERA_PATH = Path(
"/home/admin/Documents/ActualTest_QuanCheng/camera_ex_params_1_2025_4_20/camera_params"
)
CAMERA_PATH = Path("/home/admin/Documents/ActualTest_WeiHua/camera_params")
# Intrinsics and extrinsics for every camera position
AK_CAMERA_DATASET: ak.Array = get_camera_params(CAMERA_PATH)
# Path to the 2D detection data
DATASET_PATH = Path(
"/home/admin/Documents/ActualTest_QuanCheng/camera_ex_params_1_2025_4_20/detect_result/segement_1"
)
DATASET_PATH = Path("/home/admin/Documents/ActualTest_WeiHua/Test_Video")
# 2D detection data for the selected camera ports
camera_port = [5603, 5605, 5608, 5609]
camera_port = [5602, 5603, 5604, 5605]
KEYPOINT_DATASET = get_camera_detect(DATASET_PATH, camera_port, AK_CAMERA_DATASET)
# Extract one complete jump segment
FRAME_INDEX = [i for i in range(0, 600)]
FRAME_INDEX = [i for i in range(552, 1488)] # 552 1488
KEYPOINT_DATASET = get_segment(camera_port, FRAME_INDEX, KEYPOINT_DATASET)
@@ -935,15 +979,14 @@ ALPHA_3D = 0.15
# Frame counter
count = 0
# Matching threshold for the tracking affinity matrix
affinities_threshold = 70
affinities_threshold = -20
# Set of tracked targets
trackings: list[Tracking] = []
# 3D data: keyed by tracking target ID; the value holds all 3D data for that target
all_3d_kps: dict[str, list] = {}
# Iterate over the 2D data and exercise the tracking state
while count < (max(FRAME_INDEX) - min(FRAME_INDEX)):
count += 1
while True:
# Get the current tracking targets
trackings: list[Tracking] = sorted(
global_tracking_state.trackings.values(), key=lambda x: x.id
@@ -951,14 +994,21 @@ while count < (max(FRAME_INDEX) - min(FRAME_INDEX)):
try:
detections = next(sync_gen)
# Filter 2D detections by their average confidence score
# detections = list(filter_keypoints_by_scores(detections, threshold=0.5))
except StopIteration:
break
if len(detections) == 0:
print("no detections in this frame, continue")
continue
# print("Detection len:", len(detections), "count:", count)
# 获得最新一帧的数据2d数据
# 判断追踪状态是否建立成功,若不成功则跳过这一帧数据,直到追踪建立
if not trackings:
"""离机时使用,用于初始化第一帧"""
"""
# 2D detections filtered by bounding box
filter_detections = get_filter_detections(detections)
# Only establish tracking once all 3 camera positions have a target
@@ -966,25 +1016,35 @@ while count < (max(FRAME_INDEX) - min(FRAME_INDEX)):
# continue
if len(filter_detections) < len(camera_port):
print(
"init traincking error, filter detections len:",
"init traincking error, filter filter_detections len:",
len(filter_detections),
"time:",
detections[0].timestamp,
)
continue
# Add the first frame of data to build the tracking targets
global_tracking_state.add_tracking(filter_detections)
# Get the current tracking targets
trackings: list[Tracking] = sorted(
global_tracking_state.trackings.values(), key=lambda x: x.id
)
# Keep the 3D pose data from the first frame
for element_tracking in trackings:
if str(element_tracking.id) not in all_3d_kps.keys():
all_3d_kps[str(element_tracking.id)] = [
element_tracking.state.keypoints.tolist()
]
print("initer tracking:", trackings)
"""
# Filter 2D detections by their average confidence score
# detections = list(filter_keypoints_by_scores(detections, threshold=0.7))
# Only establish tracking once all 4 camera positions detect the target
camera_port = filter_camera_port(detections)
if len(detections) < len(camera_port):
print(
"init traincking error, filter_detections len:",
len(detections),
)
else:
# Add the first frame of data to build the tracking targets
global_tracking_state.add_tracking(detections)  # use filter_detections when running off-device
# Get the current tracking targets
trackings: list[Tracking] = sorted(
global_tracking_state.trackings.values(), key=lambda x: x.id
)
# Keep the 3D pose data from the first frame
for element_tracking in trackings:
if str(element_tracking.id) not in all_3d_kps.keys():
all_3d_kps[str(element_tracking.id)] = [
element_tracking.state.keypoints.tolist()
]
print("initer tracking:", trackings)
else:
# Compute the affinity-matrix matching results
affinities: dict[str, AffinityResult] = calculate_affinity_matrix(
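The affinities_threshold above (now -20) gates which affinity-matrix matches are accepted. A generic sketch of threshold-gated assignment using SciPy's Hungarian solver — illustrative only, not the repo's calculate_affinity_matrix:

import numpy as np
from scipy.optimize import linear_sum_assignment

affinity = np.array([[5.0, -30.0],
                     [-40.0, 12.0]])          # rows: tracks, cols: detections
rows, cols = linear_sum_assignment(-affinity)  # negate to maximize total affinity
matches = [(r, c) for r, c in zip(rows, cols) if affinity[r, c] > -20]
print(matches)  # [(0, 0), (1, 1)]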
@@ -1045,8 +1105,7 @@ while count < (max(FRAME_INDEX) - min(FRAME_INDEX)):
- element_tracking.state.last_active_timestamp
)
# Drop a retained tracking state once the time gap grows too large (timedelta.seconds is an int, so "> 0.5" first fires at a full second)
if time_gap.seconds > 3:
# trackings.remove(element_tracking)
if time_gap.seconds > 0.5:
global_tracking_state._trackings.pop(element_tracking.id)
print(
"remove trackings:",
@@ -1055,8 +1114,12 @@ while count < (max(FRAME_INDEX) - min(FRAME_INDEX)):
detections[0].timestamp,
)
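Side note on the 0.5 s cutoff above: timedelta.seconds is an integer field (and wraps at one day), so time_gap.seconds > 0.5 only fires once a full second has elapsed. total_seconds() gives the fractional gap the comment intends:

from datetime import datetime

gap = datetime(2025, 7, 8, 14, 0, 1) - datetime(2025, 7, 8, 14, 0, 0, 300_000)
print(gap.seconds > 0.5)          # False: the 0.7 s gap truncates to 0
print(gap.total_seconds() > 0.5)  # True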
# Apply sliding-window smoothing to each tracked 3D target
smoothed_points = smooth_3d_keypoints(all_3d_kps, window_size=5)
with open("samples/QuanCheng_res.json", "wb") as f:
f.write(orjson.dumps(all_3d_kps))
for element_3d_kps_id in all_3d_kps.keys():
# Save the results to a JSON file
with open("samples/Test_WeiHua.json", "wb") as f:
f.write(orjson.dumps(smoothed_points))
# Print the shape of each smoothed 3D target
for element_3d_kps_id in smoothed_points.keys():
print(f"{element_3d_kps_id} : {np.array(smoothed_points[element_3d_kps_id]).shape}")