# File: rapid-pose-rgbd-python-example/tests/test_rgbd_pipeline.py
# (260 lines, 8.0 KiB, Python — listing header left over from the file viewer this was copied from)
from pathlib import Path
import numpy as np
from click.testing import CliRunner
from rapid_pose_rgbd_example import (
Camera,
CameraModel,
apply_depth_offsets,
lift_depth_poses_to_world,
make_camera,
make_reconstruction_config,
merge_rgbd_views,
pack_poses_2d,
reconstruct_rgbd,
sample_depth_for_poses,
)
from rapid_pose_rgbd_example.cli import main as cli_main
# Directory containing this test module; fixtures are resolved relative to it.
ROOT = Path(__file__).resolve().parent
# Frozen golden-reference fixture: two RGB-D views of a single person,
# including the expected 3-D reconstruction output.
FIXTURE_PATH = ROOT / "fixtures" / "single_person_two_views.npz"
# Joint order used by all tests in this module (20 joints).  Note the ears at
# indices 3-4 — test_merge_rgbd_views relies on that — and the synthetic
# "head" joint in the last slot.
JOINT_NAMES = [
    "nose",
    "eye_left",
    "eye_right",
    "ear_left",
    "ear_right",
    "shoulder_left",
    "shoulder_right",
    "elbow_left",
    "elbow_right",
    "wrist_left",
    "wrist_right",
    "hip_left",
    "hip_right",
    "knee_left",
    "knee_right",
    "ankle_left",
    "ankle_right",
    "hip_middle",
    "shoulder_middle",
    "head",
]
def make_camera_example(name: str) -> Camera:
    """Build a canonical 256x256 pinhole test camera sitting at the world origin.

    Focal length 1000 px, principal point at (0, 0), zero distortion, identity
    rotation, zero translation — chosen so back-projection math in the tests
    stays trivial to verify by hand.
    """
    intrinsics = [[1000, 0, 0], [0, 1000, 0], [0, 0, 1]]
    distortion = [0, 0, 0, 0, 0]
    rotation = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
    translation = [[0], [0], [0]]
    return make_camera(
        name,
        intrinsics,
        distortion,
        rotation,
        translation,
        256,
        256,
        CameraModel.PINHOLE,
    )
def make_config(num_views: int):
    """Create a reconstruction config with *num_views* identical example cameras.

    The room spans 10x10x10 units centred at the origin and uses the
    module-level JOINT_NAMES ordering.
    """
    cameras = [make_camera_example(f"Camera {index + 1}") for index in range(num_views)]
    room = np.asarray([[10.0, 10.0, 10.0], [0.0, 0.0, 0.0]], dtype=np.float32)
    return make_reconstruction_config(cameras, room, JOINT_NAMES)
def make_body_2d() -> np.ndarray:
    """Return one synthetic 2-D skeleton: a (20, 3) float32 array of (u, v, score).

    All joints carry a confidence score of 1.0; coordinates sketch a roughly
    symmetric standing figure centred on u = 150.
    """
    keypoints = (
        (150, 50),    # nose
        (145, 48),    # eye_left
        (155, 48),    # eye_right
        (138, 50),    # ear_left
        (162, 50),    # ear_right
        (135, 80),    # shoulder_left
        (165, 80),    # shoulder_right
        (125, 115),   # elbow_left
        (175, 115),   # elbow_right
        (115, 150),   # wrist_left
        (185, 150),   # wrist_right
        (145, 130),   # hip_left
        (155, 130),   # hip_right
        (145, 175),   # knee_left
        (155, 175),   # knee_right
        (145, 220),   # ankle_left
        (155, 220),   # ankle_right
        (150, 130),   # hip_middle
        (150, 80),    # shoulder_middle
        (150, 50),    # head
    )
    return np.asarray([[u, v, 1.0] for u, v in keypoints], dtype=np.float32)
def load_frozen_fixture():
    """Load the frozen .npz fixture and rebuild its matching reconstruction config.

    Returns the raw npz mapping together with a config whose cameras, room
    geometry and joint names all come from arrays stored in the fixture.
    """
    fixture = np.load(FIXTURE_PATH, allow_pickle=True)
    cameras = []
    # One camera per leading entry of the stacked intrinsics array "K".
    for index in range(int(fixture["K"].shape[0])):
        camera = make_camera(
            f"Camera {index + 1}",
            fixture["K"][index],
            fixture["DC"][index],
            fixture["R"][index],
            fixture["T"][index],
            int(fixture["widths"][index]),
            int(fixture["heights"][index]),
            str(fixture["models"][index]),
        )
        cameras.append(camera)
    room = np.stack((fixture["room_size"], fixture["room_center"]), axis=0)
    config = make_reconstruction_config(cameras, room, fixture["joint_names"].tolist())
    return fixture, config
