feat: initialize offline multiview pose tracking experiment
Set up pose_tracking_exp as a uv-managed Python package for offline multiview body tracking experiments. This initial commit includes: - the typed package scaffold, CLI entrypoints, and repo-local uv configuration - scene and replay loaders for generic JSON replays and ActualTest parquet inputs - ParaJumping payload conversion and RTMPose-to-body20 normalization - a custom articulated tracker with tentative, active, and lost lifecycle handling - RPT-backed proposal generation, camera convention handling, and multiview reprojection updates - regression tests for normalization, camera conventions, ActualTest ingestion, seeding, and tracker smoke flows - project documentation covering extrinsic formats and the ActualTest calibration caveat
This commit is contained in:
@@ -0,0 +1,77 @@
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
|
||||
from pose_tracking_exp.actualtest import load_actualtest_scene, load_actualtest_segment_bundles
|
||||
from pose_tracking_exp.joints import BODY20_INDEX_BY_NAME
|
||||
|
||||
|
||||
def _write_parquet(path: Path, rows: list[dict[str, object]]) -> None:
    """Serialize *rows* to a parquet file at *path*, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pylist(rows)
    pq.write_table(table, path)
|
||||
|
||||
|
||||
def _sample_rtmpose_detection() -> tuple[list[float], list[list[float]], list[float]]:
|
||||
keypoints_xy = np.zeros((133, 2), dtype=np.float64)
|
||||
scores = np.zeros((133,), dtype=np.float64)
|
||||
keypoints_xy[5] = [10.0, 20.0]
|
||||
keypoints_xy[6] = [30.0, 20.0]
|
||||
keypoints_xy[11] = [12.0, 60.0]
|
||||
keypoints_xy[12] = [28.0, 60.0]
|
||||
keypoints_xy[0] = [20.0, 8.0]
|
||||
scores[[0, 5, 6, 11, 12]] = 1.0
|
||||
return [8.0, 4.0, 32.0, 64.0], keypoints_xy.tolist(), scores.tolist()
|
||||
|
||||
|
||||
def test_load_actualtest_parquet_scene_and_segment(tmp_path: Path) -> None:
    """End-to-end ActualTest ingestion from parquet calibration + detections.

    Writes a two-camera calibration file and per-camera detection parquet
    files, then checks that:
    - cameras are exposed under their port numbers ("5602"/"5603"), not the
      "AF_xx" names;
    - with identity rotation, tvec is inverted into a world-space camera pose
      (pose_T == -tvec for the second camera);
    - frame_start/max_frames select exactly the frame-690 bundle;
    - raw RTMPose keypoints are normalized to body20 (hip_middle lands at the
      midpoint of the two hip keypoints).
    """
    root = tmp_path / "ActualTest_WeiHua"
    # Calibration: two cameras sharing intrinsics; only tvec differs.
    _write_parquet(
        root / "camera_params" / "camera_params.parquet",
        [
            {
                "name": "AF_02",
                "port": 5602,
                "intrinsic": {
                    "camera_matrix": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
                    "distortion_coefficients": [0.0, 0.0, 0.0, 0.0, 0.0],
                },
                "extrinsic": {"rvec": [0.0, 0.0, 0.0], "tvec": [0.0, 0.0, 0.0]},
                "resolution": {"width": 640, "height": 480},
            },
            {
                "name": "AF_03",
                "port": 5603,
                "intrinsic": {
                    "camera_matrix": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
                    "distortion_coefficients": [0.0, 0.0, 0.0, 0.0, 0.0],
                },
                "extrinsic": {"rvec": [0.0, 0.0, 0.0], "tvec": [1.0, 0.0, 0.0]},
                "resolution": {"width": 640, "height": 480},
            },
        ],
    )
    box, keypoints_xy, scores = _sample_rtmpose_detection()
    # Two frames per camera: 689 is empty so frame_start=690 must skip it.
    for camera_name in ("5602", "5603"):
        _write_parquet(
            root / "Segment_1" / f"{camera_name}_detected.parquet",
            [
                {"frame_index": 689, "boxes": [], "kps": [], "kps_scores": []},
                {"frame_index": 690, "boxes": [box], "kps": [keypoints_xy], "kps_scores": [scores]},
            ],
        )

    scene = load_actualtest_scene(root)
    bundles = load_actualtest_segment_bundles(root, "Segment_1", frame_start=690, max_frames=1)

    assert [camera.name for camera in scene.cameras] == ["5602", "5603"]
    np.testing.assert_allclose(scene.cameras[0].pose_T, [0.0, 0.0, 0.0])
    # With R = I, camera pose translation is the negated OpenCV tvec.
    np.testing.assert_allclose(scene.cameras[1].pose_T, [-1.0, 0.0, 0.0])
    assert len(bundles) == 1
    assert [view.camera_name for view in bundles[0].views] == ["5602", "5603"]
    assert bundles[0].views[0].frame_index == 690
    # hip_middle is derived: midpoint of hips at (12,60) and (28,60).
    np.testing.assert_allclose(
        bundles[0].views[0].detections[0].keypoints[BODY20_INDEX_BY_NAME["hip_middle"], :2],
        [20.0, 60.0],
    )
|
||||
@@ -0,0 +1,148 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple, cast
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("rpt")
|
||||
|
||||
from pose_tracking_exp.models import CameraCalibration, SceneConfig
|
||||
from pose_tracking_exp.replay import load_scene_file
|
||||
from pose_tracking_exp.rpt_adapter import build_rpt_config
|
||||
|
||||
|
||||
class _CameraArgs(NamedTuple):
    """Keyword-argument bundle shared by the CameraCalibration constructor tests."""

    # Camera identifier passed through unchanged.
    name: str
    width: int
    height: int
    # 3x3 intrinsic matrix.
    K: np.ndarray
    # Distortion coefficients (length 5 in these tests).
    DC: np.ndarray
    # Camera model string, e.g. "pinhole".
    model: str
|
||||
|
||||
|
||||
def _camera_args() -> _CameraArgs:
    """Return a fixed 640x480 pinhole camera parameter set for the tests below."""
    intrinsics = np.asarray(
        [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
        dtype=np.float64,
    )
    distortion = np.zeros(5, dtype=np.float64)
    return _CameraArgs(
        name="cam0",
        width=640,
        height=480,
        K=intrinsics,
        DC=distortion,
        model="pinhole",
    )
|
||||
|
||||
|
||||
def test_from_opencv_extrinsics_derives_rpt_pose() -> None:
    """from_opencv_extrinsics must derive the inverse (camera-pose) convention.

    OpenCV extrinsics map world -> camera; this test asserts the stored pose
    is the inverse transform: pose_R == R^T and pose_T == -R^T @ T.
    """
    args = _camera_args()
    # Small rotation about the y axis, expressed as a Rodrigues vector.
    rotation_vec = np.asarray([0.0, 0.2, 0.0], dtype=np.float64).reshape(3, 1)
    rotation, _ = cv2.Rodrigues(rotation_vec)
    translation = np.asarray([0.5, -0.1, 2.0], dtype=np.float64)

    camera = CameraCalibration.from_opencv_extrinsics(
        name=args.name,
        width=args.width,
        height=args.height,
        K=args.K,
        DC=args.DC,
        model=args.model,
        R=rotation,
        T=translation,
        rvec=rotation_vec.reshape(3),
    )

    # Camera pose in world coordinates is the inverse of the extrinsics.
    np.testing.assert_allclose(camera.pose_R, rotation.T)
    np.testing.assert_allclose(camera.pose_T, -(rotation.T @ translation))
|
||||
|
||||
|
||||
def test_from_rpt_pose_derives_opencv_extrinsics() -> None:
    """from_rpt_pose takes R/T as the camera pose and derives OpenCV extrinsics.

    Mirror image of the previous test: pose_R/pose_T are stored verbatim,
    while the OpenCV-convention R/T come out inverted (R^T, -R^T @ T).
    """
    args = _camera_args()
    pose_rotation_vec = np.asarray([0.0, -0.3, 0.0], dtype=np.float64).reshape(3, 1)
    pose_rotation, _ = cv2.Rodrigues(pose_rotation_vec)
    pose_translation = np.asarray([1.5, 0.2, -0.4], dtype=np.float64)

    camera = CameraCalibration.from_rpt_pose(
        name=args.name,
        width=args.width,
        height=args.height,
        K=args.K,
        DC=args.DC,
        model=args.model,
        R=pose_rotation,
        T=pose_translation,
    )

    # Pose is stored as given; extrinsics are the inverse transform.
    np.testing.assert_allclose(camera.pose_R, pose_rotation)
    np.testing.assert_allclose(camera.pose_T, pose_translation)
    np.testing.assert_allclose(camera.R, pose_rotation.T)
    np.testing.assert_allclose(camera.T, -(pose_rotation.T @ pose_translation))
|
||||
|
||||
|
||||
def test_load_scene_file_supports_explicit_rpt_pose(tmp_path: Path) -> None:
    """A scene tagged "rpt_camera_pose" reads R/T verbatim as the camera pose.

    With identity R, the derived OpenCV-convention translation is simply the
    negated pose translation.
    """
    scene_path = tmp_path / "scene.json"
    payload = {
        # Marker selecting the camera-pose interpretation of R/T.
        "extrinsic_format": "rpt_camera_pose",
        "room_size": [6.0, 4.0, 3.0],
        "room_center": [0.0, 0.0, 1.0],
        "cameras": [
            {
                "name": "cam0",
                "width": 640,
                "height": 480,
                "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
                "DC": [0.0, 0.0, 0.0, 0.0, 0.0],
                "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]],
                # Column-vector form; loader flattens it to (3,).
                "T": [[1.0], [2.0], [3.0]],
            }
        ],
    }
    scene_path.write_text(json.dumps(payload), encoding="utf-8")

    scene = load_scene_file(scene_path)

    # Pose taken as-is; OpenCV T is the inverse (identity R => just negation).
    np.testing.assert_allclose(scene.cameras[0].pose_T, [1.0, 2.0, 3.0])
    np.testing.assert_allclose(scene.cameras[0].T, [-1.0, -2.0, -3.0])
|
||||
|
||||
|
||||
def test_build_rpt_config_uses_pose_convention(monkeypatch: pytest.MonkeyPatch) -> None:
    """build_rpt_config must hand RPT the camera-pose R/T, not OpenCV extrinsics.

    Stubs rpt.make_triangulation_config to capture its arguments, then checks
    the per-camera payload carries pose_R / pose_T (as a 3x1 column).
    """
    args = _camera_args()
    camera = CameraCalibration.from_opencv_extrinsics(
        name=args.name,
        width=args.width,
        height=args.height,
        K=args.K,
        DC=args.DC,
        model=args.model,
        R=np.eye(3, dtype=np.float64),
        T=np.asarray([1.0, 2.0, 3.0], dtype=np.float64),
        rvec=np.zeros(3, dtype=np.float64),
    )
    scene = SceneConfig(
        room_size=np.asarray([6.0, 4.0, 3.0], dtype=np.float64),
        room_center=np.asarray([0.0, 0.0, 1.0], dtype=np.float64),
        cameras=(camera,),
    )
    captured: dict[str, object] = {}

    # Stub matching the real make_triangulation_config signature; records
    # everything it is called with so the assertions below can inspect it.
    def fake_make_triangulation_config(
        cameras: list[dict[str, object]],
        roomparams: np.ndarray,
        joint_names: list[str],
        *,
        min_match_score: float,
        min_group_size: int,
    ) -> dict[str, object]:
        captured["cameras"] = cameras
        captured["roomparams"] = roomparams
        captured["joint_names"] = joint_names
        captured["min_match_score"] = min_match_score
        captured["min_group_size"] = min_group_size
        return captured

    monkeypatch.setattr("pose_tracking_exp.rpt_adapter.rpt.make_triangulation_config", fake_make_triangulation_config)

    build_rpt_config(scene, min_match_score=0.5, min_group_size=2)

    camera_payload = cast(list[dict[str, object]], captured["cameras"])[0]
    # The adapter must emit pose-convention extrinsics, T as a column vector.
    assert camera_payload["R"] == camera.pose_R.tolist()
    assert camera_payload["T"] == camera.pose_T.reshape(3, 1).tolist()
|
||||
@@ -0,0 +1,49 @@
|
||||
import numpy as np
|
||||
|
||||
from pose_tracking_exp.joints import BODY20_INDEX_BY_NAME
|
||||
from pose_tracking_exp.kinematics import seed_state_from_pose3d
|
||||
|
||||
|
||||
def _sample_pose3d() -> np.ndarray:
    """Return a plausible upright body20 pose as a (20, 4) array: xyz + confidence.

    The figure stands roughly at the origin in x, ~3 m deep, facing the
    camera; every listed joint gets confidence 1.0.
    """
    joint_xyz = {
        "hip_middle": [0.0, 1.0, 3.0],
        "hip_left": [0.12, 1.0, 3.0],
        "hip_right": [-0.12, 1.0, 3.0],
        "shoulder_middle": [0.0, 1.52, 3.0],
        "shoulder_left": [0.18, 1.52, 3.0],
        "shoulder_right": [-0.18, 1.52, 3.0],
        "elbow_left": [0.42, 1.48, 3.02],
        "elbow_right": [-0.42, 1.48, 3.02],
        "wrist_left": [0.64, 1.45, 3.04],
        "wrist_right": [-0.64, 1.45, 3.04],
        "knee_left": [0.1, 0.58, 3.0],
        "knee_right": [-0.1, 0.58, 3.0],
        "ankle_left": [0.1, 0.15, 3.02],
        "ankle_right": [-0.1, 0.15, 3.02],
        "head": [0.0, 1.82, 3.02],
        "nose": [0.0, 1.8, 3.06],
        "eye_left": [0.03, 1.81, 3.05],
        "eye_right": [-0.03, 1.81, 3.05],
        "ear_left": [0.06, 1.81, 3.02],
        "ear_right": [-0.06, 1.81, 3.02],
    }
    pose = np.zeros((20, 4), dtype=np.float64)
    for joint, xyz in joint_xyz.items():
        pose[BODY20_INDEX_BY_NAME[joint]] = [*xyz, 1.0]
    return pose
|
||||
|
||||
|
||||
def test_seed_state_from_pose3d_does_not_call_least_squares(monkeypatch) -> None:
    """Seeding must be closed-form: any call into least_squares is a failure."""

    def _boom(*args: object, **kwargs: object) -> object:
        raise AssertionError("seed_state_from_pose3d should not call scipy.optimize.least_squares")

    monkeypatch.setattr("pose_tracking_exp.kinematics.least_squares", _boom)

    sample = _sample_pose3d()
    state = seed_state_from_pose3d(sample)

    # 31 pose parameters, 8 shape (beta) parameters; root translation seeds
    # from hip_middle.
    assert state.parameters.shape == (31,)
    assert state.beta.shape == (8,)
    np.testing.assert_allclose(
        state.parameters[:3],
        sample[BODY20_INDEX_BY_NAME["hip_middle"], :3],
    )
|
||||
@@ -0,0 +1,156 @@
|
||||
import base64
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
from pose_tracking_exp.joints import BODY20_INDEX_BY_NAME
|
||||
from pose_tracking_exp.normalization import normalize_rtmpose_body20
|
||||
from pose_tracking_exp.parajumping import PROTOCOL_HEADER, convert_payload_record, decode_pose_payload
|
||||
from pose_tracking_exp.replay import load_replay_file, load_scene_file
|
||||
from pose_tracking_exp.sync import synchronize_frames
|
||||
|
||||
|
||||
def _encode_payload(
    *,
    frame_index: int,
    reference_size: tuple[int, int],
    boxes: np.ndarray,
    box_scores: np.ndarray,
    keypoints_xy: np.ndarray,
    keypoint_scores: np.ndarray,
    timestamp_unix_ns: int,
) -> bytes:
    """Serialize one detection frame into the ParaJumping wire format.

    Layout (everything little-endian), mirroring what decode_pose_payload
    expects:
      header | u32 frame_index | 2 x u16 reference size |
      u8 box count + boxes as u16 | u8 score count + scores as u8 |
      u8 person count + keypoints as u16 | u8 score count + kp scores as u8 |
      u64 timestamp (ns).
    Counts are single bytes, so each section is limited to 255 entries.
    """
    return (
        PROTOCOL_HEADER
        + int(frame_index).to_bytes(4, "little")
        + np.asarray(reference_size, dtype="<u2").tobytes()
        + int(boxes.shape[0]).to_bytes(1, "little")
        + np.asarray(boxes, dtype="<u2").tobytes()
        + int(box_scores.shape[0]).to_bytes(1, "little")
        + np.asarray(box_scores, dtype=np.uint8).tobytes()
        + int(keypoints_xy.shape[0]).to_bytes(1, "little")
        + np.asarray(keypoints_xy, dtype="<u2").tobytes()
        + int(keypoint_scores.size).to_bytes(1, "little")
        + np.asarray(keypoint_scores, dtype=np.uint8).reshape(-1).tobytes()
        + int(timestamp_unix_ns).to_bytes(8, "little")
    )
|
||||
|
||||
|
||||
def test_normalize_rtmpose_body20_derives_midpoints_and_head():
    """Derived joints (hip/shoulder midpoints, head) come from the raw keypoints."""
    raw_xy = np.zeros((133, 2), dtype=np.float64)
    raw_scores = np.zeros((133,), dtype=np.float64)
    placed = {
        0: [20.0, 8.0],
        5: [10.0, 20.0],
        6: [30.0, 20.0],
        11: [12.0, 60.0],
        12: [28.0, 60.0],
    }
    for idx, xy in placed.items():
        raw_xy[idx] = xy
        raw_scores[idx] = 1.0

    normalized = normalize_rtmpose_body20(raw_xy, raw_scores)

    # Midpoints of the hips / shoulders, and head taken from keypoint 0.
    np.testing.assert_allclose(normalized[BODY20_INDEX_BY_NAME["hip_middle"], :2], [20.0, 60.0])
    np.testing.assert_allclose(normalized[BODY20_INDEX_BY_NAME["shoulder_middle"], :2], [20.0, 20.0])
    np.testing.assert_allclose(normalized[BODY20_INDEX_BY_NAME["head"], :2], [20.0, 8.0])
|
||||
|
||||
|
||||
def test_decode_payload_and_convert_record():
    """Round-trip one person through encode -> decode, then the record converter.

    Encodes a single detection with five scored keypoints, checks the decoded
    header fields and the normalized hip_middle (midpoint of the two hips),
    and verifies convert_payload_record accepts a base64-wrapped payload.
    """
    keypoints_xy = np.zeros((1, 133, 2), dtype=np.uint16)
    keypoint_scores = np.zeros((1, 133), dtype=np.uint8)
    keypoints_xy[0, 5] = [100, 200]
    keypoints_xy[0, 6] = [200, 200]
    keypoints_xy[0, 11] = [110, 400]
    keypoints_xy[0, 12] = [190, 400]
    keypoints_xy[0, 0] = [150, 120]
    # 255 is full confidence in the u8-quantized score encoding.
    keypoint_scores[0, [0, 5, 6, 11, 12]] = 255

    payload = _encode_payload(
        frame_index=7,
        reference_size=(640, 480),
        boxes=np.asarray([[90, 100, 210, 420]], dtype=np.uint16),
        box_scores=np.asarray([200], dtype=np.uint8),
        keypoints_xy=keypoints_xy,
        keypoint_scores=keypoint_scores,
        timestamp_unix_ns=1234,
    )
    decoded = decode_pose_payload(payload)
    assert decoded.frame_index == 7
    assert decoded.reference_size == (640, 480)
    assert len(decoded.detections) == 1
    # hip_middle = midpoint of hips at (110,400) and (190,400).
    np.testing.assert_allclose(
        decoded.detections[0].keypoints[BODY20_INDEX_BY_NAME["hip_middle"], :2],
        [150.0, 400.0],
    )

    converted = convert_payload_record({"camera": "cam0", "payload_b64": base64.b64encode(payload).decode("ascii")})
    assert converted["camera"] == "cam0"
    assert converted["frame_index"] == 7
|
||||
|
||||
|
||||
def test_load_replay_and_synchronize(tmp_path: Path):
    """JSONL replay frames within the skew window fuse into one bundle.

    Two cameras record frame 0 with timestamps 6 ns apart; with
    max_skew_ns=20 and min_views=2 they must synchronize into a single
    two-view bundle.
    """
    scene_path = tmp_path / "scene.json"
    replay_path = tmp_path / "replay.jsonl"
    # Minimal two-camera scene: identity rotations, one meter apart in x.
    scene_path.write_text(
        json.dumps(
            {
                "room_size": [6.0, 4.0, 3.0],
                "room_center": [0.0, 0.0, 1.0],
                "cameras": [
                    {
                        "name": "cam0",
                        "width": 640,
                        "height": 480,
                        "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
                        "DC": [0.0, 0.0, 0.0, 0.0, 0.0],
                        "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]],
                        "T": [[0.0], [0.0], [0.0]],
                    },
                    {
                        "name": "cam1",
                        "width": 640,
                        "height": 480,
                        "K": [[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]],
                        "DC": [0.0, 0.0, 0.0, 0.0, 0.0],
                        "R": [[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]],
                        "T": [[1.0], [0.0], [0.0]],
                    },
                ],
            }
        ),
        encoding="utf-8",
    )
    pose = np.zeros((20, 3), dtype=np.float64)
    # One record per camera; timestamps 1000 and 1006 ns (6 ns skew).
    replay_path.write_text(
        "\n".join(
            [
                json.dumps(
                    {
                        "camera": "cam0",
                        "frame_index": 0,
                        "timestamp_unix_ns": 1000,
                        "source_size": [640, 480],
                        "detections": [{"bbox": [0, 0, 1, 1], "bbox_confidence": 1.0, "keypoints": pose.tolist()}],
                    }
                ),
                json.dumps(
                    {
                        "camera": "cam1",
                        "frame_index": 0,
                        "timestamp_unix_ns": 1006,
                        "source_size": [640, 480],
                        "detections": [{"bbox": [0, 0, 1, 1], "bbox_confidence": 1.0, "keypoints": pose.tolist()}],
                    }
                ),
            ]
        )
        + "\n",
        encoding="utf-8",
    )

    scene = load_scene_file(scene_path)
    assert len(scene.cameras) == 2
    replay = load_replay_file(scene_path, replay_path)
    bundles = synchronize_frames(replay, max_skew_ns=20, min_views=2)
    # 6 ns skew < 20 ns window, and both views present => exactly one bundle.
    assert len(bundles) == 1
    assert {frame.camera_name for frame in bundles[0].views} == {"cam0", "cam1"}
|
||||
|
||||
@@ -0,0 +1,156 @@
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("rpt")
|
||||
|
||||
from pose_tracking_exp.joints import BODY20_INDEX_BY_NAME
|
||||
from pose_tracking_exp.models import CameraCalibration, CameraFrame, FrameBundle, ProposalCluster, SceneConfig, TrackerConfig
|
||||
from pose_tracking_exp.tracker import PoseTracker
|
||||
|
||||
|
||||
def _make_scene() -> SceneConfig:
    """Two identity-rotation cameras one meter apart along x, in a small room."""

    def _camera(name: str, offset_x: float) -> CameraCalibration:
        # Shared 640x480 intrinsics; only the x translation differs.
        return CameraCalibration(
            name=name,
            width=640,
            height=480,
            K=np.asarray([[500.0, 0.0, 320.0], [0.0, 500.0, 240.0], [0.0, 0.0, 1.0]], dtype=np.float64),
            DC=np.zeros(5, dtype=np.float64),
            R=np.eye(3, dtype=np.float64),
            T=np.asarray([offset_x, 0.0, 0.0], dtype=np.float64),
        )

    return SceneConfig(
        room_size=np.asarray([6.0, 4.0, 3.0], dtype=np.float64),
        room_center=np.asarray([0.0, 0.0, 1.0], dtype=np.float64),
        cameras=(_camera("cam0", 0.0), _camera("cam1", 1.0)),
    )
|
||||
|
||||
|
||||
def _make_bundle(bundle_index: int) -> FrameBundle:
    """Build a detection-free two-camera bundle at ~30 fps timestamps."""
    timestamp = 1_000_000_000 + bundle_index * 33_000_000
    frames = tuple(
        CameraFrame(
            camera_name=name,
            frame_index=bundle_index,
            timestamp_unix_ns=timestamp,
            detections=(),
            source_size=(640, 480),
        )
        for name in ("cam0", "cam1")
    )
    return FrameBundle(
        bundle_index=bundle_index,
        timestamp_unix_ns=timestamp,
        views=frames,
    )
|
||||
|
||||
|
||||
def _make_proposal(root_x: float, *, score: float = 1.0) -> ProposalCluster:
    """Build a left/right-symmetric standing-person proposal rooted at x = root_x.

    Midline joints sit at x offset 0; paired joints mirror around the midline
    with a per-joint x offset. Every joint gets the same confidence *score*.
    """
    midline = {
        "hip_middle": (0.0, 1.0, 3.0),
        "shoulder_middle": (0.0, 1.52, 3.0),
        "head": (0.0, 1.82, 3.02),
        "nose": (0.0, 1.8, 3.06),
    }
    # (x offset from midline, y, z) for each left/right joint pair.
    mirrored = {
        "hip": (0.12, 1.0, 3.0),
        "shoulder": (0.18, 1.52, 3.0),
        "elbow": (0.42, 1.48, 3.02),
        "wrist": (0.64, 1.45, 3.04),
        "knee": (0.1, 0.58, 3.0),
        "ankle": (0.1, 0.15, 3.02),
        "eye": (0.03, 1.81, 3.05),
        "ear": (0.06, 1.81, 3.02),
    }
    joints = dict(midline)
    for base, (dx, y, z) in mirrored.items():
        joints[f"{base}_left"] = (dx, y, z)
        joints[f"{base}_right"] = (-dx, y, z)

    pose = np.zeros((20, 4), dtype=np.float64)
    for joint, (dx, y, z) in joints.items():
        pose[BODY20_INDEX_BY_NAME[joint]] = [root_x + dx, y, z, score]

    return ProposalCluster(
        pose3d=pose,
        root=np.asarray([root_x, 1.0, 3.0], dtype=np.float64),
        source_views=frozenset({"cam0", "cam1"}),
        support_size=2,
        mean_score=score,
    )
|
||||
|
||||
|
||||
def test_single_person_mode_caps_active_tracks(monkeypatch) -> None:
    """single_person mode keeps exactly one active track, the best-scoring one.

    Feeds two competing proposals per bundle through a stubbed
    _build_proposals; only one track (id 1) may survive, with no tentative
    tracks left over.
    """
    tracker = PoseTracker(
        _make_scene(),
        TrackerConfig(
            mode="single_person",
            # Promote instantly so the cap is what's being exercised.
            tentative_min_age=1,
            tentative_hits_required=1,
            tentative_promote_score=0.0,
            active_miss_to_lost=5,
            proposal_min_score=0.5,
        ),
    )
    # Two people per bundle; the first (near x=0) always scores higher.
    proposals_by_bundle = {
        0: (_make_proposal(0.0, score=0.95), _make_proposal(0.9, score=0.7)),
        1: (_make_proposal(0.05, score=0.96), _make_proposal(0.85, score=0.75)),
    }

    # Bypass RPT proposal generation entirely.
    monkeypatch.setattr(
        tracker,
        "_build_proposals",
        lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index],
    )

    results = tracker.run([_make_bundle(0), _make_bundle(1)])

    assert len(results[0].active_tracks) == 1
    assert len(results[1].active_tracks) == 1
    assert not results[1].tentative_tracks
    assert [track.track_id for track in results[1].active_tracks] == [1]
|
||||
|
||||
|
||||
def test_single_person_mode_reuses_lost_track_id(monkeypatch) -> None:
    """A track lost for one bundle must be reacquired under its original id.

    Bundle 0 creates track 1, bundle 1 has no proposals (one miss => lost,
    per active_miss_to_lost=1), bundle 2 offers a nearby proposal again; the
    tracker should revive id 1 rather than mint a new id, and count it as a
    reacquisition in its diagnostics.
    """
    tracker = PoseTracker(
        _make_scene(),
        TrackerConfig(
            mode="single_person",
            tentative_min_age=1,
            tentative_hits_required=1,
            tentative_promote_score=0.0,
            # One missed bundle is enough to drop to lost...
            active_miss_to_lost=1,
            # ...but lost tracks linger long enough to be reacquired.
            lost_delete_age=10,
            proposal_min_score=0.5,
        ),
    )
    proposals_by_bundle = {
        0: (_make_proposal(0.0, score=0.95),),
        1: (),
        2: (_make_proposal(0.05, score=0.96),),
    }

    # Bypass RPT proposal generation entirely.
    monkeypatch.setattr(
        tracker,
        "_build_proposals",
        lambda bundle, unmatched: proposals_by_bundle[bundle.bundle_index],
    )

    results = tracker.run([_make_bundle(0), _make_bundle(1), _make_bundle(2)])

    assert [track.track_id for track in results[0].active_tracks] == [1]
    assert [track.track_id for track in results[1].lost_tracks] == [1]
    assert [track.track_id for track in results[2].active_tracks] == [1]
    assert tracker.diagnostics_snapshot().reacquisitions >= 1
|
||||
@@ -0,0 +1,67 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("rpt")
|
||||
|
||||
from pose_tracking_exp.models import CameraFrame, FrameBundle, PoseDetection, TrackerConfig
|
||||
from pose_tracking_exp.replay import load_scene_file
|
||||
from pose_tracking_exp.tracker import PoseTracker
|
||||
|
||||
RPT_ROOT = Path("/home/crosstyan/Code/RapidPoseTriangulation")
|
||||
|
||||
|
||||
def test_tracker_promotes_rpt_sample_person():
    """Smoke test: run the RapidPoseTriangulation sample scene through the tracker.

    Replays the same static multi-view 2D poses for three bundles and checks
    that proposals are produced and a track gets promoted to active.
    """
    # RPT_ROOT is a hard-coded sibling checkout; skip cleanly (instead of
    # failing with FileNotFoundError) on machines that don't have it.
    if not RPT_ROOT.exists():
        pytest.skip(f"RPT sample data not found at {RPT_ROOT}")

    scene = load_scene_file(RPT_ROOT / "data/p1/sample.json")
    pose_payload = json.loads((RPT_ROOT / "tests/poses_p1.json").read_text(encoding="utf-8"))
    view_poses = pose_payload["2D"]

    tracker = PoseTracker(
        scene,
        TrackerConfig(
            tentative_min_age=2,
            tentative_hits_required=2,
            tentative_promote_score=1.2,
            proposal_min_score=0.5,
        ),
    )

    bundles: list[FrameBundle] = []
    for bundle_index in range(3):
        views: list[CameraFrame] = []
        # view_poses has one entry per camera; each entry is a list of people.
        for camera, detections in zip(scene.cameras, view_poses, strict=True):
            pose_array = np.asarray(detections, dtype=np.float64)
            frame_detections: list[PoseDetection] = []
            for person_pose in pose_array:
                # Derive an axis-aligned bbox from the 2D keypoint extents.
                mins = person_pose[:, :2].min(axis=0)
                maxs = person_pose[:, :2].max(axis=0)
                frame_detections.append(
                    PoseDetection(
                        bbox=np.asarray([mins[0], mins[1], maxs[0], maxs[1]], dtype=np.float64),
                        bbox_confidence=1.0,
                        keypoints=person_pose,
                    )
                )
            views.append(
                CameraFrame(
                    camera_name=camera.name,
                    frame_index=bundle_index,
                    timestamp_unix_ns=1_000_000_000 + bundle_index * 33_000_000,
                    detections=tuple(frame_detections),
                    source_size=(camera.width, camera.height),
                )
            )
        bundles.append(
            FrameBundle(
                bundle_index=bundle_index,
                timestamp_unix_ns=views[0].timestamp_unix_ns,
                views=tuple(views),
            )
        )

    results = tracker.run(bundles)
    assert any(result.proposals for result in results)
    # Promotion requires two hits, so only bundles after the first can have
    # an active track.
    assert any(result.active_tracks for result in results[1:])
    assert len(results[-1].active_tracks) >= 1
|
||||
Reference in New Issue
Block a user