def test_pack_poses_2d_pads_and_counts():
    """pack_poses_2d pads ragged per-view pose lists and reports per-view person counts."""
    views = [
        np.zeros((2, 3, 3), dtype=np.float32),
        np.zeros((1, 3, 3), dtype=np.float32),
    ]
    packed, counts = pack_poses_2d(views, joint_count=3)
    assert packed.shape == (2, 2, 3, 3)
    np.testing.assert_array_equal(counts, np.asarray([2, 1], dtype=np.uint32))
    # The second view holds only one person, so its extra slot must be zero padding.
    np.testing.assert_array_equal(packed[1, 1], np.zeros((3, 3), dtype=np.float32))
def test_sample_depth_for_poses_respects_person_counts_and_scores():
    """Depth sampling keeps scored joints, zeroes zero-score joints and padded people."""
    poses_2d = np.zeros((1, 2, 2, 3), dtype=np.float32)
    poses_2d[0, 0, 0] = [5, 6, 0.8]  # valid joint with a positive score
    poses_2d[0, 0, 1] = [7, 8, 0.0]  # zero-score joint: must be dropped
    person_counts = np.asarray([1], dtype=np.uint32)
    depth_image = np.full((16, 16), 3000, dtype=np.float32)
    depth_image[0, 0] = 1234  # decoy value away from the sampled pixel
    poses_uvd = sample_depth_for_poses(poses_2d, person_counts, [depth_image])
    # The scored joint keeps (u, v, score) and picks up the depth at its pixel (3000).
    np.testing.assert_allclose(poses_uvd[0, 0, 0], [5.0, 6.0, 3000.0, 0.8], rtol=1e-6, atol=1e-6)
    # The zero-score joint comes back fully zeroed.
    np.testing.assert_array_equal(poses_uvd[0, 0, 1], np.zeros((4,), dtype=np.float32))
    # The second person slot exceeds person_counts (== 1) and must stay zero.
    np.testing.assert_array_equal(poses_uvd[0, 1], np.zeros((2, 4), dtype=np.float32))
def test_apply_depth_offsets_uses_joint_mapping_without_mutating_input():
    """Known joints get their per-joint depth offset; unknown joints and the input array stay untouched."""
    original = np.zeros((1, 1, 3, 4), dtype=np.float32)
    original[0, 0, :, 2] = 3000.0
    original[0, 0, :, 3] = 1.0
    shifted = apply_depth_offsets(original, ["nose", "shoulder_left", "unknown_joint"])
    # nose +5, shoulder_left +30, unrecognized joint name unchanged.
    np.testing.assert_allclose(shifted[0, 0, :, 2], [3005.0, 3030.0, 3000.0], rtol=1e-6, atol=1e-6)
    # The function must return a copy and leave the caller's array as-is.
    np.testing.assert_allclose(original[0, 0, :, 2], [3000.0, 3000.0, 3000.0], rtol=1e-6, atol=1e-6)
def test_lift_depth_poses_to_world_matches_camera_projection():
    """Back-projection through the identity-pose example camera yields the expected world point."""
    poses_uvd = np.zeros((1, 1, 2, 4), dtype=np.float32)
    # (u, v, depth, score); depth 3000 maps to z == 3.0 below, so depth is
    # presumably in millimetres and world units in metres — TODO confirm.
    poses_uvd[0, 0, 0] = [100.0, 200.0, 3000.0, 0.9]
    poses_uvd[0, 0, 1] = [0.0, 0.0, 0.0, 0.0]  # invalid joint (zero score/depth)
    lifted = lift_depth_poses_to_world(poses_uvd, [make_camera_example("Camera 1")])
    # With f = 1000 and principal point (0, 0): x = u*z/f = 100*3.0/1000 = 0.3, y = 0.6.
    np.testing.assert_allclose(lifted[0, 0, 0], [0.3, 0.6, 3.0, 0.9], rtol=1e-6, atol=1e-6)
    # The invalid joint stays zero after lifting.
    np.testing.assert_array_equal(lifted[0, 0, 1], np.zeros((4,), dtype=np.float32))
def test_merge_rgbd_views_merges_identical_world_poses():
    """Two views seeing the identical person merge into one 3-D pose; head is rebuilt from the ears."""
    config = make_config(2)
    body_2d = make_body_2d()
    poses_2d = np.zeros((2, 1, len(JOINT_NAMES), 3), dtype=np.float32)
    poses_2d[0, 0] = body_2d
    poses_2d[1, 0] = body_2d  # same detection in both views -> must merge to a single person
    person_counts = np.asarray([1, 1], dtype=np.uint32)
    depth_images = [np.full((256, 256), 3000, dtype=np.float32) for _ in range(2)]
    # Run the per-view stages manually to get the reference world-space poses.
    poses_uvd = sample_depth_for_poses(poses_2d, person_counts, depth_images)
    poses_uvd = apply_depth_offsets(poses_uvd, JOINT_NAMES)
    poses_3d_by_view = lift_depth_poses_to_world(poses_uvd, config.cameras)
    merged = merge_rgbd_views(poses_3d_by_view, person_counts, config)
    assert merged.shape == (1, len(JOINT_NAMES), 4)
    # Every joint except the last ("head") matches view 0's lifted pose directly.
    np.testing.assert_allclose(merged[0, :-1], poses_3d_by_view[0, 0, :-1], rtol=1e-5, atol=1e-5)
    # The head joint is expected to be the midpoint of joints 3 and 4
    # (ear_left / ear_right in JOINT_NAMES), carrying the smaller confidence.
    expected_head = (poses_3d_by_view[0, 0, 3] + poses_3d_by_view[0, 0, 4]) * 0.5
    expected_head[3] = min(poses_3d_by_view[0, 0, 3, 3], poses_3d_by_view[0, 0, 4, 3])
    np.testing.assert_allclose(merged[0, -1], expected_head, rtol=1e-5, atol=1e-5)
def test_reconstruct_rgbd_matches_manual_pipeline_for_single_view_person():
    """reconstruct_rgbd equals the manually chained stage functions when only view 0 sees the person."""
    config = make_config(2)
    body_2d = make_body_2d()
    poses_2d = np.zeros((2, 1, len(JOINT_NAMES), 3), dtype=np.float32)
    poses_2d[0, 0] = body_2d  # only the first view contains a detection
    person_counts = np.asarray([1, 0], dtype=np.uint32)
    depth_images = [
        np.full((256, 256), 3000, dtype=np.float32),
        np.zeros((256, 256), dtype=np.float32),  # empty depth for the person-less view
    ]
    # Reference output: chain the four pipeline stages by hand, in order:
    # sample depth -> apply joint offsets -> lift to world -> merge views.
    manual = merge_rgbd_views(
        lift_depth_poses_to_world(
            apply_depth_offsets(sample_depth_for_poses(poses_2d, person_counts, depth_images), JOINT_NAMES),
            config.cameras,
        ),
        person_counts,
        config,
    )
    reconstructed = reconstruct_rgbd(poses_2d, person_counts, depth_images, config)
    assert reconstructed.shape == (1, len(JOINT_NAMES), 4)
    np.testing.assert_allclose(reconstructed, manual, rtol=1e-5, atol=1e-5)
    # Sanity check: at least a handful of joints end up with positive confidence.
    assert int(np.count_nonzero(reconstructed[0, :, 3] > 0.0)) >= 7
def test_reconstruct_rgbd_matches_frozen_reference_fixture():
    """End-to-end reconstruction must reproduce the frozen golden output."""
    fixture, config = load_frozen_fixture()
    inputs = (fixture["poses_2d"], fixture["person_counts"], fixture["depth_images"])
    result = reconstruct_rgbd(*inputs, config)
    np.testing.assert_allclose(result, fixture["expected_poses_3d"], rtol=1e-4, atol=1e-4)
def test_reconstruct_rgbd_is_repeatable():
    """Running the pipeline twice on identical inputs yields identical results."""
    fixture, config = load_frozen_fixture()
    runs = [
        reconstruct_rgbd(
            fixture["poses_2d"],
            fixture["person_counts"],
            fixture["depth_images"],
            config,
        )
        for _ in range(2)
    ]
    np.testing.assert_allclose(runs[0], runs[1], rtol=1e-6, atol=1e-6)
def test_cli_writes_result_file(tmp_path: Path):
    """The CLI entry point exits cleanly and writes an .npy result of the expected shape."""
    destination = tmp_path / "result.npy"
    args = ["--fixture", str(FIXTURE_PATH), "--output", str(destination)]
    outcome = CliRunner().invoke(cli_main, args)
    # Surface the CLI's own output on failure to make debugging easier.
    assert outcome.exit_code == 0, outcome.output
    assert destination.exists()
    stored = np.load(destination)
    assert stored.shape == (1, len(JOINT_NAMES), 4)